Skip to content

Commit

Permalink
Update dbt_build_and_load_turso.py (#3)
Browse files Browse the repository at this point in the history
- remove dependency on libsql-experimental
- shift loading strategy to use libsql remote protocol
  • Loading branch information
ndrewwm authored Dec 29, 2024
1 parent 34f03aa commit 87e2a7b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 26 deletions.
84 changes: 59 additions & 25 deletions flows/dbt_build_and_load_turso.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import os
import sqlite3

import requests
import duckdb
import libsql_experimental as libsql
from dbt.cli.main import dbtRunner
from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture
from prefect.blocks.system import Secret
from prefect.client.schemas.schedules import CronSchedule

Expand Down Expand Up @@ -50,6 +51,15 @@ def pull_data(token: str) -> None:

duck = duckdb.connect(f"md:my_db?motherduck_token={token}")
duck.sql("attach 'spottmp.db' (type sqlite);")
duck.sql(
"""drop table if exists spottmp.dim_artist;
drop table if exists spottmp.dim_album;
drop table if exists spottmp.dim_track;
drop table if exists spottmp.fct_track_play;
drop table if exists spottmp.rpt_track_counts;
drop table if exists spottmp.rpt_artist_counts;
drop table if exists spottmp.rpt_discovery_rates;"""
)
duck.sql(
"create table spottmp.dim_artist as select * from my_db.spotify.dim_artist;"
)
Expand All @@ -71,16 +81,21 @@ def pull_data(token: str) -> None:


@task
def generate_ddl() -> tuple[list[str], list[str]]:
"""Dump the SQLite data to a .sql file, for execution in turso replica.
Returns the DDL for execution."""
def generate_ddl() -> None:
"""Dump the SQLite data to a .sql file, for execution against the turso db."""

get_run_logger().info("Dumping local sqlite statements...")
db = sqlite3.connect("spottmp.db")

with open("./dump.sql", mode="w") as file:
for line in db.iterdump():
file.write(f"{line}\n")
return


@task
def read_ddl() -> tuple[list[str], list[str]]:
"""Returns the DDL statements, ready for execution"""

creates = []
inserts = []
Expand All @@ -97,33 +112,52 @@ def generate_ddl() -> tuple[list[str], list[str]]:


@task
def load_turso(creates: list[str], inserts: list[str], creds: dict[str, str]) -> None:
"""Load the turso database."""
def turso_execute(stmt: str | list[str], creds: dict[str, str]):
"""Execute a statement against the turso database."""

if not isinstance(stmt, list):
stmt = [stmt]
statements = [{"type": "execute", "stmt": {"sql": statement}} for statement in stmt]
statements.append({"type": "close"})

url = f"{creds['url'].replace('libsql://', 'https://')}/v2/pipeline"
headers = {
"Authorization": f"Bearer {creds['token']}",
"Content-Type": "application/json",
}
payload = {"requests": statements}
req = requests.post(url=url, headers=headers, json=payload, timeout=60)
req.raise_for_status()
return req.json()

get_run_logger().info("Setting up turso replica...")
turso = libsql.connect(
"turso.db",
sync_url=creds["turso"]["url"],
auth_token=creds["turso"]["token"],
)

@flow
def turso_load(creds: dict[str, str]) -> None:
"""Load the turso database."""

logger = get_run_logger()
creates, inserts = read_ddl()
tables = [
statement.replace("CREATE TABLE ", "").split("(")[0] for statement in creates
]
for table in tables:
get_run_logger().info("Deleting %s...", table)
turso.execute(f"drop table if exists {table};")

for create in creates:
get_run_logger().info("Executing %s", create)
turso.execute(create)
logger.info("Dropping tables...")
turso_execute([f"drop table if exists {table}" for table in tables], creds)

logger.info("Creating tables...")
turso_execute(creates, creds)

futures: list[PrefectFuture] = []
for table in tables:
logger.info("Inserting data into %s...", table)
statements = [
insert for insert in inserts if f'INSERT INTO "{table}"' in insert
]
futures.append(turso_execute.submit(statements, creds))

get_run_logger().info("Inserting data...")
values = "".join(inserts)
turso.execute(values)
for future in futures:
future.wait()

turso.commit()
turso.sync()
return


Expand All @@ -134,8 +168,8 @@ def dbt_build_and_load_turso() -> None:
creds = get_credentials()
dbt_build(creds["motherduck"])
pull_data(creds["motherduck"])
creates, inserts = generate_ddl()
load_turso(creates, inserts, creds)
generate_ddl()
turso_load(creds["turso"])


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ duckdb==1.1.3
dbt-core==1.8.4
dbt-duckdb==1.8.2
fastapi==0.111.1
libsql-experimental==0.0.36

0 comments on commit 87e2a7b

Please sign in to comment.