Skip to content
This repository was archived by the owner on Apr 8, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions projects/fal/src/fal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
DbtSingularTest,
)
from fal.fal_script import Context, CurrentModel
from fal.serverless.runner import FalServerlessRunner
85 changes: 85 additions & 0 deletions projects/fal/src/fal/serverless/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from dataclasses import dataclass, field
import importlib
from pathlib import Path
from typing import List
from fal_serverless import isolated, sync_dir
import os


@isolated()
def serverless_runner(project_dir, profiles_dir, command, args: List[str] = []):
from dbt.cli.main import dbtRunner

runner = dbtRunner()

cli_args = [
command,
"--project-dir",
project_dir,
"--profiles-dir",
profiles_dir,
]
cli_args.extend(args)

runner.invoke(cli_args)


@dataclass
class FalServerlessRunner:
project_dir: str
profiles_dir: str
dbt_version: str = field(init=False)
data_project_dir: str = field(init=False)
data_profiles_dir: str = field(init=False)

def _sync_directories(self):
sync_dir(
self.project_dir,
self.data_project_dir,
)
sync_dir(self.profiles_dir, self.data_profiles_dir)

def __post_init__(self):
from dbt.cli.main import dbtRunner

cli_args = [
"--project-dir",
self.project_dir,
"--profiles-dir",
self.profiles_dir,
]
runner = dbtRunner()
metadata = runner.invoke(["parse"] + cli_args).result.metadata
plugin_name = metadata.adapter_type

try:
mod = importlib.import_module(f"dbt.adapters.{plugin_name}.__version__")
except ImportError:
raise ValueError(
f"Could not determine which adapter version is being used: {plugin_name}"
)

# easiest way to get the version, there might be better ways in the future
dbt_version = f"dbt-{plugin_name}=={mod.version}"
Comment thread
turbo1912 marked this conversation as resolved.

self.data_project_dir = str(Path("/data") / os.path.basename(self.project_dir))
self.data_profiles_dir = str(
Path("/data") / os.path.basename(self.profiles_dir)
)
self.dbt_version = dbt_version
Comment thread
turbo1912 marked this conversation as resolved.

def seed(self, args: List[str] = [], sync: bool = True):
if sync:
self._sync_directories()

serverless_runner.on(requirements=[self.dbt_version])(
self.data_project_dir, self.data_profiles_dir, "seed", args
)

def run(self, args: List[str] = [], sync: bool = True):
if sync:
self._sync_directories()

serverless_runner.on(requirements=[self.dbt_version])(
self.data_project_dir, self.data_profiles_dir, "run", args
)