SDK around Tinybird APIs.
If you want to manage Workspaces, Data Sources, and Pipes, you might be looking for the tinybird-cli.
The SDK is meant to programmatically ingest NDJSON data or send any request to an API instance.
It contains handlers for:
- logging events to a Tinybird Data Source from your Python module.
- logging events from litellm to a Tinybird Data Source.
from tb.datasource import Datasource
with Datasource(datasource_name, tinybird_token) as ds:
    ds << {'key': 'value', 'key1': 'value1'}
You can also use the async version:
from tb.a.datasource import AsyncDatasource
async with AsyncDatasource(datasource_name, tinybird_token, api_url='https://api.us-east.tinybird.co') as ds:
    await ds << {'key': 'value', 'key1': 'value1'}
Notes:
- The Datasource object does some in-memory buffering and uses the events API
- It only supports ndjson data
- It automatically handles Rate Limits
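The buffering note above can be pictured with a small standalone sketch. It does not use the SDK: `NDJSONBuffer`, `max_events`, and `flushed` are hypothetical names for illustration, and the SDK's real buffering strategy may differ. It only shows the general idea of collecting events in memory and emitting them as a single NDJSON payload.

```python
import json


class NDJSONBuffer:
    """Hypothetical helper: collects events and emits them as one NDJSON payload."""

    def __init__(self, max_events=3):
        self.max_events = max_events
        self.events = []
        self.flushed = []  # payloads that would be sent to an ingestion endpoint

    def append(self, event):
        self.events.append(event)
        # Flush automatically once the buffer reaches its threshold.
        if len(self.events) >= self.max_events:
            self.flush()

    def flush(self):
        if not self.events:
            return
        # NDJSON: one compact JSON object per line, newline-separated.
        payload = "\n".join(
            json.dumps(e, separators=(",", ":")) for e in self.events
        )
        self.flushed.append(payload)
        self.events = []


buf = NDJSONBuffer(max_events=2)
buf.append({"key": "value"})
buf.append({"key": "value1"})  # reaches the threshold and triggers a flush
buf.append({"key": "value2"})
buf.flush()                    # flush the remainder explicitly
```

Batching like this trades a little latency for fewer requests, which also helps to stay under rate limits.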
from tb.a.api import AsyncAPI
async with AsyncAPI(tinybird_token, api_url) as api:
    await api.post('datasources',
        params={
            'name': 'datasource_name',
            'mode': 'append',
            'format': 'ndjson',
            'url': 'https://storage.googleapis.com/davidm-wadus/events.ndjson',
        }
    )
- It automatically handles Rate Limits
- Works with any Tinybird API
- The signatures of the post, get, and send methods are equivalent to those of the requests library
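As a rough illustration of what automatic rate-limit handling can look like, here is a minimal sketch that does not use the SDK. It assumes the server signals rate limiting with HTTP 429 and an optional `Retry-After` header; `call_with_retry` and `send` are hypothetical names, and the SDK's own retry logic may work differently.

```python
import time


def call_with_retry(send, max_retries=3, sleep=time.sleep):
    """Call send() until it stops returning HTTP 429 or retries run out.

    send is a hypothetical callable returning (status_code, headers, body).
    """
    for attempt in range(max_retries + 1):
        status, headers, body = send()
        if status != 429 or attempt == max_retries:
            return status, body
        # Prefer the server's Retry-After hint; otherwise back off exponentially.
        delay = float(headers.get("Retry-After", 2 ** attempt))
        sleep(delay)


# Simulated server: rate-limited twice, then successful.
responses = iter([
    (429, {"Retry-After": "1"}, ""),
    (429, {}, ""),
    (200, {}, "ok"),
])
waits = []  # records the delays instead of actually sleeping
result = call_with_retry(lambda: next(responses), sleep=waits.append)
```

Injecting `sleep` keeps the sketch testable without real waiting; in production code the default `time.sleep` would apply.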
import logging
import os
from tb.logger import TinybirdLoggingHandler
from dotenv import load_dotenv
load_dotenv()
TB_API_URL = os.getenv("TINYBIRD_API_URL")
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")
logger = logging.getLogger('your-logger-name')
handler = TinybirdLoggingHandler(TB_API_URL, TB_WRITE_TOKEN, 'your-app-name')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
Each time you call the logger, an event is sent to the tb_logs Data Source in your Workspace.
To configure the Data Source name, initialize the TinybirdLoggingHandler like this:
handler = TinybirdLoggingHandler(TB_API_URL, TB_WRITE_TOKEN, 'your-app-name', ds_name="your_tb_ds_name")
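To make the "one event per logger call" idea concrete, the following sketch uses only the standard library. `ListJSONHandler` is a hypothetical stand-in that stores each record as a JSON line instead of sending it anywhere, and the field names are assumptions, not the actual tb_logs schema.

```python
import json
import logging


class ListJSONHandler(logging.Handler):
    """Hypothetical handler: stores each record as a JSON line instead of sending it."""

    def __init__(self, app_name):
        super().__init__()
        self.app_name = app_name
        self.lines = []

    def emit(self, record):
        # Field names here are illustrative only.
        event = {
            "app_name": self.app_name,
            "logger": record.name,
            "level": record.levelname,
            "message": self.format(record),
        }
        self.lines.append(json.dumps(event))


demo_logger = logging.getLogger("demo-json-logger")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_handler = ListJSONHandler("your-app-name")
demo_handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
demo_logger.addHandler(demo_handler)

demo_logger.info("user signed in")
demo_logger.debug("filtered out: below INFO")  # produces no event
```

Only records at or above the logger's level reach the handler, so the level setting directly controls how many events would be ingested.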
If you want to avoid blocking the main thread, you can use a queue to send the logs to a different thread.
import logging
import os
from multiprocessing import Queue
from tb.logger import TinybirdLoggingQueueHandler
from dotenv import load_dotenv
load_dotenv()
TB_API_URL = os.getenv("TINYBIRD_API_URL")
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")
logger = logging.getLogger('your-logger-name')
handler = TinybirdLoggingQueueHandler(Queue(-1), TB_API_URL, TB_WRITE_TOKEN, 'your-app-name', ds_name="your_tb_ds_name")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
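The queue-based pattern itself can be tried with the standard library alone. The sketch below uses `logging.handlers.QueueHandler` and `QueueListener`; `CollectingHandler` is a hypothetical stand-in for a slow network handler. It is not a description of how TinybirdLoggingQueueHandler is implemented internally, only of the general technique.

```python
import logging
import logging.handlers
import queue


class CollectingHandler(logging.Handler):
    """Stand-in for a slow network handler; it only records messages."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue(-1)  # a non-positive maxsize means unbounded
slow_handler = CollectingHandler()

# The listener drains the queue on its own thread and calls slow_handler there.
listener = logging.handlers.QueueListener(log_queue, slow_handler)
listener.start()

queue_logger = logging.getLogger("demo-queue-logger")
queue_logger.setLevel(logging.INFO)
queue_logger.propagate = False
# The logger only enqueues records, so the calling thread never waits on I/O.
queue_logger.addHandler(logging.handlers.QueueHandler(log_queue))

queue_logger.info("first")
queue_logger.info("second")

listener.stop()  # processes remaining records and joins the worker thread
```

Calling `listener.stop()` before exit matters: records still sitting in the queue would otherwise be lost.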
Install the ai extra:
pip install tinybird-python-sdk[ai]
Then use the following handler:
import os

import litellm
from litellm import acompletion
from tb.litellm.handler import TinybirdLitellmAsyncHandler

customHandler = TinybirdLitellmAsyncHandler(
    api_url="https://api.us-east.aws.tinybird.co",
    tinybird_token=os.getenv("TINYBIRD_TOKEN"),
    datasource_name="litellm"
)
litellm.callbacks = [customHandler]

response = await acompletion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
    stream=True
)
This is the schema for the litellm data source:
SCHEMA >
    `model` LowCardinality(String) `json:$.model` DEFAULT 'unknown',
    `messages` Array(Map(String, String)) `json:$.messages[:]` DEFAULT [],
    `user` String `json:$.user` DEFAULT 'unknown',
    `start_time` DateTime `json:$.start_time` DEFAULT now(),
    `end_time` DateTime `json:$.end_time` DEFAULT now(),
    `id` String `json:$.id` DEFAULT '',
    `stream` Boolean `json:$.stream` DEFAULT false,
    `call_type` LowCardinality(String) `json:$.call_type` DEFAULT 'unknown',
    `provider` LowCardinality(String) `json:$.provider` DEFAULT 'unknown',
    `api_key` String `json:$.api_key` DEFAULT '',
    `log_event_type` LowCardinality(String) `json:$.log_event_type` DEFAULT 'unknown',
    `llm_api_duration_ms` Float32 `json:$.llm_api_duration_ms` DEFAULT 0,
    `cache_hit` Boolean `json:$.cache_hit` DEFAULT false,
    `response_status` LowCardinality(String) `json:$.standard_logging_object_status` DEFAULT 'unknown',
    `response_time` Float32 `json:$.standard_logging_object_response_time` DEFAULT 0,
    `proxy_metadata` String `json:$.proxy_metadata` DEFAULT '',
    `organization` String `json:$.proxy_metadata.organization` DEFAULT '',
    `environment` String `json:$.proxy_metadata.environment` DEFAULT '',
    `project` String `json:$.proxy_metadata.project` DEFAULT '',
    `chat_id` String `json:$.proxy_metadata.chat_id` DEFAULT '',
    `response` String `json:$.response` DEFAULT '',
    `response_id` String `json:$.response.id`,
    `response_object` String `json:$.response.object` DEFAULT 'unknown',
    `response_choices` Array(String) `json:$.response.choices[:]` DEFAULT [],
    `completion_tokens` UInt16 `json:$.response.usage.completion_tokens` DEFAULT 0,
    `prompt_tokens` UInt16 `json:$.response.usage.prompt_tokens` DEFAULT 0,
    `total_tokens` UInt16 `json:$.response.usage.total_tokens` DEFAULT 0,
    `cost` Float32 `json:$.cost` DEFAULT 0,
    `exception` String `json:$.exception` DEFAULT '',
    `traceback` String `json:$.traceback` DEFAULT '',
    `duration` Float32 `json:$.duration` DEFAULT 0
ENGINE MergeTree
ENGINE_SORTING_KEY start_time, organization, project, model
ENGINE_PARTITION_KEY toYYYYMM(start_time)
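To see how the `json:$...` paths in the schema relate to a nested event, here is a small standalone sketch. The `extract` helper and the sample event are hypothetical and only handle plain dotted keys (not array selectors such as `[:]`); the real mapping is done by Tinybird at ingestion time, not by this code.

```python
def extract(event, json_path, default=None):
    """Resolve a simple dotted JSONPath such as '$.response.usage.total_tokens'.

    Only plain keys are handled; array selectors like '[:]' are not supported.
    """
    path = json_path[2:] if json_path.startswith("$.") else json_path
    current = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# Hypothetical event, shaped after a few of the JSONPaths in the schema.
event = {
    "model": "gpt-3.5-turbo",
    "proxy_metadata": {"organization": "acme", "project": "chatbot"},
    "response": {
        "usage": {"prompt_tokens": 12, "completion_tokens": 30, "total_tokens": 42}
    },
}

model = extract(event, "$.model", "unknown")
total_tokens = extract(event, "$.response.usage.total_tokens", 0)
# The key is missing, so the default applies, much like DEFAULT '' in the schema.
environment = extract(event, "$.proxy_metadata.environment", "")
```

Note that the token columns are UInt16, so values above 65535 would not fit those columns.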