Skip to content
This repository was archived by the owner on Apr 8, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion projects/adapter/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions projects/adapter/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ posthog = "^1.4.5"
pandas = "^1.3.4"
"backports.functools_lru_cache" = "^1.6.4"
sqlalchemy = "^1.4.41"
connectorx = "^0.3.1"

# Adapters

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dbt.adapters.base.connections import AdapterResponse
from dbt.adapters.fal_experimental.adapter_support import new_connection
import pandas as pd
import connectorx as cx

# [bigquery] extras dependencies
import google.cloud.bigquery as bigquery
Expand All @@ -14,6 +15,15 @@ def read_relation_as_df(adapter: BaseAdapter, relation: BaseRelation) -> pd.Data

assert adapter.type() == "bigquery"

db_creds = adapter.config.credentials._db_creds
method = getattr(db_creds, 'method', None)

# Connectorx only supports service-account json authentication
if method and method.value == "service-account":
connection_str = f"bigquery://{db_creds.keyfile}"
df = cx.read_sql(connection_str, sql)
return df

with new_connection(adapter, "fal-bigquery:read_relation_as_df") as conn:

connection_manager: BaseConnectionManager = adapter.connections # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,50 @@

import pandas as pd
import sqlalchemy
import connectorx as cx

from dbt.adapters.base import BaseRelation
from dbt.adapters.base.connections import AdapterResponse
from dbt.adapters.fal_experimental.adapter_support import drop_relation_if_it_exists, new_connection
from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.postgres import PostgresAdapter, PostgresCredentials

def _generate_connection_string(credentials: PostgresCredentials) -> str:
username = credentials.user
password = credentials.password
host = credentials.host
port = credentials.port
database = credentials.database
connection_string = f"postgresql://{username}:{password}@{host}:{port}/{database}"
return connection_string

def read_relation_as_df(
    adapter: PostgresAdapter, relation: BaseRelation
) -> pd.DataFrame:
    """Read the full contents of *relation* into a pandas DataFrame.

    Fast path uses connectorx over a plain postgres URI. Profiles that set
    'sslmode' or 'role' fall back to a psycopg2/sqlalchemy read, because
    connectorx does not honor custom SSL settings or a connection role.
    """
    assert adapter.type() == "postgres"

    db_creds = adapter.config.credentials._db_creds

    # Profiles with properties 'sslmode' or 'role' are handled the old way
    # connectorx doesn't handle custom SSL settting or connection role
    if getattr(db_creds, 'sslmode', None) or getattr(db_creds, 'role', None):
        with new_connection(adapter, "fal-postgres:read_relation_as_df") as connection:
            # If the given adapter supports the DBAPI (PEP 249), we can
            # use its connection directly for the engine.
            alchemy_engine = sqlalchemy.create_engine(
                "postgresql+psycopg2://",
                creator=lambda *args, **kwargs: connection.handle,
            )
            return pd.read_sql_table(
                con=alchemy_engine,
                table_name=relation.identifier,
                schema=relation.schema,
            )

    # Other profiles are handled with connectorx.
    # Reuse db_creds bound above instead of re-resolving
    # adapter.config.credentials._db_creds a second time.
    connection_str = _generate_connection_string(db_creds)
    # str(relation) renders the fully qualified (schema-prefixed) name.
    sql = f"SELECT * FROM {relation}"
    df = cx.read_sql(connection_str, sql)

    return df

def write_df_to_relation(
adapter: PostgresAdapter,
Expand Down