diff --git a/.env.example b/.env.example index abe6b9d..0ebdf0e 100644 --- a/.env.example +++ b/.env.example @@ -14,4 +14,7 @@ SNOWFLAKE_ROLE= SNOWFLAKE_WAREHOUSE= SNOWFLAKE_DATABASE= SNOWFLAKE_SCHEMA= -SNOWFLAKE_PRIVATE_KEY_FILE_PATH='path to .p8' +# Path to the private key file (.p8 or .pem) - used by dev/prod targets +SNOWFLAKE_PRIVATE_KEY_FILE=/path/to/your/snowflake_key.p8 +# Private key content (PEM format with newlines) - used by test target and CI +# SNOWFLAKE_PRIVATE_KEY= diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml index 5365c60..e525b9f 100644 --- a/.github/workflows/deploy-docs.yml +++ b/.github/workflows/deploy-docs.yml @@ -4,6 +4,9 @@ on: push: branches: - main + paths: + - 'docs/**' + - 'mkdocs.yml' workflow_dispatch: permissions: diff --git a/.github/workflows/pr_ci.yml b/.github/workflows/pr_ci.yml index 20904cf..8a19725 100644 --- a/.github/workflows/pr_ci.yml +++ b/.github/workflows/pr_ci.yml @@ -11,6 +11,14 @@ on: jobs: PR_CI: runs-on: ubuntu-latest + env: + SNOWFLAKE_ACCOUNT: ${{ vars.SNOWFLAKE_ACCOUNT }} + SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }} + SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }} + SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }} + SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }} + SNOWFLAKE_SCHEMA: ${{ vars.SNOWFLAKE_SCHEMA }} + SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_PRIVATE_KEY }} steps: - name: Checkout code @@ -43,49 +51,16 @@ jobs: - name: Run dbt debug run: | - cd dbt_project && uv run dbt debug --target test - env: - SNOWFLAKE_ACCOUNT: ${{ vars.SNOWFLAKE_ACCOUNT }} - SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }} - SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }} - SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }} - SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }} - SNOWFLAKE_SCHEMA: ${{ vars.SNOWFLAKE_SCHEMA }} - SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_PRIVATE_KEY }} + cd dbt_project && uv run dbt debug --target ci - name: Run dbt deps run: | - cd dbt_project && uv run dbt deps --target test - env: - SNOWFLAKE_ACCOUNT: ${{ vars.SNOWFLAKE_ACCOUNT }} - SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }} - SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }} - SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }} - SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }} - SNOWFLAKE_SCHEMA: ${{ vars.SNOWFLAKE_SCHEMA }} - SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_PRIVATE_KEY }} + cd dbt_project && uv run dbt deps --target ci - name: Run SQLFluff lint - # if: contains(github.event.head_commit.modified, 'dbt_project/models/*.sql') run: | - cd dbt_project && uv run sqlfluff lint models/ - env: - SNOWFLAKE_ACCOUNT: ${{ vars.SNOWFLAKE_ACCOUNT }} - SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }} - SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }} - SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }} - SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }} - SNOWFLAKE_SCHEMA: ${{ vars.SNOWFLAKE_SCHEMA }} - SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_PRIVATE_KEY }} + cd dbt_project && uv run sqlfluff lint models - name: Run dbt tests run: | - cd dbt_project && uv run dbt test --target test - env: - SNOWFLAKE_ACCOUNT: ${{ vars.SNOWFLAKE_ACCOUNT }} - SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }} - SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }} - SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }} - SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }} - SNOWFLAKE_SCHEMA: ${{ vars.SNOWFLAKE_SCHEMA }} - SNOWFLAKE_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_PRIVATE_KEY }} + cd dbt_project && uv run dbt test --target ci diff --git a/CLAUDE.md b/CLAUDE.md index 
3452b64..3344bf6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,121 +4,94 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -This is a capstone project for FA DAE2 focused on Ethereum blockchain data extraction and transformation. The project extracts smart contract logs and transactions from Etherscan API, loads them into PostgreSQL, and transforms them using dbt. +Stables Analytics is a capstone project for Foundry AI Academy (DAE2) - a production-grade ELT pipeline for on-chain stablecoin analytics. The project extracts blockchain data via HyperSync GraphQL API, loads it into PostgreSQL/Snowflake, and transforms it using dbt. ## Architecture -The project follows an ELT (Extract, Load, Transform) pipeline: - -### 1. Extract Layer (`scripts/el/`) -- **Primary script**: `extract_etherscan.py` - Extracts logs and transactions from Etherscan API -- Uses the `onchaindata.data_extraction.etherscan` module with `EtherscanClient` -- Supports multiple blockchain networks via chainid mapping in `src/onchaindata/config/chainid.json` -- Features automatic retry logic for failed block ranges with exponential backoff (reduces chunk size by 10x) -- Data stored as Parquet files in `.data/raw/` directory -- Error tracking in `logging/extract_error/` with automatic retry mechanism that logs failed ranges to CSV -- Supports K/M/B suffixes for block numbers (e.g., '18.5M' = 18,500,000) -- Additional extraction capabilities: `extract_graphql.py` for GraphQL-based extraction - -### 2. Load Layer (`scripts/el/`) -- **load.py**: Unified loader script supporting both PostgreSQL and Snowflake -- Uses `onchaindata.data_pipeline.Loader` class with pluggable database clients -- Takes arguments: `-f` (file path), `-c` (client: postgres/snowflake), `-s` (schema), `-t` (table), `-w` (write disposition: append/replace/merge) -- Database clients in `src/onchaindata/utils/`: `PostgresClient`, `SnowflakeClient` - -### 3. Transform Layer (dbt) -- **Location**: `dbt_project/` -- Standard dbt project structure with models organized by layer: - - `models/01_staging/`: Raw data cleanup (e.g., `stg_logs_decoded.sql`) - - `models/intermediate/`: Business logic transformations - - `models/marts/`: Final analytics tables -- Materialization strategy: - - staging: `view` - - intermediate: `ephemeral` - - marts: `table` -- Shared macros in `dbt_project/macros/ethereum_macros.sql`: - - `uint256_to_address`: Extracts Ethereum addresses from uint256 hex strings - - `uint256_to_numeric`: Converts uint256 hex to numeric values -- Sources defined in `models/01_staging/sources.yml` (references `raw` schema) +**Pipeline**: `HyperSync GraphQL → Parquet Files → PostgreSQL/Snowflake → dbt → Analytics Tables` + +The project follows an ELT (Extract, Load, Transform) pattern with three main components: + +### 1. Load Layer (`scripts/load_file.py`) +Unified loader script supporting both PostgreSQL and Snowflake with pluggable database clients. 
+ +**Key Features:** +- Supports Parquet and CSV files +- Three write modes: `append`, `replace`, `merge` (upsert) +- Uses DLT (data load tool) for efficient loading +- Database clients in `scripts/utils/database_client.py`: `PostgresClient`, `SnowflakeClient` + +**Arguments:** +- `-f`: File path (Parquet or CSV) +- `-c`: Client (`postgres` or `snowflake`) +- `-s`: Schema name +- `-t`: Table name +- `-w`: Write disposition (`append`/`replace`/`merge`) +- `-k`: Primary key columns for merge (comma-separated, e.g., `contract_address,chain`) +- `--stage`: Upload to Snowflake stage instead of loading (Snowflake only) +- `--stage_name`: Snowflake stage name when using `--stage` + +### 2. Transform Layer (dbt) +**Location**: `dbt_project/` + +Standard dbt project with three-tier modeling: +- `models/01_staging/`: Raw data cleanup (views, schema: `staging`) +- `models/02_intermediate/`: Business logic (ephemeral, not materialized) +- `models/03_mart/`: Final analytics tables (tables, schema: `mart`) + +**Key Features:** +- SCD Type 2 support via snapshots (`snapshots/snap_stablecoin.sql`) +- Custom macros in `macros/` (e.g., Ethereum address parsing) +- Sources defined in `01_staging/sources.yml` (references `raw` schema) - Configuration: [dbt_project.yml](dbt_project/dbt_project.yml), [profiles.yml](dbt_project/profiles.yml) -### 4. Package Structure (`src/onchaindata/`) -Reusable Python package with modules: -- `data_extraction/`: - - `etherscan.py`: EtherscanClient with rate limiting - - `graphql.py`: GraphQL-based extraction - - `rate_limiter.py`: Rate limiting utilities - - `base.py`: Base classes for API clients -- `data_pipeline/`: - - `loaders.py`: Loader class for database operations -- `utils/`: - - `postgres_client.py`: PostgreSQL client with connection pooling - - `snowflake_client.py`: Snowflake client - - `chain.py`: Chain-related utilities - - `base_client.py`: Base database client interface -- `config/`: Configuration files (chainid.json) +### 3. 
Database Clients (`scripts/utils/database_client.py`) +Pluggable database client architecture with base class pattern: + +- **BaseDatabaseClient**: Abstract base class defining common interface +- **PostgresClient**: PostgreSQL client using `psycopg` with DLT integration +- **SnowflakeClient**: Snowflake client with key-pair authentication and DLT integration + +Both clients support: +- `from_env()`: Create client from environment variables +- `get_connection()`: Context manager for database connections +- `get_dlt_destination()`: DLT destination configuration ## Development Commands ### Environment Setup ```bash -# Create Docker network (first time only) -docker network create fa-dae2-capstone_kafka_network - # Start PostgreSQL container docker-compose up -d -# Install dependencies using uv +# Install dependencies uv sync -# Set up environment variables +# Configure environment variables cp .env.example .env -export $(cat .env | xargs) - -# Initialize database schema (if needed) -./scripts/sql_pg.sh ./scripts/sql/init.sql -``` - -### Data Extraction -```bash -# Extract logs and transactions for a specific contract address -# Supports K/M/B suffixes for block numbers (e.g., '18.5M') -uv run python scripts/el/extract_etherscan.py \ - -c ethereum \ - -a 0x02950460e2b9529d0e00284a5fa2d7bdf3fa4d72 \ - --logs --transactions \ - --from_block 18.5M --to_block 20M \ - -v # verbose logging - -# Extract data from last N days -uv run python scripts/el/extract_etherscan.py \ - -a 0x02950460e2b9529d0e00284a5fa2d7bdf3fa4d72 \ - --logs --transactions \ - --last_n_days 7 - -# Logging levels: no flag (WARNING), -v (INFO), -vv (DEBUG) +# Edit .env with your credentials ``` ### Data Loading ```bash # Load Parquet file to PostgreSQL (append mode) -uv run python scripts/el/load.py \ - -f .data/raw/ethereum_0xaddress_logs_18500000_20000000.parquet \ +uv run python scripts/load_file.py \ + -f .data/raw/data_12345678.parquet \ -c postgres \ -s raw \ - -t logs \ + -t raw_transfer \ -w append # Load CSV with full replacement -uv run python scripts/el/load.py \ +uv run python scripts/load_file.py \ -f .data/raw/stablecoins.csv \ -c postgres \ -s raw \ -t raw_stablecoin \ -w replace -# Load CSV with merge/upsert (only updated rows needed) -uv run python scripts/el/load.py \ +# Load CSV with merge/upsert (only updated rows) +uv run python scripts/load_file.py \ -f .data/raw/stablecoins_updates.csv \ -c postgres \ -s raw \ @@ -127,155 +100,134 @@ uv run python scripts/el/load.py \ -k contract_address,chain # Load to Snowflake (requires SNOWFLAKE_* env vars) -uv run python scripts/el/load.py \ - -f .data/raw/ethereum_0xaddress_logs_18500000_20000000.parquet \ +uv run python scripts/load_file.py \ + -f .data/raw/data_12345678.parquet \ -c snowflake \ -s raw \ - -t logs \ + -t raw_transfer \ -w append + +# Upload file to Snowflake stage (without loading to table) +uv run python scripts/load_file.py \ + -f .data/raw/data_12345678.parquet \ + -c snowflake \ + -s raw \ + --stage \ + --stage_name my_stage ``` ### dbt Operations ```bash -# Run dbt models -./scripts/dbt.sh run +# Navigate to dbt project directory +cd dbt_project + +# Run all models +dbt run # Run specific model -./scripts/dbt.sh run --select stg_logs_decoded +dbt run --select stg_transfer + +# Run models by folder +dbt run --select 01_staging.* +dbt run --select 03_mart.* # Run tests -./scripts/dbt.sh test +dbt test -# Other dbt commands -./scripts/dbt.sh compile # Compile models -./scripts/dbt.sh docs generate # Generate documentation -./scripts/dbt.sh run 
--select staging.* # Run all staging models -./scripts/dbt.sh deps # Install dbt packages -``` +# Create/update snapshots (SCD Type 2) +dbt snapshot -### SQL Operations -```bash -# Run SQL scripts directly against PostgreSQL -./scripts/sql_pg.sh ./scripts/sql/init.sql +# Install dbt packages +dbt deps + +# Compile models (check SQL without running) +dbt compile -# Ad-hoc queries -./scripts/sql_pg.sh ./scripts/sql/ad_hoc.sql +# Generate and serve documentation +dbt docs generate +dbt docs serve ``` ## Environment Variables -Required variables (see `.env.example`): +Required variables (see [.env.example](.env.example)): - `POSTGRES_HOST`, `POSTGRES_PORT`, `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASSWORD` -- `DB_SCHEMA`: Default schema for operations (e.g., `fa02_staging`) +- `DB_SCHEMA`: Default schema for operations - `KAFKA_NETWORK_NAME`: Docker network name -- `ETHERSCAN_API_KEY`: For Etherscan API access Optional (for Snowflake): - `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_ROLE`, `SNOWFLAKE_WAREHOUSE` -- `SNOWFLAKE_DATABASE`, `SNOWFLAKE_SCHEMA`, `SNOWFLAKE_PRIVATE_KEY_FILE_PATH` +- `SNOWFLAKE_DATABASE`, `SNOWFLAKE_SCHEMA` +- `SNOWFLAKE_PRIVATE_KEY_FILE`: Path to private key file (.p8 or .pem) for local development ## Key Data Flows -1. **Etherscan → Parquet**: `extract_etherscan.py` extracts blockchain data to `.data/raw/*.parquet` -2. **Parquet → PostgreSQL/Snowflake**: `load.py` loads into `raw` schema tables -3. **PostgreSQL → dbt**: dbt models transform `raw.logs` → `staging.stg_logs_decoded` -4. Failed extractions are logged to `logging/extract_error/` and automatically retried with smaller chunk sizes (10x reduction) +1. **HyperSync GraphQL → Parquet**: Extract blockchain data using external indexer to `.data/raw/*.parquet` +2. **Parquet → PostgreSQL/Snowflake**: `load_file.py` loads into `raw` schema tables +3. **Raw → dbt**: dbt models transform raw data through staging → marts -## Stablecoin Reference Data Management +## SCD Type 2: Stablecoin Metadata Tracking -The project uses **SCD Type 2** (Slowly Changing Dimension) to track stablecoin metadata changes over time using dbt snapshots. +The project uses **Slowly Changing Dimension Type 2** via dbt snapshots to maintain historical stablecoin metadata. ### Architecture ``` -CSV File → PostgreSQL (raw.raw_stablecoin) → dbt Snapshot (snapshots.snap_stablecoin) → Dimension (mart.dim_stablecoin) +CSV → raw.raw_stablecoin → dbt snapshot → mart.dim_stablecoin (with history) ``` -### Initial Setup (First Time) +### Initial Setup ```bash -# 1. Load initial stablecoin data (all stablecoins) -uv run python scripts/el/load.py \ +# 1. Load stablecoin metadata +uv run python scripts/load_file.py \ -f .data/raw/stablecoins.csv \ - -c postgres \ - -s raw \ - -t raw_stablecoin \ - -w replace + -c postgres -s raw -t raw_stablecoin -w replace -# 2. Create initial snapshot (establishes baseline) -./scripts/dbt.sh snapshot +# 2. Create baseline snapshot (in dbt_project directory) +cd dbt_project && dbt snapshot -# 3. Build dimension table -./scripts/dbt.sh run --select dim_stablecoin +# 3. Build dimension +dbt run --select dim_stablecoin ``` -### Updating Stablecoin Metadata (Incremental Updates) - -**When to update**: When stablecoin attributes change (name, symbol, backing_type, etc.) or when adding new stablecoins. +### Updating Metadata -**Option A: Merge/Upsert (Recommended - Only Updated Rows)** +**Option A: Merge (Only Changed Rows)** ```bash -# 1. 
Create CSV with ONLY changed/new stablecoins -# Example: .data/raw/stablecoins_updates.csv -# contract_address,chain,symbol,name,currency,backing_type,decimals -# 0xa0b8...,ethereum,USDC,USD Coin Updated,usd,fiat-backed,6 -# 0x1234...,polygon,NEWCOIN,New Stablecoin,usd,crypto-backed,18 - -# 2. Merge updates into raw table -uv run python scripts/el/load.py \ - -f .data/raw/stablecoins_updates.csv \ - -c postgres \ - -s raw \ - -t raw_stablecoin \ - -w merge \ - -k contract_address,chain - -# 3. Run snapshot to detect and record changes (SCD2) -./scripts/dbt.sh snapshot - -# 4. Refresh dimension table -./scripts/dbt.sh run --select dim_stablecoin +# Load only changed/new records +uv run python scripts/load_file.py \ + -f .data/raw/updates.csv \ + -c postgres -s raw -t raw_stablecoin \ + -w merge -k contract_address,chain + +cd dbt_project +dbt snapshot # Detect changes +dbt run --select dim_stablecoin # Refresh dimension ``` -**Option B: Full Replacement (All Stablecoins)** +**Option B: Replace (All Rows)** ```bash -# 1. Provide CSV with ALL stablecoins (not just updates) -uv run python scripts/el/load.py \ +# Load complete dataset +uv run python scripts/load_file.py \ -f .data/raw/stablecoins_full.csv \ - -c postgres \ - -s raw \ - -t raw_stablecoin \ - -w replace + -c postgres -s raw -t raw_stablecoin -w replace -# 2-4. Same as Option A -./scripts/dbt.sh snapshot -./scripts/dbt.sh run --select dim_stablecoin +cd dbt_project +dbt snapshot +dbt run --select dim_stablecoin ``` -### CSV Format -```csv -contract_address,chain,symbol,name,currency,backing_type,decimals -0x02950460e2b9529d0e00284a5fa2d7bdf3fa4d72,ethereum,CURVE,Curve Finance USD,usd,crypto-backed,18 -0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,ethereum,USDC,USD Coin,usd,fiat-backed,6 -``` - -### How SCD2 Works -- **First snapshot run**: All records get `valid_from = now()`, `valid_to = NULL`, `is_current = true` -- **When changes detected**: - - Old version: `valid_to` is set to current timestamp, `is_current = false` - - New version: New row with `valid_from = now()`, `valid_to = NULL`, `is_current = true` -- **Result**: Full history of all changes preserved in `mart.dim_stablecoin` - -### Querying SCD2 Dimension +### Querying Historical Data ```sql --- Get current stablecoin metadata +-- Current records only SELECT * FROM mart.dim_stablecoin WHERE is_current = true; --- Get stablecoin metadata at a specific point in time +-- Point-in-time snapshot SELECT * FROM mart.dim_stablecoin WHERE '2024-01-15'::timestamp BETWEEN valid_from AND COALESCE(valid_to, '9999-12-31'); --- See all historical changes for a specific stablecoin +-- Full history for one stablecoin SELECT * FROM mart.dim_stablecoin -WHERE contract_address = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' +WHERE contract_address = '0xa0b8...' 
ORDER BY valid_from DESC; ``` @@ -283,38 +235,32 @@ ORDER BY valid_from DESC; ``` dbt_project/ -├── dbt_project.yml # Configuration (project: stablecoins) -├── profiles.yml # Database connections (dev=postgres, test/prod=snowflake) +├── dbt_project.yml # Project config (name: stables_analytics) +├── profiles.yml # Connections (dev=postgres, test/prod=snowflake) ├── models/ -│ ├── 01_staging/ # Raw data cleanup (materialized as views) +│ ├── 01_staging/ # Views in 'staging' schema │ │ ├── sources.yml # Source definitions (raw schema) -│ │ ├── models.yml # Model documentation -│ │ └── stg_logs_decoded.sql -│ ├── intermediate/ # Business logic (ephemeral) -│ └── marts/ # Final analytics (tables) +│ │ └── models.yml # Model documentation +│ ├── 02_intermediate/ # Ephemeral models (not materialized) +│ └── 03_mart/ # Tables in 'mart' schema +├── snapshots/ # SCD Type 2 snapshots +├── macros/ # Custom SQL macros ├── tests/ # Data quality tests -├── macros/ # ethereum_macros.sql (uint256_to_address, uint256_to_numeric) -└── packages.yml # dbt dependencies +└── packages.yml # dbt package dependencies ``` -### Model Naming Conventions -- **Staging models**: `stg__.sql` (e.g., `stg_logs_decoded.sql`) -- **Intermediate models**: `int__.sql` (e.g., `int_logs_filtered.sql`) -- **Fact tables**: `fct_.sql` (e.g., `fct_transfers.sql`) -- **Dimension tables**: `dim_.sql` (e.g., `dim_contracts.sql`) - -## Database Schema - -- **raw.logs**: Raw log data with columns: address, topics (JSONB), data, block_number, transaction_hash, time_stamp, etc. -- **raw.transactions**: Transaction data (structure similar to logs) -- **staging.stg_logs_decoded**: Decoded logs with parsed topics (topic0-topic3), indexed on (contract_address, transaction_hash, index) -- dbt creates additional staging/intermediate/mart tables based on models in `dbt_project/models/` - -## Project Structure Notes - -- Runnable scripts are ONLY in `scripts/` directory (organized as `scripts/el/` for extract/load) -- Reusable code is packaged in `src/onchaindata/` as an installable package -- dbt project located at `dbt_project/` with standard structure (staging → intermediate → marts) -- Data files: `.data/raw/` for extracted data, `sampledata/` for examples -- Always run Python scripts with `uv run python` (not direct python) -- Project uses `uv` for dependency management (see `pyproject.toml`) +### Naming Conventions +- Staging: `stg_` (e.g., `stg_transfer`) +- Intermediate: `int__` (e.g., `int_transfer_filtered`) +- Facts: `fct_` (e.g., `fct_daily_volume`) +- Dimensions: `dim_` (e.g., `dim_stablecoin`) + +## Project Structure + +- **scripts/**: Runnable Python scripts + - `load_file.py`: Data loader + - `utils/database_client.py`: Database client abstractions +- **dbt_project/**: dbt transformation layer +- **.data/raw/**: Data files (Parquet, CSV) +- **docs/**: MkDocs documentation +- Always run Python with: `uv run python script.py` diff --git a/dbt_project/.sqlfluff b/dbt_project/.sqlfluff index 2fe13cf..c733771 100644 --- a/dbt_project/.sqlfluff +++ b/dbt_project/.sqlfluff @@ -1,5 +1,5 @@ [sqlfluff] -# Database dialect - Snowflake +# Database dialect (Snowflake, BigQuery, PostgreSQL, etc.) 
dialect = snowflake # Templater for dbt projects @@ -24,16 +24,16 @@ line_position = trailing # Capitalization rules [sqlfluff:rules:capitalisation.keywords] -capitalisation_policy = upper +capitalisation_policy = lower [sqlfluff:rules:capitalisation.identifiers] extended_capitalisation_policy = lower [sqlfluff:rules:capitalisation.functions] -capitalisation_policy = upper +capitalisation_policy = lower [sqlfluff:rules:capitalisation.literals] -capitalisation_policy = consistent +capitalisation_policy = lower # Aliasing rules [sqlfluff:rules:aliasing.table] @@ -51,13 +51,15 @@ group_by_and_order_by_style = implicit # dbt templater configuration [sqlfluff:templater:dbt] -project_dir = . -profiles_dir = . -profile = stables_analytics -target = test -# Skip dbt compilation and use Jinja templater instead -skip_compilation = True +# project_dir = ./dbt_sf +# profiles_dir = ./dbt_sf +# profile = dbt_sf +target = ci +dbt_skip_compilation_error = True +dbt_skip_connection_error = True # Enable dbt builtins for Jinja templater [sqlfluff:templater:jinja] apply_dbt_builtins = True + +library_path = macros/ \ No newline at end of file diff --git a/dbt_project/.sqlfluffignore b/dbt_project/.sqlfluffignore index b84e7f1..d02d459 100644 --- a/dbt_project/.sqlfluffignore +++ b/dbt_project/.sqlfluffignore @@ -1,19 +1,12 @@ # dbt specific directories -dbt_project/target/ -dbt_project/dbt_packages/ -dbt_project/logs/ +target/ +dbt_packages/ +macros/ -# Non-dbt SQL files (if any) -scripts/sql/ad_hoc.sql - -# Python environments +# Non-dbt SQL files +scripts/ .venv/ -venv/ # Generated files *.pyc -__pycache__/ - -# Data directories -.data/ -sampledata/ +__pycache__/ \ No newline at end of file diff --git a/dbt_project/models/01_staging/models.yml b/dbt_project/models/01_staging/models.yml index 3307516..5a0aa45 100644 --- a/dbt_project/models/01_staging/models.yml +++ b/dbt_project/models/01_staging/models.yml @@ -8,7 +8,7 @@ models: description: "Unique transfer ID (transaction_hash_logindex)" tests: - not_null - - unique + # - unique - name: block_number description: "Block number (cast to bigint)" tests: diff --git a/dbt_project/models/01_staging/stg_stablecoin.sql b/dbt_project/models/01_staging/stg_stablecoin.sql index 0114302..3c185ca 100644 --- a/dbt_project/models/01_staging/stg_stablecoin.sql +++ b/dbt_project/models/01_staging/stg_stablecoin.sql @@ -1,17 +1,17 @@ -WITH source AS ( - SELECT * FROM {{ source('raw_data', 'raw_stablecoin') }} +with source as ( + select * from {{ source('raw_data', 'raw_stablecoin') }} ), -casted AS ( - SELECT - contract_address::VARCHAR(42) AS contract_address, - chain::VARCHAR(20) AS chain, - symbol::VARCHAR(20) AS symbol, - name::VARCHAR(100) AS name, - currency::VARCHAR(10) AS currency, - backing_type::VARCHAR(20) AS backing_type, - decimals::INTEGER AS decimals - FROM source +casted as ( + select + contract_address::VARCHAR(42) as contract_address, + chain::VARCHAR(20) as chain, + symbol::VARCHAR(20) as symbol, + name::VARCHAR(100) as name, + currency::VARCHAR(10) as currency, + backing_type::VARCHAR(20) as backing_type, + decimals::INTEGER as decimals + from source ) -SELECT * FROM casted +select * from casted diff --git a/dbt_project/models/01_staging/stg_transfer.sql b/dbt_project/models/01_staging/stg_transfer.sql index 9edb2cb..29aa457 100644 --- a/dbt_project/models/01_staging/stg_transfer.sql +++ b/dbt_project/models/01_staging/stg_transfer.sql @@ -1,28 +1,28 @@ -WITH source AS ( - SELECT * FROM {{ source('raw_data', 'raw_transfer') }} +with source as ( + 
select * from {{ source('raw_data', 'raw_transfer') }} ), -casted AS ( - SELECT +casted as ( + select id, - block_number::BIGINT AS block_number, + block_number::BIGINT as block_number, {% if target.type == 'postgres' %} - TO_TIMESTAMP(timestamp::BIGINT) AT TIME ZONE 'UTC' AS block_timestamp, + TO_TIMESTAMP(timestamp::BIGINT) at time zone 'UTC' as block_timestamp, {% else %} - TO_TIMESTAMP(timestamp::BIGINT) AS block_timestamp, + TO_TIMESTAMP(timestamp::BIGINT) as block_timestamp, {% endif %} - contract_address::VARCHAR(42) AS contract_address, + contract_address::VARCHAR(42) as contract_address, {% if target.type == 'postgres' %} - "from"::VARCHAR(42) AS from_address, - "to"::VARCHAR(42) AS to_address, + "from"::VARCHAR(42) as from_address, + "to"::VARCHAR(42) as to_address, {% else %} - "FROM"::VARCHAR(42) AS from_address, - "TO"::VARCHAR(42) AS to_address, + "FROM"::VARCHAR(42) as from_address, + "TO"::VARCHAR(42) as to_address, {% endif %} - value::NUMERIC(38, 0) AS amount_raw, + value::NUMERIC(38, 0) as amount_raw, _dlt_load_id, _dlt_id - FROM source + from source ) -SELECT * FROM casted +select * from casted diff --git a/dbt_project/models/03_mart/dim_stablecoin.sql b/dbt_project/models/03_mart/dim_stablecoin.sql index 1dabbf3..aac4373 100644 --- a/dbt_project/models/03_mart/dim_stablecoin.sql +++ b/dbt_project/models/03_mart/dim_stablecoin.sql @@ -1,12 +1,12 @@ -- SCD Type 2 dimension built on dbt snapshot -- This model adds custom SCD2 column names while leveraging dbt's snapshot functionality -WITH snap_stablecoin AS ( - SELECT * FROM {{ ref('snap_stablecoin') }} +with snap_stablecoin as ( + select * from {{ ref('snap_stablecoin') }} ), -final AS ( - SELECT +final as ( + select -- Business keys contract_address, chain, @@ -19,12 +19,12 @@ final AS ( decimals, -- SCD Type 2 columns (mapped from dbt snapshot columns) - dbt_valid_from AS valid_from, - dbt_valid_to AS valid_to, - dbt_updated_at AS created_at, - COALESCE(dbt_valid_to IS NULL, FALSE) AS is_current + dbt_valid_from as valid_from, + dbt_valid_to as valid_to, + dbt_updated_at as created_at, + COALESCE(dbt_valid_to is null, false) as is_current - FROM snap_stablecoin + from snap_stablecoin ) -SELECT * FROM final +select * from final diff --git a/dbt_project/models/03_mart/fct_transfer.sql b/dbt_project/models/03_mart/fct_transfer.sql index 0062169..f54e5a3 100644 --- a/dbt_project/models/03_mart/fct_transfer.sql +++ b/dbt_project/models/03_mart/fct_transfer.sql @@ -8,24 +8,24 @@ }} -WITH stg_transfer AS ( - SELECT * FROM {{ ref('stg_transfer') }} +with stg_transfer as ( + select * from {{ ref('stg_transfer') }} {% if is_incremental() %} -- Only process new blocks since last run - WHERE block_number >= (SELECT COALESCE(MAX(block_number), 0) AS max_block FROM {{ this }}) + where block_number >= (select COALESCE(MAX(block_number), 0) as max_block from {{ this }}) {% endif %} ), -dim_stablecoin AS ( - SELECT * FROM {{ ref('dim_stablecoin') }} - WHERE is_current = true -- Only use current stablecoin metadata +dim_stablecoin as ( + select * from {{ ref('dim_stablecoin') }} + where is_current = true -- Only use current stablecoin metadata ), -parsed AS ( - SELECT +parsed as ( + select -- Parse natural key from id (format: "0xtxhash_logindex") - SPLIT_PART(id, '_', 2)::INTEGER AS log_index, - TO_CHAR(block_timestamp, 'YYYYMMDD')::INTEGER AS date_key, + SPLIT_PART(id, '_', 2)::INTEGER as log_index, + TO_CHAR(block_timestamp, 'YYYYMMDD')::INTEGER as date_key, -- Time dimension block_number, @@ -33,19 +33,19 @@ parsed AS ( 
contract_address, -- Contract/token dimension - 'ethereum' AS chain, + 'ethereum' as chain, from_address, -- TODO: get chain from raw data when available -- Address dimensions to_address, - SPLIT_PART(id, '_', 1) AS transaction_hash + SPLIT_PART(id, '_', 1) as transaction_hash - FROM stg_transfer + from stg_transfer ), -enriched AS ( - SELECT +enriched as ( + select -- Keys p.transaction_hash, p.log_index, @@ -66,32 +66,32 @@ enriched AS ( -- Join stablecoin metadata d.symbol, d.name, - COALESCE(d.decimals, 18) AS decimals, + COALESCE(d.decimals, 18) as decimals, -- Determine transaction type - CASE - WHEN p.from_address = '0x0000000000000000000000000000000000000000' THEN 'mint' - WHEN p.to_address = '0x0000000000000000000000000000000000000000' THEN 'burn' - ELSE 'transfer' - END AS transaction_type, + case + when p.from_address = '0x0000000000000000000000000000000000000000' then 'mint' + when p.to_address = '0x0000000000000000000000000000000000000000' then 'burn' + else 'transfer' + end as transaction_type, -- Convert to decimal amount using actual decimals from dim_stablecoin -- For stablecoins, amount ≈ USD value. TODO: have dim_price table - {{ convert_token_amount('s.amount_raw', 'COALESCE(d.decimals, 18)', 2) }} AS amount + {{ convert_token_amount('s.amount_raw', 'COALESCE(d.decimals, 18)', 2) }} as amount - FROM parsed AS p - LEFT JOIN stg_transfer AS s - ON + from parsed as p + left join stg_transfer as s + on p.transaction_hash = SPLIT_PART(s.id, '_', 1) - AND p.log_index = SPLIT_PART(s.id, '_', 2)::INTEGER - LEFT JOIN dim_stablecoin AS d - ON + and p.log_index = SPLIT_PART(s.id, '_', 2)::INTEGER + left join dim_stablecoin as d + on LOWER(p.contract_address) = LOWER(d.contract_address) - AND p.chain = d.chain + and p.chain = d.chain ) -SELECT +select transaction_hash, log_index, date_key, @@ -108,5 +108,5 @@ SELECT amount, -- Audit column to track incremental runs - CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()) AS dbt_loaded_at -FROM enriched + CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()) as dbt_loaded_at +from enriched diff --git a/dbt_project/models/03_mart/models.yml b/dbt_project/models/03_mart/models.yml index 10c7ec2..e71be6a 100644 --- a/dbt_project/models/03_mart/models.yml +++ b/dbt_project/models/03_mart/models.yml @@ -32,12 +32,6 @@ models: description: "Timestamp when record was created" - name: updated_at description: "Timestamp when record was last updated" - # tests: - # - dbt_utils.unique_combination_of_columns: - # combination_of_columns: - # - contract_address - # - chain - # where: "is_current = true" - name: fct_transfer description: "Fact table for stablecoin transfer events with business enrichment" @@ -87,9 +81,9 @@ models: - accepted_values: arguments: values: ["transfer", "mint", "burn"] - tests: - - dbt_utils.unique_combination_of_columns: - arguments: - combination_of_columns: - - transaction_hash - - log_index + # tests: + # - dbt_utils.unique_combination_of_columns: + # arguments: + # combination_of_columns: + # - transaction_hash + # - log_index diff --git a/dbt_project/profiles.yml b/dbt_project/profiles.yml index 9da4186..f836a6d 100644 --- a/dbt_project/profiles.yml +++ b/dbt_project/profiles.yml @@ -4,16 +4,24 @@ stables_analytics: test: type: postgres host: "{{ env_var('POSTGRES_HOST') }}" - port: 5432 + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" user: "{{ env_var('POSTGRES_USER') }}" password: "{{ env_var('POSTGRES_PASSWORD') }}" - dbname: "{{ env_var('POSTGRES_DB') }}" - schema: raw - threads: 4 - 
keepalives_idle: 0 - connect_timeout: 10 - sslmode: disable + schema: "{{ env_var('POSTGRES_SCHEMA') }}" dev: + type: snowflake + account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}" + user: "{{ env_var('SNOWFLAKE_USER') }}" + private_key_path: "{{ env_var('SNOWFLAKE_PRIVATE_KEY_PATH') }}" + role: "{{ env_var('SNOWFLAKE_ROLE') }}" + database: "{{ env_var('SNOWFLAKE_DATABASE') }}" + warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}" + schema: "{{ env_var('SNOWFLAKE_SCHEMA') }}" + threads: 4 + client_session_keep_alive: False + autocommit: True + ci: type: snowflake account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}" user: "{{ env_var('SNOWFLAKE_USER') }}" @@ -25,12 +33,11 @@ stables_analytics: threads: 4 client_session_keep_alive: False autocommit: True - prod: type: snowflake account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}" user: "{{ env_var('SNOWFLAKE_USER') }}" - private_key: "{{ env_var('SNOWFLAKE_PRIVATE_KEY') }}" + private_key_path: "{{ env_var('SNOWFLAKE_PRIVATE_KEY_PATH') }}" role: "{{ env_var('SNOWFLAKE_ROLE') }}" database: "{{ env_var('SNOWFLAKE_DATABASE') }}" warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}" diff --git a/docs/01_data_pipeline/source.md b/docs/01_data_pipeline/source.md index 26289dd..bb623c5 100644 --- a/docs/01_data_pipeline/source.md +++ b/docs/01_data_pipeline/source.md @@ -7,72 +7,4 @@ More details: [Envio](https://docs.envio.dev/docs/HyperIndex/overview) ```bash git clone https://github.com/newgnart/envio-stablecoins.git pnpm dev -``` - -When the indexer is running, you have few options: - -- **Extract/save to parquet files**, this will save the data to `.data/raw/transfer_{start_block}_{end_block}.parquet` - -```bash -uv run scripts/el/extract_graphql.py \ ---query-file scripts/el/stables_transfers.graphql \ --f transfer \ ---from_block 23650000 ---to_block 23660000 --v -``` - -- **Stream/load directly from the indexer to the postgres** -```bash -uv run scripts/el/stream_graphql.py \ --e http://localhost:8080/v1/graphql \ ---fields id,blockNumber,timestamp,contractAddress,from,to,value \ ---graphql-table stablesTransfers \ --c postgres \ --s raw \ --t raw_transfer -``` - -- **Real-time streaming via Kafka** (recommended for production) - - Kafka enables decoupled, scalable real-time data streaming with features like replay, backpressure handling, and multi-consumer support. - - ```bash - # Terminal 1: Start Kafka producer (polls GraphQL every 5 seconds) - uv run python scripts/kafka/produce_from_graphql.py \ - --endpoint http://localhost:8080/v1/graphql \ - --kafka-topic stablecoin-transfers \ - --poll-interval 5 \ - -v - - # Terminal 2: Start PostgreSQL consumer (batch writes) - uv run python scripts/kafka/consume_to_postgres.py \ - --kafka-topic stablecoin-transfers \ - --schema raw \ - --table transfers_kafka \ - --batch-size 100 \ - -v - - # Terminal 3: Start alert monitor (optional - detects large transfers) - uv run python scripts/kafka/monitor_alerts.py \ - --large-transfer 1000000 \ - --critical-transfer 10000000 \ - -v - ``` - - Benefits: - - **Decoupling**: Separate data ingestion from processing - - **Scalability**: Multiple consumers can process the same stream - - **Replay**: Reprocess historical events from Kafka logs (7-day retention) - - **Real-time alerting**: Detect large transfers within seconds - - See [scripts/kafka/README.md](../../scripts/kafka/README.md) for detailed configuration and monitoring. 
- -- **Move data from postgres to snowflake** -```bash -uv run python scripts/el/pg2sf_raw_transfer.py \ ---from_block 23650000 \ ---to_block 23660000 \ --v -``` - +``` \ No newline at end of file diff --git a/docs/04_api_reference/041_data_sources.md b/docs/04_api_reference/041_data_sources.md deleted file mode 100644 index 0db9461..0000000 --- a/docs/04_api_reference/041_data_sources.md +++ /dev/null @@ -1,39 +0,0 @@ -## GraphQL -Utilities for getting data from Envio GraphQL API endpoint - -### `GraphQLBatch` - -For extracting data from a GraphQL endpoint and save to Parquet file. - -**Parameters:** - -- `endpoint` (str): GraphQL endpoint URL -- `query` (str): GraphQL query string - -**Methods:** - -- `extract()`: Execute GraphQL query and return results as dictionary -- `extract_to_dataframe()`: Execute GraphQL query and return results as Polars DataFrame - - -### `GraphQLStream` - -For streaming data from a GraphQL endpoint and push to database directly. - -**Parameters:** - -- `endpoint` (str): GraphQL endpoint URL -- `table_name` (str): Name of the table (GraphQL table) to fetch -- `fields` (list): List of fields to fetch -- `poll_interval` (int): Polling interval in seconds - -**Methods:** - -- `stream()`: Stream data from GraphQL endpoint and push to database directly - - Arguments: - - `loader` (Loader): Loader instance for database operations - - `schema` (str): Target schema name - - `table_name` (str): Target table name - -## Etherscan -Utilities for getting data from Etherscan API. diff --git a/docs/04_api_reference/042_data_loading.md b/docs/04_api_reference/042_data_loading.md deleted file mode 100644 index 39ae588..0000000 --- a/docs/04_api_reference/042_data_loading.md +++ /dev/null @@ -1,95 +0,0 @@ -## Database Clients - -### `BaseDatabaseClient` - -Abstract base class for database clients with common patterns. - -**Location:** `onchaindata.utils.base_client` - -**Methods:** -- `get_dlt_destination()`: Get DLT destination for this database -- `get_connection()`: Get a database connection - - -### `PostgresClient` and `SnowflakeClient` - -PostgreSQL and Snowflake database clients with connection pooling. - -**Location:** `onchaindata.utils.postgres_client` and `onchaindata.utils.snowflake_client` - -**Constructor:** -Both can be constructed with classmethod `from_env` - -```python -client = PostgresClient.from_env() -``` - -**Methods:** - - - - - - -## Loader -*A wrapper class inheriting from Database Client and dlt.pipeline* - -**Location:** `onchaindata.data_pipeline.loaders` - -**Constructor:** -```python -from onchaindata.data_pipeline import Loader -from onchaindata.utils import PostgresClient - -client = PostgresClient.from_env() -loader = Loader(client=client) -``` - -**Parameters:** - -- `client` (PostgresClient | SnowflakeClient): Database client instance - -**Methods:** - -#### `load_parquet()` -Load Parquet file to database using DLT. - -```python -loader.load_parquet( - file_path=".data/raw/data.parquet", - schema="raw", - table_name="stables_transfers", - write_disposition="append" # or "replace", "merge" -) -``` - -**Parameters:** - -- `file_path` (str | Path): Path to the Parquet file -- `schema` (str): Target schema name -- `table_name` (str): Target table name -- `write_disposition` (str): How to handle existing data - - `"append"`: Add new records (default) - - `"replace"`: Drop and recreate table - - `"merge"`: Update existing records - -#### `load_dataframe()` -Load Polars DataFrame directly to database. 
- -```python -import polars as pl - -df = pl.read_parquet(".data/raw/data.parquet") -loader.load_dataframe( - df=df, - schema="raw", - table_name="stables_transfers", - write_disposition="append" -) -``` - -**Special Handling:** - -- For `logs` table, automatically sets `topics` column as JSON type - ---- diff --git a/docs/04_api_reference/043_configuration.md b/docs/04_api_reference/043_configuration.md deleted file mode 100644 index edd0064..0000000 --- a/docs/04_api_reference/043_configuration.md +++ /dev/null @@ -1,123 +0,0 @@ -# Configuration Guide - -This guide covers all configuration options for the project. - -## Environment Variables - -The project uses environment variables for configuration. Copy `.env.example` to `.env` and configure: - -```bash -cp .env.example .env -``` - -### Required Variables - -#### PostgreSQL Configuration -```bash -POSTGRES_HOST=localhost -POSTGRES_PORT=5432 -POSTGRES_DB=postgres -POSTGRES_USER=postgres -POSTGRES_PASSWORD=postgres -DB_SCHEMA=fa02_staging # Default schema for operations -``` - -#### Docker Network -```bash -KAFKA_NETWORK_NAME=fa-dae2-capstone_kafka_network -``` - -### Optional Variables - -#### Snowflake Configuration -For Snowflake data warehouse support: - -```bash -SNOWFLAKE_ACCOUNT=your_account -SNOWFLAKE_USER=your_user -SNOWFLAKE_ROLE=your_role -SNOWFLAKE_WAREHOUSE=your_warehouse -SNOWFLAKE_DATABASE=your_database -SNOWFLAKE_SCHEMA=your_schema -SNOWFLAKE_PRIVATE_KEY_FILE_PATH=/path/to/private_key.p8 -``` - -#### API Keys -For alternative data extraction methods: - -```bash -ETHERSCAN_API_KEY=your_etherscan_api_key # Optional, for Etherscan API extraction -``` - ---- - -## Database Setup - -### PostgreSQL with Docker - -1. Create Docker network (first time only): -```bash -docker network create fa-dae2-capstone_kafka_network -``` - -2. Start PostgreSQL container: -```bash -docker-compose up -d -``` - -3. Load environment variables: -```bash -export $(cat .env | xargs) -``` - -4. Initialize database schema (if needed): -```bash -./scripts/sql_pg.sh ./scripts/sql/init.sql -``` - -### Snowflake Setup - -1. Generate private key for authentication: -```bash -openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_key.p8 -nocrypt -``` - -2. Add public key to Snowflake user: -```sql -ALTER USER your_user SET RSA_PUBLIC_KEY=''; -``` - -3. Set `SNOWFLAKE_PRIVATE_KEY_FILE_PATH` in `.env` - ---- - -## Supported Chains - -The project supports multiple EVM chains via chain ID mapping in `src/onchaindata/config/chainid.json`. - -**Mainnets:** -- `ethereum` (1) -- `arbitrum_one` (42161) -- `base` (8453) -- `polygon` (137) -- `op` (10) -- `bnb_smart_chain` (56) -- `avalanche_c-_chain` (43114) -- And 50+ more chains... - -**Testnets:** -- `sepolia_testnet` (11155111) -- `base_sepolia_testnet` (84532) -- `arbitrum_sepolia_testnet` (421614) -- And more... 
- -**Usage:** -```python -from onchaindata.data_extraction.etherscan import EtherscanClient - -# Use chain name -client = EtherscanClient(chain="ethereum") - -# Or use chain ID directly -client = EtherscanClient(chainid=1) -``` diff --git a/mkdocs.yml b/mkdocs.yml index bbfa2d6..f8f24bc 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -58,9 +58,3 @@ nav: - Data Sources: 01_data_pipeline/source.md - Batch Layer: 01_data_pipeline/batch_layer.md - Speed Layer: 01_data_pipeline/speed_layer.md - - # - Analytics: 03_analytics.md - # - API Reference: - # - Data Sources: 04_api_reference/041_data_sources.md - # - Data Loading: 04_api_reference/042_data_loading.md - # - Configuration: 04_api_reference/043_configuration.md diff --git a/pyproject.toml b/pyproject.toml index afd3650..6ce9e93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "onchaindata" +name = "stables-analytics" version = "0.1.0" description = "Capstone project package for FA DAE2" readme = "README.md" @@ -24,14 +24,6 @@ dependencies = [ "webdriver-manager>=4.0.2", ] -[tool.uv.sources] -onchaindata = { path = "./src" } [dependency-groups] -dev = [ - "sqlfluff>=3.5.0", - "sqlfluff-templater-dbt>=3.5.0", -] - -[tool.uv] -package = true +dev = ["sqlfluff>=3.5.0", "sqlfluff-templater-dbt>=3.5.0"] diff --git a/scripts/el/extract_etherscan.py b/scripts/el/extract_etherscan.py deleted file mode 100644 index e1d81cf..0000000 --- a/scripts/el/extract_etherscan.py +++ /dev/null @@ -1,323 +0,0 @@ -import json, argparse, logging, os -from datetime import datetime, timedelta, timezone -import logging.handlers - -from pathlib import Path -from re import A -from dotenv import load_dotenv -import polars as pl -import pandas as pd -from onchaindata.data_extraction.etherscan import etherscan_to_parquet, EtherscanClient - -load_dotenv() - -logger = logging.getLogger(__name__) - - -def parse_number_with_suffix(value: str) -> int: - """Parse numbers with K/M/B suffixes (e.g., '18.5M' -> 18500000). 
- - Args: - value: String number that may include K (thousand), M (million), or B (billion) suffix - - Returns: - Integer value - - Examples: - '18.5M' -> 18500000 - '1.2K' -> 1200 - '3B' -> 3000000000 - '1000' -> 1000 - """ - value = str(value).strip().upper() - - suffixes = { - "K": 1_000, - "M": 1_000_000, - "B": 1_000_000_000, - } - - for suffix, multiplier in suffixes.items(): - if value.endswith(suffix): - number_part = value[:-1] - return int(float(number_part) * multiplier) - - # No suffix, parse as regular int - return int(value) - - -def setup_logging(log_filename: str = None, level: str = "INFO"): - """Sets up logging with console streaming and optional file logging.""" - formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - root_logger = logging.getLogger() - root_logger.setLevel(level) - - # Console handler (always present) - console_handler = logging.StreamHandler() - console_handler.setFormatter(formatter) - root_logger.addHandler(console_handler) - - # File handler (optional) - if not Path("logging").exists(): - Path("logging").mkdir(parents=True, exist_ok=True) - if log_filename: - file_handler = logging.handlers.RotatingFileHandler( - f"logging/{log_filename}", maxBytes=5 * 1024 * 1024, backupCount=5 - ) - file_handler.setFormatter(formatter) - root_logger.addHandler(file_handler) - - -def extract_with_retry( - address: str, - etherscan_client: EtherscanClient, - chain: str, - table: str, - from_block: int, - to_block: int, - output_dir: Path, - max_retries: int = 3, -): - """Extract logs or transactions with automatic retry on failures. - - Args: - address: The contract address to extract data from - etherscan_client: Initialized EtherscanClient instance - chain: The blockchain network name - table: Either 'logs' or 'transactions' - from_block: Starting block number - to_block: Ending block number - output_dir: Directory to save output parquet files - max_retries: Maximum number of retry attempts for failed blocks - """ - output_path = ( - output_dir / f"{chain}_{address}_{table}_{from_block}_{to_block}.parquet" - ) - etherscan_to_parquet( - address=address, - etherscan_client=etherscan_client, - table=table, - from_block=from_block, - to_block=to_block, - output_path=output_path, - block_chunk_size=5_000, - ) - - # Retry failed blocks if error file exists - error_file = Path(f"logging/extract_error/{chain}_{address}_{table}.csv") - retries = 0 - while error_file.exists() and retries < max_retries: - logger.info(f"Retrying failed blocks (attempt {retries + 1}/{max_retries})") - retry_failed_blocks( - error_file=error_file, - table=table, - output_path=output_path, - ) - retries += 1 - n = pl.scan_parquet(output_path).select(pl.len()).collect().item() - min_block = ( - pl.scan_parquet(output_path) - .select(pl.col("blockNumber").min()) - .collect() - .item() - ) - max_block = ( - pl.scan_parquet(output_path) - .select(pl.col("blockNumber").max()) - .collect() - .item() - ) - logger.info(f"{chain} - {address} - {table} - {min_block}-{max_block}, {n} ✅") - - -def retry_failed_blocks(error_file: Path, table: str, output_path: Path): - """Retry failed block ranges with smaller chunk size.""" - - df = pd.read_csv(error_file) - - # Create resolved directory if it doesn't exist - resolved_dir = error_file.parent / "resolved" - resolved_dir.mkdir(parents=True, exist_ok=True) - # Generate resolved filename by adding timestamp - resolved_file_path = resolved_dir / f"{error_file.stem}_resolved.csv" - # Save 
resolved error file, appending if file exists - df.to_csv( - resolved_file_path, - mode="a", - header=not resolved_file_path.exists(), - index=False, - ) - os.remove(error_file) - - for _, row in df.iterrows(): - chain = row.chain - etherscan_client = EtherscanClient(chain=chain) - address = row.address - - from_block = row.from_block - to_block = row.to_block - block_chunk_size = int(row.block_chunk_size / 10) - - etherscan_to_parquet( - address=address, - etherscan_client=etherscan_client, - from_block=from_block, - to_block=to_block, - block_chunk_size=block_chunk_size, - table=table, - output_path=output_path, - ) - - -def rename_parquet_file(file_path: Path, **kwargs): - """ - To rename the parquet file to the actual blocks range of the data - Args: - file_path: Path to the parquet file - """ - min_block = ( - pl.scan_parquet(file_path).select(pl.col("blockNumber").min()).collect().item() - ) - max_block = ( - pl.scan_parquet(file_path).select(pl.col("blockNumber").max()).collect().item() - ) - new_file_path = file_path.with_name( - f"{file_path.stem}_{min_block}_{max_block}.parquet" - ) - new_file_path = ( - file_path.parent - / f"{kwargs['chain']}_{kwargs['address'].lower()}_{kwargs['table']}_{min_block}_{max_block}.parquet" - ) - - if file_path != new_file_path: - file_path.rename(new_file_path) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "-c", - "--chain", - type=str, - default="ethereum", - help="Chain name", - ) - parser.add_argument( - "-a", - "--address", - type=str, - help="Address", - ) - parser.add_argument( - "-d", - "--last_n_days", - type=int, - default=7, - help="Extract data from the last N days", - ) - parser.add_argument( - "--from_block", - type=parse_number_with_suffix, - help="From block (supports K/M/B suffixes, e.g., '18.5M')", - ) - parser.add_argument( - "--to_block", - type=parse_number_with_suffix, - help="To block (supports K/M/B suffixes, e.g., '20M')", - ) - parser.add_argument( - "--logs", - action="store_true", - help="Extract logs", - ) - parser.add_argument( - "--transactions", - action="store_true", - help="Extract transactions", - ) - parser.add_argument( - "-o", - "--output_dir", - type=str, - default=".data/raw", - help="Output directory", - ) - parser.add_argument( - "-v", - action="store_true", - help="Verbose logging (INFO)", - ) - parser.add_argument( - "-vv", - action="store_true", - help="Very verbose logging (DEBUG)", - ) - args = parser.parse_args() - - output_dir = Path(args.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - logging_level = "WARNING" - if args.vv: - logging_level = "DEBUG" - elif args.v: - logging_level = "INFO" - setup_logging(log_filename="extraction.log", level=logging_level) - - etherscan_client = EtherscanClient(chain=args.chain) - - if args.last_n_days: - - from_timestamp = int( - (datetime.now(timezone.utc) - timedelta(days=args.last_n_days)).timestamp() - ) - from_block = etherscan_client.get_block_number_by_timestamp(from_timestamp) - to_block = etherscan_client.get_latest_block() - else: - from_block = ( - args.from_block - or etherscan_client.get_contract_creation_block_number(args.address) - ) - to_block = args.to_block or etherscan_client.get_latest_block() - - if args.logs: - extract_with_retry( - address=args.address.lower(), - etherscan_client=etherscan_client, - chain=args.chain, - table="logs", - from_block=from_block, - to_block=to_block, - output_dir=output_dir, - ) - file_path = ( - output_dir - / 
f"{args.chain}_{args.address.lower()}_logs_{from_block}_{to_block}.parquet" - ) - rename_parquet_file( - file_path, chain=args.chain, address=args.address, table="logs" - ) - if args.transactions: - extract_with_retry( - address=args.address.lower(), - etherscan_client=etherscan_client, - chain=args.chain, - table="transactions", - from_block=from_block, - to_block=to_block, - output_dir=output_dir, - ) - file_path = ( - output_dir - / f"{args.chain}_{args.address.lower()}_transactions_{from_block}_{to_block}.parquet" - ) - rename_parquet_file( - file_path, chain=args.chain, address=args.address, table="transactions" - ) - - -if __name__ == "__main__": - main() diff --git a/scripts/el/extract_graphql.py b/scripts/el/extract_graphql.py deleted file mode 100644 index a464a7e..0000000 --- a/scripts/el/extract_graphql.py +++ /dev/null @@ -1,219 +0,0 @@ -#!/usr/bin/env python3 -""" -GraphQL data fetcher with streaming and batch modes. - -This script fetches data from a GraphQL endpoint and either: -1. Saves to Parquet file (batch mode) -2. Pushes directly to database (streaming mode) -""" - -import argparse, json, logging -from pathlib import Path - -from dotenv import load_dotenv - -load_dotenv() -from onchaindata.data_pipeline import Loader -from onchaindata.utils import PostgresClient, SnowflakeClient - -from onchaindata.data_extraction import GraphQLBatch, GraphQLStream - -logger = logging.getLogger(__name__) - - -def _add_block_filters_to_query( - query: str, table_name: str, from_block: int = None, to_block: int = None -) -> str: - """ - Add block number filters to a GraphQL query. - - Args: - query: Original GraphQL query string - table_name: Name of the table in the query - from_block: Minimum block number (inclusive) - to_block: Maximum block number (inclusive) - - Returns: - Modified query string with block filters - """ - import re - - # Build where clause - combine conditions in a single blockNumber object - block_conditions = [] - if from_block is not None: - block_conditions.append(f"_gte: {from_block}") - if to_block is not None: - block_conditions.append(f"_lte: {to_block}") - - if not block_conditions: - return query - - # Create proper GraphQL where clause with conditions in single blockNumber object - where_clause = f"blockNumber: {{{', '.join(block_conditions)}}}" - - # Find the table call in the query and add/modify where clause - # Pattern: tableName(...) or tableName(order_by: {...}) - pattern = rf"{table_name}\s*\((.*?)\)" - - def replacer(match): - existing_args = match.group(1).strip() - # Check if there's already a where clause - if "where:" in existing_args: - # Add conditions to existing where clause - # This is complex, so for now we'll just append - return f"{table_name}({existing_args}, where: {{{where_clause}}})" - elif existing_args: - # Has other args (like order_by), add where clause - return f"{table_name}({existing_args}, where: {{{where_clause}}})" - else: - # No existing args, add where clause - return f"{table_name}(where: {{{where_clause}}})" - - modified_query = re.sub(pattern, replacer, query, count=1) - return modified_query - - -def extract(args): - """ - Execute batch mode: fetch data once and save to Parquet. 
- - Args: - args: Parsed command-line arguments - """ - # Load query - if args.query_file: - with open(args.query_file, "r") as f: - query = f.read() - elif args.query: - query = args.query - else: - raise ValueError("Batch mode requires either --query or --query-file") - - # Modify query to include block number filters if provided - if args.from_block is not None or args.to_block is not None: - query = _add_block_filters_to_query( - query, args.graphql_table, args.from_block, args.to_block - ) - logger.info( - f"Applied block filters: from_block={args.from_block}, to_block={args.to_block}" - ) - - # Fetch data - logger.info(f"Fetching from: {args.endpoint}") - extractor = GraphQLBatch( - endpoint=args.endpoint, - query=query, - ) - df = extractor.extract_to_dataframe(args.graphql_table) - - if df.is_empty(): - logger.info("No data returned from query") - return - - logger.info(f"Fetched {len(df)} records") - - # Save to Parquet - min_block_number = df["blockNumber"].min() - max_block_number = df["blockNumber"].max() - output_path = ( - Path(args.output_dir) - / f"{args.file_name}_{min_block_number}_{max_block_number}.parquet" - ) - output_path.parent.mkdir(parents=True, exist_ok=True) - df.write_parquet(output_path) - - logger.info(f"Saved to: {output_path}") - - -def main(): - parser = argparse.ArgumentParser( - description="Fetch data from GraphQL endpoint with streaming or batch mode" - ) - - # GraphQL endpoint configuration - parser.add_argument( - "-e", - "--endpoint", - type=str, - default="http://localhost:8080/v1/graphql", - help="GraphQL endpoint URL", - ) - parser.add_argument( - "-q", - "--query", - type=str, - help="GraphQL query string (or use --query-file)", - ) - parser.add_argument( - "--query-file", - type=str, - help="Path to file containing GraphQL query", - default="scripts/el/stables_transfers.graphql", - ) - parser.add_argument( - "--graphql-table", - type=str, - help="Name of the table from GraphQL Endpoint, e.g. 
'stablesTransfers'", - default="stablesTransfers", - ) - - # Block range filters - parser.add_argument( - "--from_block", - type=int, - help="Starting block number (inclusive)", - default=None, - ) - parser.add_argument( - "--to_block", - type=int, - help="Ending block number (inclusive)", - default=None, - ) - - # Output configuration - parser.add_argument( - "-o", - "--output_dir", - type=str, - default=".data/raw", - help="Output directory", - ) - parser.add_argument( - "-f", - "--file_name", - type=str, - help="Parquet file name to save data to, without extension", - default="data", - ) - - # Logging configuration - parser.add_argument( - "-v", - "--verbose", - action="count", - default=0, - help="Increase verbosity: -v for INFO, -vv for DEBUG", - ) - - args = parser.parse_args() - - # Configure logging based on verbosity - if args.verbose == 0: - log_level = logging.WARNING - elif args.verbose == 1: - log_level = logging.INFO - else: # >= 2 - log_level = logging.DEBUG - - logging.basicConfig( - level=log_level, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - extract(args) - - -if __name__ == "__main__": - main() diff --git a/scripts/el/load.py b/scripts/el/load.py deleted file mode 100644 index fb58426..0000000 --- a/scripts/el/load.py +++ /dev/null @@ -1,92 +0,0 @@ -import argparse -from dotenv import load_dotenv -import polars as pl - -load_dotenv() -from onchaindata.data_pipeline import Loader -from onchaindata.utils import SnowflakeClient, PostgresClient - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "-f", - "--file_path", - type=str, - help="File path", - required=True, - ) - parser.add_argument( - "-c", - "--client", - type=str, - help="Client name, either 'snowflake' or 'postgres'", - required=True, - ) - # warehouse/database is configured in the client - parser.add_argument( - "-s", - "--schema", - type=str, - help="Schema name", - required=True, - ) - parser.add_argument( - "-t", - "--table", - type=str, - help="Table name", - required=True, - ) - parser.add_argument( - "-w", - "--write_disposition", - type=str, - help="Write disposition, either 'append' or 'replace' or 'merge'", - default="append", - ) - parser.add_argument( - "-k", - "--primary_key", - type=str, - help="Comma-separated list of column names to use as primary key for merge. Required when -w merge. 
Example: 'contract_address,chain'", - default=None, - ) - - args = parser.parse_args() - if args.client == "snowflake": - client = SnowflakeClient().from_env() - elif args.client == "postgres": - client = PostgresClient.from_env() - else: - raise ValueError( - f"Invalid client: {args.client}, use 'snowflake' or 'postgres', or implement new client" - ) - - loader = Loader(client=client) - - if args.file_path.endswith(".csv"): - df = pl.read_csv(args.file_path) - elif args.file_path.endswith(".parquet"): - df = pl.read_parquet(args.file_path) - else: - raise ValueError( - f"Invalid file extension: {args.file_path}, use 'csv' or 'parquet'" - ) - - # Parse primary key if provided - primary_key = None - if args.primary_key: - primary_key = [col.strip() for col in args.primary_key.split(",")] - - loader.load_dataframe( - df=df, - schema=args.schema, - table_name=args.table, - write_disposition=args.write_disposition, - primary_key=primary_key, - ) - - -if __name__ == "__main__": - main() diff --git a/scripts/el/pg2sf_raw_transfer.py b/scripts/el/pg2sf_raw_transfer.py deleted file mode 100755 index cbfd9ea..0000000 --- a/scripts/el/pg2sf_raw_transfer.py +++ /dev/null @@ -1,159 +0,0 @@ -#!/usr/bin/env python3 - -""" -Move data from PostgreSQL (raw.raw_transfer) to Snowflake (raw.transfer). -Queries data from PostgreSQL within a block range and loads to Snowflake. -""" - -import argparse -import logging -from dotenv import load_dotenv -import polars as pl - -load_dotenv() -from onchaindata.utils import PostgresClient, SnowflakeClient -from onchaindata.data_pipeline import Loader - - -def setup_logging(verbose: int = 0): - """Setup logging based on verbosity level.""" - if verbose == 0: - level = logging.WARNING - elif verbose == 1: - level = logging.INFO - else: - level = logging.DEBUG - - logging.basicConfig( - level=level, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - return logging.getLogger(__name__) - - -def query_postgres_data( - pg_client: PostgresClient, - from_block: int, - to_block: int, - logger: logging.Logger, -) -> pl.DataFrame: - """ - Query data from PostgreSQL raw.raw_transfer table within block range. - - Args: - pg_client: PostgresClient instance - from_block: Starting block number (inclusive) - to_block: Ending block number (inclusive) - logger: Logger instance - - Returns: - Polars DataFrame with queried data - """ - query = f""" - SELECT * - FROM raw.raw_transfer - WHERE block_number::integer >= {from_block} - AND block_number::integer <= {to_block} - ORDER BY block_number - """ - - logger.info(f"Querying PostgreSQL for blocks {from_block} to {to_block}") - logger.debug(f"Query: {query}") - with pg_client.get_connection() as conn: - df = pl.read_database(connection=conn, query=query) - - logger.info(f"Retrieved {len(df)} rows from PostgreSQL") - - return df - - -def load_to_snowflake( - df: pl.DataFrame, - sf_loader: Loader, - logger: logging.Logger, -) -> None: - """ - Load DataFrame to Snowflake raw.transfer table. 
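    Example (illustrative sketch; the block numbers are placeholders and Snowflake/Postgres
    credentials are assumed to come from .env):
        logger = setup_logging(verbose=1)
        sf_loader = Loader(client=SnowflakeClient().from_env())
        df = query_postgres_data(
            pg_client=PostgresClient.from_env(),
            from_block=18_000_000,
            to_block=18_010_000,
            logger=logger,
        )
        load_to_snowflake(df=df, sf_loader=sf_loader, logger=logger)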
- - Args: - df: Polars DataFrame to load - sf_loader: Loader instance configured with SnowflakeClient - logger: Logger instance - """ - if len(df) == 0: - logger.warning("No data to load to Snowflake") - return - - logger.info(f"Loading {len(df)} rows to Snowflake raw.transfer") - - result = sf_loader.load_dataframe( - df=df, - schema="raw", - table_name="raw_transfer", - write_disposition="append", - ) - - logger.info(f"Successfully loaded data to Snowflake: {result}") - - -def main(): - parser = argparse.ArgumentParser( - description="Move data from PostgreSQL raw.raw_transfer to Snowflake raw.transfer" - ) - parser.add_argument( - "--from_block", - type=int, - required=True, - help="Starting block number (inclusive)", - ) - parser.add_argument( - "--to_block", - type=int, - required=True, - help="Ending block number (inclusive)", - ) - parser.add_argument( - "-v", - "--verbose", - action="count", - default=0, - help="Increase verbosity level (use -v for INFO, -vv for DEBUG)", - ) - - args = parser.parse_args() - - logger = setup_logging(args.verbose) - - # Validate block range - if args.from_block > args.to_block: - raise ValueError( - f"from_block ({args.from_block}) must be <= to_block ({args.to_block})" - ) - - logger.info("Initializing database clients") - - # Step 1: Initialize clients - pg_client = PostgresClient.from_env() - sf_client = SnowflakeClient().from_env() - sf_loader = Loader(client=sf_client) - - # Step 2: Query data from PostgreSQL - df = query_postgres_data( - pg_client=pg_client, - from_block=args.from_block, - to_block=args.to_block, - logger=logger, - ) - - # Step 3: Load data to Snowflake - load_to_snowflake( - df=df, - sf_loader=sf_loader, - logger=logger, - ) - - logger.info("Data migration completed successfully") - - -if __name__ == "__main__": - main() diff --git a/scripts/el/scrape_etherscan.py b/scripts/el/scrape_etherscan.py deleted file mode 100644 index 40fd8a0..0000000 --- a/scripts/el/scrape_etherscan.py +++ /dev/null @@ -1,275 +0,0 @@ -#!/usr/bin/env python -""" -Script to scrape contract name tags from Etherscan. - -Reads addresses from a CSV file, scrapes their name tags, and saves the results -with a new 'name_tag' column. - -Usage: - uv run python scripts/el/scrape_etherscan.py -i addresses.csv -o addresses_with_tags.csv - uv run python scripts/el/scrape_etherscan.py -i addresses.csv -o addresses_with_tags.csv --address-column contract_address -""" - -import argparse -import logging -import sys -from pathlib import Path - -import pandas as pd - -from onchaindata.data_extraction.etherscan_scraper import EtherscanScraper -from onchaindata.data_extraction.etherscan import EtherscanClient - -logger = logging.getLogger(__name__) - - -def setup_logging(verbose: bool = False): - """Set up logging configuration.""" - level = logging.INFO if verbose else logging.WARNING - logging.basicConfig( - level=level, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - -def extract_contract_name_tags( - df: pd.DataFrame, - etherscan_client: EtherscanClient, - address_column: str = "address", - headless: bool = True, - timeout: int = 10, -) -> pd.DataFrame: - """ - Scrape name tags for addresses in a DataFrame. 
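    Example (illustrative; reuses the address from this module's other docstrings and assumes
    a working Chrome/Selenium setup plus an ETHERSCAN_API_KEY):
        df = pd.DataFrame({"address": ["0x94cc50e4521bd271c1a997a3a4dc815c2f920b41"]})
        tagged = extract_contract_name_tags(df, EtherscanClient(chain="ethereum"))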
- - Args: - df: DataFrame containing addresses - address_column: Name of the column containing addresses - headless: Run browser in headless mode - timeout: Maximum wait time for page elements - - Returns: - DataFrame with added 'name_tag' column - """ - # Make a copy to avoid modifying the original - df = df.copy() - - # Validate address column exists - if address_column not in df.columns: - logger.error( - f"Column '{address_column}' not found. Available: {', '.join(df.columns)}" - ) - raise ValueError(f"Column '{address_column}' not found") - - # Initialize name_tag column if it doesn't exist - if "name_tag" not in df.columns: - df["name_tag"] = None - - total = len(df) - logger.info(f"Scraping {total} addresses from Etherscan") - - # Scrape name tags - with EtherscanScraper(headless=headless, timeout=timeout) as scraper: - for idx, row in df.iterrows(): - address = row[address_column] - - # Skip if address is None or empty - if pd.isna(address) or str(address).strip() == "": - logger.debug(f"Skipping empty address at row {idx}") - continue - - try: - name_tag = scraper.get_contract_name_tag(str(address)) - contract_metadata = etherscan_client.get_contract_metadata(address) - df.at[idx, "name_tag"] = name_tag - df.at[idx, "contract_name"] = contract_metadata["ContractName"] - logger.info( - f"[{idx + 1}/{total}] {address}: {name_tag or 'No tag found'} {contract_metadata['ContractName'] or 'No name found'}" - ) - except Exception as e: - logger.warning(f"[{idx + 1}/{total}] {address}: Error - {e}") - df.at[idx, "name_tag"] = None - - # Log summary - tagged_count = df["name_tag"].notna().sum() - named_count = df["contract_name"].notna().sum() - logger.info( - f"Completed: {tagged_count}/{total} addresses tagged, {named_count}/{total} addresses named" - ) - - return df - - -def main(): - parser = argparse.ArgumentParser( - description="Scrape contract name tags from Etherscan and save to CSV", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - parser.add_argument( - "-i", "--input", required=True, help="Input CSV file containing addresses" - ) - - parser.add_argument( - "-o", - "--output", - required=True, - help="Output CSV file to save results with name tags", - ) - - parser.add_argument( - "--address-column", - default="address", - help="Name of the column containing addresses (default: 'address')", - ) - - parser.add_argument( - "--no-headless", - action="store_true", - help="Run browser in visible mode (not headless)", - ) - - parser.add_argument( - "--timeout", - type=int, - default=10, - help="Maximum wait time for page elements in seconds (default: 10)", - ) - - parser.add_argument( - "--save-every", - type=int, - default=100, - help="Save progress every N rows (default: 100)", - ) - - parser.add_argument( - "-v", "--verbose", action="store_true", help="Enable verbose logging" - ) - - args = parser.parse_args() - - # Set up logging - setup_logging(args.verbose) - - try: - # Read input CSV - logger.info(f"Reading from: {args.input}") - df = pd.read_csv(args.input) - - # Validate address column exists - if args.address_column not in df.columns: - logger.error( - f"Column '{args.address_column}' not found. 
Available: {', '.join(df.columns)}" - ) - raise ValueError(f"Column '{args.address_column}' not found") - - # Check if output file exists to determine already processed addresses - processed_addresses = set() - if Path(args.output).exists(): - logger.info(f"Loading existing progress from: {args.output}") - existing_df = pd.read_csv(args.output) - processed_addresses = set( - existing_df[args.address_column].dropna().astype(str).str.lower() - ) - logger.info(f"Found {len(processed_addresses)} already processed addresses") - - total = len(df) - logger.info(f"Processing {total} addresses from Etherscan") - - # Initialize Etherscan client - etherscan_client = EtherscanClient(chain="ethereum") - - # Buffer to collect results before appending - results_buffer = [] - - # Scrape with periodic saves - with EtherscanScraper( - headless=not args.no_headless, timeout=args.timeout - ) as scraper: - for idx, row in df.iterrows(): - address = row[args.address_column] - - # Skip if address is None or empty - if pd.isna(address) or str(address).strip() == "": - logger.debug(f"Skipping empty address at row {idx}") - continue - - # Skip if already processed - if str(address).lower() in processed_addresses: - logger.debug(f"Skipping already processed address: {address}") - continue - - try: - name_tag = scraper.get_contract_name_tag(str(address)) - contract_metadata = etherscan_client.get_contract_metadata(address) - creation_block_number = ( - etherscan_client.get_contract_creation_block_number(address) - ) - - # Add to buffer - result = row.to_dict() - result["name_tag"] = name_tag - result["contract_name"] = contract_metadata["ContractName"] - result["is_contract"] = creation_block_number is not None - results_buffer.append(result) - - logger.info( - f"[{idx + 1}/{total}] {address}: {name_tag or 'No tag found'} | {contract_metadata['ContractName'] or 'No name found'}" - ) - except Exception as e: - logger.warning(f"[{idx + 1}/{total}] {address}: Error - {e}") - result = row.to_dict() - result["name_tag"] = None - result["contract_name"] = None - result["is_contract"] = False - results_buffer.append(result) - - # Append to CSV every N rows - if len(results_buffer) >= args.save_every: - append_df = pd.DataFrame(results_buffer) - append_df.to_csv( - args.output, - mode="a", - header=not Path(args.output).exists(), - index=False, - ) - logger.info(f"Appended {len(results_buffer)} rows to {args.output}") - - # Add newly saved addresses to processed set - processed_addresses.update( - append_df[args.address_column].astype(str).str.lower() - ) - results_buffer = [] - - # Append remaining results - if results_buffer: - append_df = pd.DataFrame(results_buffer) - append_df.to_csv( - args.output, - mode="a", - header=not Path(args.output).exists(), - index=False, - ) - logger.info(f"Appended final {len(results_buffer)} rows to {args.output}") - - # Read final output for summary - final_df = pd.read_csv(args.output) - tagged_count = final_df["name_tag"].notna().sum() - named_count = final_df["contract_name"].notna().sum() - logger.info( - f"Completed: {tagged_count}/{len(final_df)} addresses tagged, {named_count}/{len(final_df)} addresses named" - ) - logger.info(f"Total rows in output: {len(final_df)}") - - except KeyboardInterrupt: - logger.warning("Scraping interrupted by user") - sys.exit(1) - except Exception as e: - logger.error(f"Error: {e}", exc_info=args.verbose) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/scripts/el/stables_transfers.graphql b/scripts/el/stables_transfers.graphql 
deleted file mode 100644 index b26d335..0000000 --- a/scripts/el/stables_transfers.graphql +++ /dev/null @@ -1,11 +0,0 @@ -query stablesTransfers { - stablesTransfers(order_by: { blockNumber: desc }) { - id - blockNumber - timestamp - contractAddress - from - to - value - } -} diff --git a/scripts/el/stream_graphql.py b/scripts/el/stream_graphql.py deleted file mode 100644 index 0fb505a..0000000 --- a/scripts/el/stream_graphql.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -""" -GraphQL data fetcher with streaming and batch modes. - -This script fetches data from a GraphQL endpoint and either: -1. Saves to Parquet file (batch mode) -2. Pushes directly to database (streaming mode) -""" - -import argparse, json, logging -from pathlib import Path - -from dotenv import load_dotenv - -load_dotenv() -from onchaindata.data_pipeline import Loader -from onchaindata.utils import PostgresClient, SnowflakeClient - -from onchaindata.data_extraction import GraphQLStream - -logger = logging.getLogger(__name__) - - -def stream(args): - """ - Execute streaming mode: continuously poll and push to database. - - Args: - args: Parsed command-line arguments - """ - # Validate arguments - if not all([args.database_client, args.schema, args.database_table, args.fields]): - raise ValueError( - "Streaming mode requires --database-client, --schema, --database-table, and --fields" - ) - - # Parse fields - fields = [f.strip() for f in args.fields.split(",")] - - # Initialize database client - if args.database_client == "snowflake": - client = SnowflakeClient().from_env() - elif args.database_client == "postgres": - client = PostgresClient.from_env() - else: - raise ValueError( - f"Invalid client: {args.database_client}, use 'snowflake' or 'postgres', or implement new client" - ) - - loader = Loader(client=client) - streamer = GraphQLStream( - endpoint=args.endpoint, - table_name=args.graphql_table, - fields=fields, - poll_interval=args.poll_interval, - ) - - streamer.stream( - loader=loader, - schema=args.schema, - table_name=args.database_table, - ) - - -def main(): - parser = argparse.ArgumentParser( - description="Fetch data from GraphQL endpoint with streaming or batch mode" - ) - - # GraphQL endpoint configuration - parser.add_argument( - "-e", - "--endpoint", - type=str, - default="http://localhost:8080/v1/graphql", - help="GraphQL endpoint URL", - ) - parser.add_argument( - "--graphql-table", - type=str, - help="Name of the table from GraphQL Endpoint, e.g. 
'stablesTransfers'", - default="stablesTransfers", - ) - - parser.add_argument( - "--poll-interval", - type=int, - default=5, - help="Polling interval in seconds for streaming mode (default: 5)", - ) - - parser.add_argument( - "--fields", - type=str, - help="Comma-separated list of fields to fetch (required for streaming mode)", - default="id,blockNumber,timestamp,contractAddress,from,to,value", - ) - - # Database configuration (for streaming mode) - parser.add_argument( - "-c", - "--database-client", - type=str, - choices=["snowflake", "postgres"], - help="Client name (required for streaming mode)", - ) - parser.add_argument( - "-s", - "--schema", - type=str, - help="Schema name (required for streaming mode)", - ) - parser.add_argument( - "-t", - "--database-table", - type=str, - help="Target table name in database (required for streaming mode)", - ) - - # Logging configuration - parser.add_argument( - "-v", - "--verbose", - action="count", - default=0, - help="Increase verbosity: -v for INFO, -vv for DEBUG", - ) - - args = parser.parse_args() - - # Configure logging based on verbosity - if args.verbose == 0: - log_level = logging.WARNING - elif args.verbose == 1: - log_level = logging.INFO - else: # >= 2 - log_level = logging.DEBUG - - logging.basicConfig( - level=log_level, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - stream(args) - - -if __name__ == "__main__": - main() diff --git a/scripts/load_file.py b/scripts/load_file.py new file mode 100644 index 0000000..bc7a4e3 --- /dev/null +++ b/scripts/load_file.py @@ -0,0 +1,221 @@ +import argparse, os +from dotenv import load_dotenv +import polars as pl +import dlt + +load_dotenv() +from utils.database_client import SnowflakeClient, PostgresClient + + +def load_dataframe( + client: SnowflakeClient, + df: pl.DataFrame, + schema: str, + table_name: str, + write_disposition: str = "append", + primary_key: list[str] = None, + **kwargs, +): + """ + Load Polars DataFrame directly to the database using DLT. + + Args: + df: Polars DataFrame to load + schema: Target schema name + table_name: Target table name + write_disposition: How to handle existing data ("append", "replace", "merge") + primary_key: List of column names to use as primary key for merge operations. + Required when write_disposition="merge". + Example: ["contract_address", "chain"] + + Returns: + DLT pipeline run result + """ + # Validate merge requirements + if write_disposition == "merge" and not primary_key: + raise ValueError( + "primary_key must be specified when write_disposition='merge'. " + "Example: primary_key=['contract_address', 'chain']" + ) + + # Convert DataFrame to list of dicts for DLT + data = df.to_dicts() + + # Create a DLT resource from the data + resource = dlt.resource(data, name=table_name) + + # Apply primary key hint for merge operations + if primary_key: + resource.apply_hints(primary_key=primary_key) + + # Create pipeline with destination-specific configuration + pipeline = dlt.pipeline( + pipeline_name="dataframe_loader", + destination=client.get_dlt_destination(), + dataset_name=schema, + ) + + # Load data + result = pipeline.run( + resource, + table_name=table_name, + write_disposition=write_disposition, + ) + + return result + + +def upload_to_snowflake_stage( + client: SnowflakeClient, + file_path: str, + schema: str, + stage_name: str, + # create_if_not_exists: bool = True, +): + """ + Upload a file to a Snowflake stage. 
+ + Args: + client: SnowflakeClient instance + file_path: Path to the file to upload + stage_name: Name of the Snowflake stage (e.g., 'my_stage') + schema: Schema name to set context. + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + file_name = os.path.basename(file_path) + + print(f"Uploading {file_path} to Snowflake stage {stage_name}...") + + with client.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.execute(f"USE SCHEMA {schema};") + cursor.execute(f"CREATE STAGE IF NOT EXISTS {stage_name};") + # PUT command uploads local file to stage + put_command = f"PUT file://{file_path} @{stage_name} AUTO_COMPRESS=FALSE OVERWRITE=TRUE" + print(f"Executing: {put_command}") + cursor.execute(put_command) + + result = cursor.fetchall() + if result: + print(f"Upload result: {result}") + + print(f"✓ Successfully uploaded {file_name} to stage {stage_name}") + + except Exception as e: + print(f"✗ Error uploading file to stage: {str(e)}") + raise + finally: + cursor.close() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-f", + "--file_path", + type=str, + help="File path", + required=True, + ) + parser.add_argument( + "-c", + "--client", + type=str, + help="Client name, either 'snowflake' or 'postgres'", + required=True, + ) + # warehouse/database is configured in the client + parser.add_argument( + "-s", + "--schema", + type=str, + help="Schema name (not required when using --stage)", + required=False, + ) + parser.add_argument( + "-t", + "--table", + type=str, + help="Table name (not required when using --stage)", + required=False, + ) + parser.add_argument( + "-w", + "--write_disposition", + type=str, + help="Write disposition, either 'append' or 'replace' or 'merge'", + default="append", + ) + parser.add_argument( + "-k", + "--primary_key", + type=str, + help="Comma-separated list of column names to use as primary key for merge. Required when -w merge. Example: 'contract_address,chain'", + default=None, + ) + parser.add_argument( + "--stage", + action="store_true", + help="Upload file to Snowflake stage instead of loading into table. Only works with -c snowflake. 
Requires --stage_name.", + ) + parser.add_argument( + "--stage_name", + type=str, + help="Snowflake stage name (e.g., 'my_stage' or 'my_database.my_schema.my_stage')", + default=None, + ) + + args = parser.parse_args() + if args.client == "snowflake": + client = SnowflakeClient().from_env() + elif args.client == "postgres": + client = PostgresClient.from_env() + else: + raise ValueError( + f"Invalid client: {args.client}, use 'snowflake' or 'postgres', or implement new client" + ) + + # Handle Snowflake stage upload + if args.stage: + if args.client != "snowflake": + raise ValueError("--stage option only works with -c snowflake") + if not args.stage_name: + raise ValueError("--stage_name is required when using --stage") + + upload_to_snowflake_stage( + client=client, + file_path=args.file_path, + schema=args.schema, + stage_name=args.stage_name, + ) + return + + if args.file_path.endswith(".csv"): + df = pl.read_csv(args.file_path) + elif args.file_path.endswith(".parquet"): + df = pl.read_parquet(args.file_path) + else: + raise ValueError( + f"Invalid file extension: {args.file_path}, use 'csv' or 'parquet'" + ) + + # Parse primary key if provided + primary_key = None + if args.primary_key: + primary_key = [col.strip() for col in args.primary_key.split(",")] + + load_dataframe( + client=client, + df=df, + schema=args.schema, + table_name=args.table, + write_disposition=args.write_disposition, + primary_key=primary_key, + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/utils/__init__.py b/scripts/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/onchaindata/utils/snowflake_client.py b/scripts/utils/database_client.py similarity index 51% rename from src/onchaindata/utils/snowflake_client.py rename to scripts/utils/database_client.py index 7f83257..905e3f9 100644 --- a/src/onchaindata/utils/snowflake_client.py +++ b/scripts/utils/database_client.py @@ -1,10 +1,108 @@ -import snowflake.connector -import os +from abc import ABC, abstractmethod from contextlib import contextmanager -from typing import Optional, Dict, Any +from typing import Optional, Any, Dict, List +import os + +import psycopg import dlt +import snowflake.connector + + +class BaseDatabaseClient(ABC): + """Abstract base class for database clients with common patterns.""" + + def __init__(self): + """Initialize with connection parameters.""" + self.connection_params = self._build_connection_params() + self._engine = None + + @abstractmethod + def _build_connection_params(self) -> Dict[str, Any]: + """Build connection parameters from environment or config.""" + pass + + @abstractmethod + @contextmanager + def get_connection(self): + """Context manager for database connections.""" + pass + + @abstractmethod + def get_dlt_destination(self): + """Get DLT destination for this database.""" + pass + + @staticmethod + def _get_env_var(key: str, default: str = None) -> str: + """Helper method to get environment variables.""" + return os.getenv(key, default) + + +class PostgresClient(BaseDatabaseClient): + """Object-oriented PostgreSQL client for database operations.""" + + def __init__( + self, + host: str = None, + port: int = None, + database: str = None, + user: str = None, + password: str = None, + ): + """ + Initialize PostgresDestination with database configuration. 
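    Example (illustrative; assumes POSTGRES_HOST/PORT/DB/USER/PASSWORD are set in the environment):
        client = PostgresClient.from_env()
        with client.get_connection() as conn:
            conn.execute("SELECT 1")  # psycopg connection from the context manager
        destination = client.get_dlt_destination()  # usable as dlt.pipeline(destination=...)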
+ + Args: + host: Database host + port: Database port + database: Database name + user: Database user + password: Database password + """ + self.host = host + self.port = port + self.database = database + self.user = user + self.password = password + super().__init__() + + @classmethod + def from_env(cls) -> "PostgresClient": + """Create from environment variables""" + return cls( + host=cls._get_env_var("POSTGRES_HOST"), + port=int(cls._get_env_var("POSTGRES_PORT", "5432")), + database=cls._get_env_var("POSTGRES_DB"), + user=cls._get_env_var("POSTGRES_USER"), + password=cls._get_env_var("POSTGRES_PASSWORD"), + ) + + def _build_connection_params(self) -> Dict[str, Any]: + """Build connection parameters from instance variables.""" + return { + "host": self.host, + "port": self.port, + "dbname": self.database, + "user": self.user, + "password": self.password, + } + + @contextmanager + def get_connection(self): + """Context manager for PostgreSQL connections.""" + conn = None + try: + conn = psycopg.connect(**self.connection_params) + yield conn + finally: + if conn: + conn.close() -from .base_client import BaseDatabaseClient + def get_dlt_destination(self) -> Any: + """Return DLT destination for pipeline operations.""" + params = self.connection_params + connection_url = f"postgresql://{params['user']}:{params['password']}@{params['host']}:{params['port']}/{params['dbname']}" + return dlt.destinations.postgres(connection_url) class SnowflakeClient(BaseDatabaseClient): diff --git a/src/onchaindata/__init__.py b/src/onchaindata/__init__.py deleted file mode 100644 index 0a2889e..0000000 --- a/src/onchaindata/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""FA DAE2 Capstone Project - Main package.""" diff --git a/src/onchaindata/config/__init__.py b/src/onchaindata/config/__init__.py deleted file mode 100644 index 96567cf..0000000 --- a/src/onchaindata/config/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Configuration management for onchaindata package.""" - -from .settings import APIs, ColumnSchemas, APIUrls, APIConfig - -__all__ = [ - "APIs", - "ColumnSchemas", - "APIUrls", - "APIConfig", -] diff --git a/src/onchaindata/config/chainid.json b/src/onchaindata/config/chainid.json deleted file mode 100644 index a0920bd..0000000 --- a/src/onchaindata/config/chainid.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "ethereum": 1, - "sepolia_testnet": 11155111, - "holesky_testnet": 17000, - "hoodi_testnet": 560048, - "abstract": 2741, - "abstract_sepolia_testnet": 11124, - "ape_chain_curtis_testnet": 33111, - "ape_chain": 33139, - "arbitrum_nova": 42170, - "arbitrum_one": 42161, - "arbitrum_sepolia_testnet": 421614, - "avalanche_c-_chain": 43114, - "avalanche_fuji_testnet": 43113, - "base": 8453, - "base_sepolia_testnet": 84532, - "berachain": 80094, - "berachain_bepolia_testnet": 80069, - "bit_torrent_chain": 199, - "bit_torrent_chain_testnet": 1028, - "blast": 81457, - "blast_sepolia_testnet": 168587773, - "bnb_smart_chain": 56, - "bnb_smart_chain_testnet": 97, - "celo_alfajores_testnet": 44787, - "celo": 42220, - "cronos": 25, - "fraxtal": 252, - "fraxtal_testnet": 2522, - "gnosis": 100, - "hyper_evm": 999, - "linea": 59144, - "linea_sepolia_testnet": 59141, - "mantle": 5000, - "mantle_sepolia_testnet": 5003, - "memecore": 4352, - "memecore_testnet": 43521, - "moonbase_alpha_testnet": 1287, - "monad_testnet": 10143, - "moonbeam": 1284, - "moonriver": 1285, - "op": 10, - "op_sepolia_testnet": 11155420, - "polygon_amoy_testnet": 80002, - "polygon": 137, - "katana": 747474, - "sei": 1329, - "sei_testnet": 1328, 
- "scroll": 534352, - "scroll_sepolia_testnet": 534351, - "sonic_blaze_testnet": 57054, - "sonic": 146, - "sophon": 50104, - "sophon_sepolia_testnet": 531050104, - "swellchain": 1923, - "swellchain_testnet": 1924, - "taiko_hekla_l2_testnet": 167009, - "taiko": 167000, - "unichain": 130, - "unichain_sepolia_testnet": 1301, - "wemix3.0": 1111, - "wemix3.0_testnet": 1112, - "world": 480, - "world_sepolia_testnet": 4801, - "xai": 660279, - "xai_sepolia_testnet": 37714555429, - "xdc_apothem_testnet": 51, - "xdc": 50, - "zk_sync": 324, - "zk_sync_sepolia_testnet": 300, - "op_bnb": 204, - "op_bnb_testnet": 5611 -} \ No newline at end of file diff --git a/src/onchaindata/data_extraction/__init__.py b/src/onchaindata/data_extraction/__init__.py deleted file mode 100644 index 1b68332..0000000 --- a/src/onchaindata/data_extraction/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Data collection modules for blockchain data.""" - -from .etherscan import EtherscanClient, EtherscanExtractor -from .graphql import GraphQLBatch, GraphQLStream - -__all__ = ["EtherscanClient", "EtherscanExtractor", "GraphQLBatch", "GraphQLStream"] diff --git a/src/onchaindata/data_extraction/base.py b/src/onchaindata/data_extraction/base.py deleted file mode 100644 index 881593b..0000000 --- a/src/onchaindata/data_extraction/base.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Abstract base classes for onchain_sleuth package.""" - -import time -import logging -import requests -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Type, Callable - -from .rate_limiter import RateLimitedSession, RateLimitStrategy - - -class APIError(Exception): - """Exception raised for API-related errors.""" - - pass - - -@dataclass -class APIConfig: - """Configuration for API clients.""" - - base_url: str - api_key: Optional[str] = None - rate_limit: float = 5.0 # requests per second - timeout: int = 30 - retry_attempts: int = 1 - retry_delay_base: float = 1.0 # base delay for exponential backoff - - -class BaseAPIClient(ABC): - """Abstract base class for all API clients.""" - - def __init__( - self, - config: APIConfig, - rate_limit_strategy: RateLimitStrategy = RateLimitStrategy.FIXED_INTERVAL, - ): - self.config = config - self.rate_limit_strategy = rate_limit_strategy - self.logger = logging.getLogger(self.__class__.__name__) - self._session = self._create_session() - - def _create_session(self) -> RateLimitedSession: - """Create configured session with rate limiting.""" - return RateLimitedSession( - calls_per_second=self.config.rate_limit, - strategy=self.rate_limit_strategy, - logger=self.logger, - ) - - @abstractmethod - def _build_request_params(self, **kwargs) -> Dict[str, Any]: - """Build request parameters specific to the API.""" - pass - - @abstractmethod - def _handle_response(self, response: requests.Response) -> Any: - """Handle API response and extract data.""" - pass - - def make_request(self, endpoint: str, params: Dict[str, Any] = None) -> Any: - """Generic request method with error handling and retry logic.""" - # Handle full URLs (when endpoint starts with http) - if endpoint.startswith(("http://", "https://")): - url = endpoint - else: - url = ( - f"{self.config.base_url.rstrip('/')}/{endpoint.lstrip('/')}" - if endpoint - else self.config.base_url - ) - request_params = self._build_request_params(**(params or {})) - - last_exception = None - for attempt in range(self.config.retry_attempts): - try: - response = self._session.get( - url, params=request_params, 
timeout=self.config.timeout - ) - return self._handle_response(response) - except Exception as e: - last_exception = e - if attempt < self.config.retry_attempts - 1: - delay = self.config.retry_delay_base * (2**attempt) - self.logger.warning( - f"Request failed (attempt {attempt + 1}): {e}. Retrying in {delay}s..." - ) - time.sleep(delay) - else: - self.logger.error( - f"Request failed after {self.config.retry_attempts} attempts: {e}" - ) - - raise APIError() from last_exception - - -class BaseDLTSource(ABC): - """Abstract base class for DLT sources.""" - - def __init__(self, client: BaseAPIClient): - self.client = client - self.logger = logging.getLogger(self.__class__.__name__) - - @abstractmethod - def get_source_name(self) -> str: - """Return the DLT source name.""" - pass - - @abstractmethod - def create_dlt_source(self, **kwargs) -> Any: - """Create DLT source configuration.""" - pass - - -class BaseSource(ABC): - """Abstract base class for DLT source factories.""" - - def __init__(self, client: BaseAPIClient): - self.client = client - self.logger = logging.getLogger(self.__class__.__name__) - - @abstractmethod - def get_available_sources(self) -> List[str]: - """Return list of available source names.""" - pass - - -class BaseDecoder(ABC): - """Abstract base class for decoders.""" - - def __init__(self): - self.logger = logging.getLogger(self.__class__.__name__) - - @abstractmethod - def decode(self, data: Any, **kwargs) -> Any: - """Decode data according to specific strategy.""" - pass diff --git a/src/onchaindata/data_extraction/etherscan.py b/src/onchaindata/data_extraction/etherscan.py deleted file mode 100644 index 161e6b2..0000000 --- a/src/onchaindata/data_extraction/etherscan.py +++ /dev/null @@ -1,730 +0,0 @@ -"""Etherscan API client implementation.""" - -import os, json, csv, logging - -from datetime import datetime -from pathlib import Path -from typing import Any, Dict, List, Optional, Literal, Tuple -from dataclasses import dataclass - -import polars as pl -import dlt -from dlt.sources.rest_api import rest_api_source -from dlt.sources.helpers.rest_client import paginators - -from .base import BaseAPIClient, BaseSource, APIConfig -from .base import APIError - -logger = logging.getLogger(__name__) - - -@dataclass -class APIs: - """API-specific settings.""" - - etherscan_api_key: Optional[str] = None - coingecko_api_key: Optional[str] = None - - # Rate limits (requests per second) - etherscan_rate_limit: float = 5.0 - coingecko_rate_limit: float = 5.0 - defillama_rate_limit: float = 10.0 - - def __post_init__(self): - # Load from environment if not provided - if self.etherscan_api_key is None: - self.etherscan_api_key = os.getenv("ETHERSCAN_API_KEY") - - -class APIUrls: - """API endpoint URLs.""" - - ETHERSCAN = "https://api.etherscan.io/v2/api" - - -class EtherscanClient(BaseAPIClient): - """Etherscan API client implementation.""" - - @classmethod - def _load_chainid_mapping(cls) -> Dict[str, int]: - """Load chain name to chainid mapping from resource file.""" - # Get the path to the chainid.json file relative to this module - current_file = Path(__file__) - chainid_path = current_file.parent.parent / "config" / "chainid.json" - - try: - with chainid_path.open("r") as f: - return json.load(f) - except FileNotFoundError: - raise FileNotFoundError( - f"Chain ID mapping file not found at {chainid_path}" - ) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON in chain ID mapping file: {e}") - - def __init__( - self, - chainid: Optional[int] = None, - chain: 
Optional[str] = None, - api_key: Optional[str] = None, - calls_per_second: float = 5.0, - ): - # Validate that exactly one of chainid or chain is provided - if chainid is not None and chain is not None: - raise ValueError( - "Cannot specify both 'chainid' and 'chain' parameters. Use only one." - ) - if chainid is None and chain is None: - raise ValueError("Must specify either 'chainid' or 'chain' parameter.") - - # Resolve chainid from chain name if needed - chainid_mapping = self._load_chainid_mapping() - if chain is not None: - if chain not in chainid_mapping: - available_chains = ", ".join(sorted(chainid_mapping.keys())) - raise ValueError( - f"Unknown chain '{chain}'. Available chains: {available_chains}" - ) - chainid = chainid_mapping[chain] - - self.chainid = chainid - chain_name_mapping = {v: k for k, v in chainid_mapping.items()} - self.chain = chain_name_mapping.get(chainid, "unknown") - - # Create APIs instance to load environment variables - apis = APIs() - config = APIConfig( - base_url=APIUrls.ETHERSCAN, - api_key=api_key or apis.etherscan_api_key, - rate_limit=calls_per_second, - ) - super().__init__(config) - - def _build_request_params(self, **kwargs) -> Dict[str, Any]: - """Build request parameters with chain ID and API key.""" - return {"chainid": self.chainid, "apikey": self.config.api_key, **kwargs} - - def _handle_response(self, response) -> Any: - """Handle Etherscan API response.""" - response.raise_for_status() - data = response.json() - - if data.get("status") == "0": - message = data.get("message", "Etherscan API error") - if "rate limit" in message.lower(): - raise APIError(f"Rate limit exceeded: {message}") - raise APIError(f"API error: {message}") - - return data["result"] - - def get_latest_block( - self, timestamp: Optional[int] = None, closest: str = "before" - ) -> int: - """Get the latest block number or block closest to timestamp.""" - if timestamp is None: - timestamp = int(datetime.now().timestamp()) - - pass # Getting latest block - - params = { - "module": "block", - "action": "getblocknobytime", - "timestamp": timestamp, - "closest": closest, - } - result = self.make_request("", params) - - latest_block = int(result) - pass # Latest block retrieved - return latest_block - - def get_contract_abi( - self, address: str, save: bool = True, save_dir: str = "data/abi" - ) -> Dict[str, Any]: - """Get contract ABI and optionally save to file.""" - # Get contract metadata to check for proxy - try: - contract_metadata = self.get_contract_metadata(address) - except Exception as e: - logger.warning(f"Could not get metadata for {address}: {e}") - contract_metadata = {} - - # Fetch main contract ABI - params = { - "module": "contract", - "action": "getabi", - "address": address, - } - result = self.make_request("", params) - abi = json.loads(result) - - # Check if it's a proxy and fetch implementation ABI - implementation_abi = None - implementation_address = None - if contract_metadata.get("Proxy"): - implementation_address = contract_metadata.get("Implementation") - if implementation_address: - pass # Contract is a proxy, fetching implementation ABI - try: - impl_params = { - "module": "contract", - "action": "getabi", - "address": implementation_address, - } - impl_result = self.make_request("", impl_params) - implementation_abi = json.loads(impl_result) - except Exception as e: - logger.warning( - f"Could not fetch implementation ABI for {implementation_address}: {e}" - ) - - if save: - self._save_abi( - address, abi, implementation_address, 
implementation_abi, save_dir - ) - - return abi, implementation_abi - - def get_contract_metadata(self, address: str) -> Dict[str, Any]: - """Get contract metadata including proxy status.""" - pass # Fetching metadata for contract - - params = { - "module": "contract", - "action": "getsourcecode", - "address": address, - } - result = self.make_request("", params) - - source_data = result[0] if isinstance(result, list) else result - if not source_data: - raise ValueError(f"No source code found for contract {address}") - - return { - "ContractName": source_data.get("ContractName"), - "Proxy": source_data.get("Proxy") == "1", - "Implementation": source_data.get("Implementation"), - } - - def get_contract_creation_block_number(self, address: str): - """Get contract creation block number for given address.""" - try: - return int(self.get_contract_creation_info(address)["blockNumber"]) - except Exception as e: - logger.warning(f"Could not get contract creation block number for") - return None - - def get_transaction_receipt( - self, txhash: str, save: bool = True, save_dir: str = "data/receipts" - ) -> Dict[str, Any]: - """Get transaction receipt for given transaction hash.""" - # Ensure txhash has 0x prefix - if not txhash.startswith("0x"): - txhash = "0x" + txhash - - pass # Getting transaction receipt - - params = { - "module": "proxy", - "action": "eth_getTransactionReceipt", - "txhash": txhash, - } - - result = self.make_request("", params) - - if result is None: - raise APIError(f"Transaction receipt not found for {txhash}") - - if save: - self._save_receipt(txhash, result, save_dir) - - return result - - def get_contract_creation_info( - self, contract_addresses: List[str] - ) -> Dict[str, Any]: - """Get contract creation information for one or more addresses.""" - if isinstance(contract_addresses, str): - contract_addresses = [contract_addresses] - - pass # Getting creation info for contracts - - params = { - "module": "contract", - "action": "getcontractcreation", - "contractaddresses": ",".join(contract_addresses), - } - result = self.make_request("", params) - - if len(contract_addresses) == 1: - return result[0] if isinstance(result, list) else result - return result - - def _save_abi( - self, - address: str, - abi: Dict[str, Any], - implementation_address: Optional[str], - implementation_abi: Optional[Dict[str, Any]], - save_dir: str, - ): - """Save ABI(s) to file.""" - os.makedirs(save_dir, exist_ok=True) - # create a csv file with the following columns: address, implementation_address - csv_path = os.path.join(save_dir, "implementation.csv") - - # Check if file exists to determine whether to write headers - if not csv_path.exists(): - # Create new file with headers - with csv_path.open("w") as f: - f.write("address,implementation_address\n") - - with csv_path.open("a") as f: - f.write(f"{address},{implementation_address}\n") - df = pl.read_csv(csv_path).unique() - df.write_csv(csv_path, separator=",", has_header=True) - - # Save main ABI - main_path = Path(save_dir) / f"{address}.json" - with open(main_path, "w") as f: - json.dump(abi, f, indent=2) - pass # ABI saved - - # Save implementation ABI if available - if implementation_abi: - impl_path = Path(save_dir) / f"{implementation_address}.json" - with open(impl_path, "w") as f: - json.dump(implementation_abi, f, indent=2) - pass # Implementation ABI saved - - def _save_receipt(self, txhash: str, receipt: Dict[str, Any], save_dir: str): - """Save transaction receipt to file.""" - os.makedirs(save_dir, exist_ok=True) - - 
receipt_path = Path(save_dir) / f"{txhash}.json" - with open(receipt_path, "w") as f: - json.dump(receipt, f, indent=2) - pass # Receipt saved - - def get_block_number_by_timestamp(self, timestamp: int) -> int: - """Get block number by timestamp.""" - params = { - "module": "block", - "action": "getblocknobytime", - "timestamp": timestamp, - "closest": "before", - } - return int(self.make_request("", params)) - - -class EtherscanSource(BaseSource): - """Creating DLT source for Etherscan data.""" - - def __init__(self, client: EtherscanClient): - super().__init__(client) - - def get_available_sources(self) -> List[str]: - """Return list of available source names.""" - return ["logs", "transactions"] - - def create_dlt_source(self, **kwargs): - """Create DLT source for Etherscan API.""" - session = self.client._session - return rest_api_source( - { - "client": { - "base_url": self.client.config.base_url, - "paginator": paginators.PageNumberPaginator( - base_page=1, total_path=None, page_param="page" - ), - "session": session, - }, - "resources": [ - { - "name": "", # Etherscan result is not nested - "endpoint": {"params": kwargs}, - }, - ], - } - ) - - def logs( - self, - address: str, - from_block: int = 0, - to_block: str = "latest", - offset: int = 1000, - ): - """Get event logs for a given address.""" - - def _fetch(): - params = { - "module": "logs", - "action": "getLogs", - "address": address, - "fromBlock": from_block, - "toBlock": to_block, - "offset": offset, - "chainid": self.client.chainid, - "apikey": self.client.config.api_key, - } - - pass # Fetching logs for address - - source = self.create_dlt_source(**params) - for item in source: - item["chain"] = self.client.chain - yield item - - return dlt.resource( - _fetch, - columns={ - "topics": {"data_type": "json"}, - "time_stamp": {"data_type": "bigint"}, - "block_number": {"data_type": "bigint"}, - "log_index": {"data_type": "bigint"}, - "transaction_index": {"data_type": "bigint"}, - "gas_price": {"data_type": "bigint"}, - "gas_used": {"data_type": "bigint"}, - }, - ) - - def transactions( - self, - address: str, - from_block: int = 0, - to_block: str = "latest", - offset: int = 1000, - sort: str = "asc", - ): - """Get transactions for a given address.""" - - def _fetch(): - params = { - "module": "account", - "action": "txlist", - "address": address, - "startblock": from_block, - "endblock": to_block, - "offset": offset, - "sort": sort, - "chainid": self.client.chainid, - "apikey": self.client.config.api_key, - } - - pass # Fetching transactions for address - - source = self.create_dlt_source(**params) - for item in source: - item["chain"] = self.client.chain - yield item - - return dlt.resource( - _fetch, - columns={ - "time_stamp": {"data_type": "bigint"}, - }, - ) - - -class EtherscanExtractor: - """Extracts historical blockchain data and saves to Parquet files. - - Example: - extractor = EtherscanExtractor(etherscan_client) - """ - - def __init__( - self, - client: EtherscanClient, - ): - self.client = client - - def to_parquet( - self, - address: str, - from_block: int, - to_block: str, - chain: str, - table: str, - output_path: Path, - offset: int = 1000, - ): - """ - Core building block function to extract blockchain data to Parquet files. 
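        Example (illustrative sketch; the address and block range are placeholders):
            extractor = EtherscanExtractor(EtherscanClient(chain="ethereum"))
            extractor.to_parquet(
                address="0x...",
                from_block=18_000_000,
                to_block=18_010_000,
                chain="ethereum",
                table="logs",
                output_path=Path(".data/raw/logs.parquet"),
            )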
- - Args: - address: Contract address to extract data for - chain: Blockchain network (default: "ethereum") - table: logs, transactions - from_block: Starting block number - to_block: Ending block number or "latest" - offset: Number of records per API call - - Returns: - Path to the created Parquet file, or None if no data extracted - """ - source = EtherscanSource(self.client) - data = [] - - try: - if table == "logs": - resource = source.logs( - address=address, - from_block=from_block, - to_block=to_block, - offset=offset, - ) - - for record in resource: - record = self._process_hex_fields(record) - data.append(record) - - elif table == "transactions": - resource = source.transactions( - address=address, - from_block=from_block, - to_block=to_block, - offset=offset, - ) - - for record in resource: - record = self._process_hex_fields(record) - data.append(record) - - if len(data) == 0: - logger.info( - f"{chain} - {address} - {table} - {from_block}-{to_block}, ✅ no data extracted" - ) - elif len(data) >= 10_000: - """ - when >10000 records, potential missing data due to too many records, so no saving to parquet but - log to logging/extract_error and will retry with smaller chunk size - """ - logger.warning( - f"{chain} - {address} - {table} - {from_block}-{to_block}, ⚠️ {len(data)} records" - ) - - _log_error_to_csv( - address=address, - chain=chain, - table=table, - from_block=from_block, - to_block=to_block, - block_chunk_size=to_block - from_block, - ) - else: - self._save_to_parquet( - chain, address, table, from_block, to_block, data, output_path - ) - except: - """ - this is unlikely to happen, but if it does, treat it as a potential missing data and - log to logging/extract_error and will retry with smaller chunk size - """ - logger.error( - f"{chain} - {address} - {table} - {from_block}-{to_block}, 🚨 unexpected error" - ) - - _log_error_to_csv( - address=address, - chain=chain, - table=table, - from_block=from_block, - to_block=to_block, - block_chunk_size=to_block - from_block, - ) - - def _process_hex_fields(self, record: Dict[str, Any]) -> Dict[str, Any]: - """Convert numeric string fields to integers (handles both hex and decimal formats).""" - numeric_fields = { - "blockNumber", - "timeStamp", - "logIndex", - "transactionIndex", - "gasPrice", - "gasUsed", - "nonce", - "value", - "gas", - "cumulativeGasUsed", - "confirmations", - } - - for field in numeric_fields: - if field in record and isinstance(record[field], str): - str_value = record[field].strip() - if str_value and str_value != "0x": - try: - # Auto-detect format based on prefix - if str_value.startswith("0x"): - # Hex format (logs API) - record[field] = int(str_value, 16) - else: - # Decimal format (transactions API) - record[field] = int(str_value, 10) - except ValueError: - logger.warning( - f"Could not convert {field} value '{str_value}' to int" - ) - record[field] = None - else: - record[field] = None - - return record - - def _save_to_parquet( - self, - chain: str, - address: str, - table: str, - from_block: int, - to_block: int, - data: List[Dict[str, Any]], - output_path: Path, - ) -> str: - """Save data to Parquet file organized by chain_address_table_from_block_to_block.""" - try: - # Create Polars DataFrame - new_lf = pl.LazyFrame(data) - - # Save to Parquet (append if file exists) - if output_path.exists(): - # Use scan_parquet for memory efficiency, then concatenate and collect - existing_lf = pl.scan_parquet(output_path) - - # Ensure column order matches between existing and new data - existing_columns 
= existing_lf.collect_schema().names() - new_lf = new_lf.select(existing_columns) - - combined_lf = pl.concat([existing_lf, new_lf]).unique() - combined_lf.collect().write_parquet(output_path) - - logger.info( - f"{chain} - {address} - {table} - {from_block}-{to_block}: {len(data)} saved" - ) - - else: - # Write new file - new_lf.collect().write_parquet(output_path) - logger.info( - f"{chain} - {address} - {table} - {from_block}-{to_block}: {len(data)} saved" - ) - - return output_path - - except Exception as e: - logger.error(f"Failed to save data: {e}") - raise - - -def etherscan_to_parquet( - address: str, - etherscan_client: EtherscanClient, - from_block: int, - to_block: int, - output_path: Path, - table, - block_chunk_size: int = 10_000, -) -> Path: - """Backfill blockchain data from Etherscan to protocol-grouped Parquet files in chunks. - - This function efficiently extracts historical data and saves to Parquet files: - - logs of a specific contract address - - transactions to a specific contract address (optional) - - Args: - address: Ethereum address to fetch data for (case-insensitive) - etherscan_client: Configured Etherscan API client for data retrieval - from_block: Starting block number (uses contract creation block if None) - to_block: Ending block number (uses latest block if None) - block_chunk_size: Number of blocks to process per chunk (default: 50,000) - data_dir: Path for parquet file output - table: logs, transactions - Returns: - Path to the parquet file - """ - - extractor = EtherscanExtractor(etherscan_client) - address = address.lower() - chain = etherscan_client.chain - - end_block = to_block - - assert from_block < to_block, "from_block must be less than to_block" - assert ( - block_chunk_size < to_block - from_block - ), "block_chunk_size must be less than to_block - from_block" - - for chunk_start in range( - from_block, end_block - block_chunk_size + 1, block_chunk_size - ): - chunk_end = min(chunk_start + block_chunk_size, end_block) - - extractor.to_parquet( - address=address, - chain=chain, - table=table, - from_block=chunk_start, - to_block=chunk_end, - offset=1000, - output_path=output_path, - ) - if chunk_end < end_block: - extractor.to_parquet( - address=address, - chain=chain, - table=table, - from_block=chunk_end, - to_block=end_block, - offset=1000, - output_path=output_path, - ) - return output_path - - -def _log_error_to_csv( - address: str, - chain: str, - table: str, - from_block: int, - to_block: int, - block_chunk_size: int, -): - """Log an error to CSV file.""" - error_file = Path(f"logging/extract_error/{chain}_{address}_{table}.csv") - error_file.parent.mkdir(parents=True, exist_ok=True) - - # CSV headers - csv_headers = [ - "timestamp", - "address", - "chain", - "from_block", - "to_block", - "block_chunk_size", - ] - - # Check if file exists to determine if we need to write headers - file_exists = error_file.exists() - - # Append error to CSV file immediately - with error_file.open("a", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - - # Write headers if this is a new file - if not file_exists: - writer.writerow(csv_headers) - - timestamp = datetime.now().isoformat() - - writer.writerow( - [ - timestamp, - address, - chain, - from_block, - to_block, - block_chunk_size, - ] - ) diff --git a/src/onchaindata/data_extraction/etherscan_scraper.py b/src/onchaindata/data_extraction/etherscan_scraper.py deleted file mode 100644 index 0dd0c39..0000000 --- a/src/onchaindata/data_extraction/etherscan_scraper.py +++ /dev/null @@ -1,166 
+0,0 @@ -""" -Etherscan web scraper using Selenium to extract contract name tags. -""" - -import logging -import time -from typing import Optional - -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.chrome.service import Service -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -from selenium.common.exceptions import TimeoutException, NoSuchElementException -from webdriver_manager.chrome import ChromeDriverManager - -logger = logging.getLogger(__name__) - - -class EtherscanScraper: - """ - Web scraper for extracting contract name tags from Etherscan. - - Example: - >>> scraper = EtherscanScraper() - >>> name_tag = scraper.get_contract_name_tag("0x94cc50e4521bd271c1a997a3a4dc815c2f920b41") - >>> print(name_tag) - 'Curve: crvUSDSUSD-f Pool' - >>> scraper.close() - """ - - BASE_URL = "https://etherscan.io/address/" - - def __init__(self, headless: bool = True, timeout: int = 10): - """ - Initialize the Etherscan scraper. - - Args: - headless: Run browser in headless mode (no GUI) - timeout: Maximum wait time for page elements (seconds) - """ - self.timeout = timeout - self.driver = self._setup_driver(headless) - logger.info("EtherscanScraper initialized") - - def _setup_driver(self, headless: bool) -> webdriver.Chrome: - """ - Set up Chrome WebDriver with appropriate options. - - Args: - headless: Run browser in headless mode - - Returns: - Configured Chrome WebDriver instance - """ - chrome_options = Options() - - if headless: - chrome_options.add_argument("--headless") - - # Additional options for better performance and reliability - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("--disable-dev-shm-usage") - chrome_options.add_argument("--disable-blink-features=AutomationControlled") - chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") - - # Set up Chrome driver with automatic driver management - service = Service(ChromeDriverManager().install()) - driver = webdriver.Chrome(service=service, options=chrome_options) - - return driver - - def get_contract_name_tag(self, address: str) -> Optional[str]: - """ - Extract the name tag for a given contract address. 
- - Args: - address: Ethereum contract address (with or without '0x' prefix) - - Returns: - Contract name tag if found, None otherwise - - Example: - >>> scraper = EtherscanScraper() - >>> name = scraper.get_contract_name_tag("0x94cc50e4521bd271c1a997a3a4dc815c2f920b41") - >>> print(name) - 'Curve: crvUSDSUSD-f Pool' - """ - # Normalize address - if not address.startswith("0x"): - address = f"0x{address}" - - address = address.lower() - url = f"{self.BASE_URL}{address}" - - logger.info(f"Fetching name tag for address: {address}") - - try: - # Navigate to the address page - self.driver.get(url) - - # Wait for the page to load - time.sleep(1) - - # Try to find the name tag element - # The name tag appears in a span with class "hash-tag text-truncate" - try: - wait = WebDriverWait(self.driver, self.timeout) - name_tag_element = wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, "span.hash-tag.text-truncate")) - ) - - name_tag = name_tag_element.text.strip() - logger.info(f"Found name tag: {name_tag}") - return name_tag - - except TimeoutException: - logger.warning(f"No name tag found for address {address} (element not found within timeout)") - return None - except NoSuchElementException: - logger.warning(f"No name tag found for address {address} (element does not exist)") - return None - - except Exception as e: - logger.error(f"Error scraping address {address}: {e}") - return None - - def get_contract_info(self, address: str) -> dict: - """ - Extract comprehensive contract information including name tag and other details. - - Args: - address: Ethereum contract address - - Returns: - Dictionary containing contract information - - Example: - >>> scraper = EtherscanScraper() - >>> info = scraper.get_contract_info("0x94cc50e4521bd271c1a997a3a4dc815c2f920b41") - >>> print(info['name_tag']) - 'Curve: crvUSDSUSD-f Pool' - """ - name_tag = self.get_contract_name_tag(address) - - return { - "address": address.lower(), - "name_tag": name_tag, - "has_name_tag": name_tag is not None, - "url": f"{self.BASE_URL}{address.lower()}" - } - - def close(self): - """Close the WebDriver and clean up resources.""" - if self.driver: - self.driver.quit() - logger.info("EtherscanScraper closed") - - def __enter__(self): - """Context manager entry.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.close() diff --git a/src/onchaindata/data_extraction/graphql.py b/src/onchaindata/data_extraction/graphql.py deleted file mode 100644 index 277887a..0000000 --- a/src/onchaindata/data_extraction/graphql.py +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/env python3 -""" -GraphQL data extractor with batch and streaming modes. - -This module extracts data from a GraphQL endpoint and either: -1. Saves to Parquet file (batch mode) -2. Pushes directly to database (streaming mode) -""" - -import time, logging -from typing import Optional, Dict, Any, List - -import requests -import polars as pl - -from onchaindata.data_pipeline import Loader - -logger = logging.getLogger(__name__) - - -class GraphQLBatch: - """Fetches data from GraphQL endpoint with streaming and batch modes.""" - - def __init__( - self, - endpoint: str, - query: str, - ): - """ - Initialize GraphQL fetcher. - - Args: - endpoint: GraphQL endpoint URL - query: GraphQL query string - """ - self.endpoint = endpoint - self.query = query - self.session = requests.Session() - - def extract(self) -> Dict[str, Any]: - """ - Execute GraphQL query and return results. 
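        Example (illustrative; uses the local default endpoint from the CLI scripts and fields
        defined in stables_transfers.graphql):
            batch = GraphQLBatch(
                endpoint="http://localhost:8080/v1/graphql",
                query="query { stablesTransfers { id blockNumber value } }",
            )
            df = batch.extract_to_dataframe("stablesTransfers")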
- - Returns: - GraphQL response data - """ - payload = { - "query": self.query, - } - - response = self.session.post( - self.endpoint, - json=payload, - headers={"Content-Type": "application/json"}, - ) - response.raise_for_status() - - data = response.json() - if "errors" in data: - raise ValueError(f"GraphQL errors: {data['errors']}") - - return data.get("data", {}) - - def extract_to_dataframe(self, table_name: str) -> pl.DataFrame: - """ - Extract data and convert to Polars DataFrame. - - Args: - table_name: Name of the table/query result to extract - - Returns: - Polars DataFrame with fetched data - """ - data = self.extract() - if table_name not in data: - raise ValueError( - f"Table '{table_name}' not found in response. Available: {list(data.keys())}" - ) - - records = data[table_name] - if not records: - return pl.DataFrame() - - return pl.DataFrame(records) - - -class GraphQLStream: - """Fetches data from GraphQL endpoint in streaming mode with polling.""" - - def __init__( - self, - endpoint: str, - table_name: str, - fields: List[str], - poll_interval: int = 5, - ): - """ - Initialize streaming fetcher. - - Args: - endpoint: GraphQL endpoint URL - table_name: Name of the table/query to fetch - fields: List of fields to fetch - poll_interval: Seconds to wait between polls - """ - self.endpoint = endpoint - self.table_name = table_name - self.fields = fields - self.poll_interval = poll_interval - self.last_seen_block_number: Optional[int] = None - - def _build_query(self, where_clause: Optional[str] = None) -> str: - """ - Build GraphQL query dynamically. - - Args: - where_clause: Optional WHERE clause filter - - Returns: - GraphQL query string - """ - fields_str = "\n ".join(self.fields) - where_str = f", where: {{{where_clause}}}" if where_clause else "" - - query = f""" - query {{ - {self.table_name}( - order_by: {{blockNumber: desc}} - {where_str} - ) {{ - {fields_str} - }} - }} - """.strip() - return query - - def _get_last_block_number_from_db( - self, loader: Loader, schema: str, table_name: str - ) -> Optional[int]: - """ - Query database to get the last maximum block number. - - Args: - loader: Loader instance for database operations - schema: Target schema name - table_name: Target table name - - Returns: - Maximum block number, or None if table is empty or doesn't exist - """ - try: - # Get database connection from loader's client - with loader.client.get_connection() as conn: - with conn.cursor() as cur: - query = f""" - SELECT MAX(block_number)::INTEGER - FROM {schema}.{table_name} - """ - cur.execute(query) - result = cur.fetchone() - return result[0] if result and result[0] is not None else None - except Exception as e: - return None - - def stream(self, loader: Loader, schema: str, table_name: str): - """ - Stream data from GraphQL endpoint to database. - - Automatically resumes from the last record in the database based on block number. 
- - Args: - loader: Loader instance for database operations - schema: Target schema name - table_name: Target table name - """ - logger.info(f"Starting streaming mode: {self.endpoint}") - logger.info(f"Target: {schema}.{table_name}") - - self.last_seen_block_number = self._get_last_block_number_from_db( - loader, schema, table_name - ) - logger.info(f"Last seen block number: {self.last_seen_block_number}") - if self.last_seen_block_number is not None: - logger.info(f"Resuming from block number = {self.last_seen_block_number}") - else: - logger.info("No existing data found, starting fresh") - - poll_count = 1 - total_records = 0 - - try: - while True: - # Build query with current state - where_clause = None - if self.last_seen_block_number is not None: - # Build WHERE clause for incremental fetch - where_clause = ( - f"blockNumber: {{_gt: {self.last_seen_block_number}}}" - ) - - query = self._build_query(where_clause) - - extractor = GraphQLBatch( - endpoint=self.endpoint, - query=query, - ) - - # Fetch data - df = extractor.extract_to_dataframe(self.table_name) - if not df.is_empty(): - records_count = len(df) - total_records += records_count - - # Load DataFrame directly to database - loader.load_dataframe( - df=df, - schema=schema, - table_name=table_name, - write_disposition="append", - ) - - # Update last seen value - if "blockNumber" in df.columns: - self.last_seen_block_number = df["blockNumber"].max() - logger.info( - f"[Poll {poll_count}] - {records_count} new records, new max block number in the database: {self.last_seen_block_number}" - ) - - poll_count += 1 - else: - logger.info( - f"[Poll {poll_count}] - No records fetched, waiting for next poll..." - ) - poll_count += 1 - - # Wait before next poll - time.sleep(self.poll_interval) - - except KeyboardInterrupt: - logger.info(f"\n\nStreaming stopped by user.") - logger.info(f"Total polls: {poll_count}") - logger.info(f"Total records: {total_records}") diff --git a/src/onchaindata/data_extraction/rate_limiter.py b/src/onchaindata/data_extraction/rate_limiter.py deleted file mode 100644 index b773c50..0000000 --- a/src/onchaindata/data_extraction/rate_limiter.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Rate limiting utilities for API clients.""" - -import time -import logging -import requests -from enum import Enum -from typing import Optional - - -class RateLimitStrategy(Enum): - """Rate limiting strategies.""" - FIXED_INTERVAL = "fixed_interval" - EXPONENTIAL_BACKOFF = "exponential_backoff" - - -class RateLimitedSession(requests.Session): - """Enhanced rate-limited session with multiple strategies.""" - - def __init__( - self, - calls_per_second: float = 5.0, - strategy: RateLimitStrategy = RateLimitStrategy.FIXED_INTERVAL, - logger: Optional[logging.Logger] = None - ): - super().__init__() - self.calls_per_second = calls_per_second - self.strategy = strategy - self.logger = logger or logging.getLogger(__name__) - self.last_request_time = 0 - self.request_count = 0 - self.min_interval = 1.0 / calls_per_second - - def request(self, method: str, url: str, **kwargs) -> requests.Response: - """Make a rate-limited request.""" - self._apply_rate_limiting() - self.request_count += 1 - response = super().request(method, url, **kwargs) - return response - - def _apply_rate_limiting(self): - """Apply rate limiting based on configured strategy.""" - current_time = time.time() - time_since_last = current_time - self.last_request_time - - if self.strategy == RateLimitStrategy.FIXED_INTERVAL: - if time_since_last < self.min_interval: - sleep_time = 
self.min_interval - time_since_last - pass # Rate limiting applied - time.sleep(sleep_time) - elif self.strategy == RateLimitStrategy.EXPONENTIAL_BACKOFF: - # For exponential backoff, we'd need to track consecutive failures - # This is a simplified implementation - if time_since_last < self.min_interval: - backoff_time = min(self.min_interval * (2 ** (self.request_count % 5)), 60) - pass # Exponential backoff applied - time.sleep(backoff_time) - - self.last_request_time = time.time() \ No newline at end of file diff --git a/src/onchaindata/data_pipeline/__init__.py b/src/onchaindata/data_pipeline/__init__.py deleted file mode 100644 index dc86baa..0000000 --- a/src/onchaindata/data_pipeline/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Data pipeline modules for loading data to databases.""" - -from .loaders import Loader - -__all__ = ["Loader"] diff --git a/src/onchaindata/data_pipeline/loaders.py b/src/onchaindata/data_pipeline/loaders.py deleted file mode 100644 index ac647ec..0000000 --- a/src/onchaindata/data_pipeline/loaders.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python3 - -"""Unified loader for loading Parquet files to various destinations.""" - -import os -from pathlib import Path -from typing import Union - -import dlt -from dlt.sources.filesystem import filesystem, read_parquet -import polars as pl - -from ..utils import PostgresClient, SnowflakeClient - - -class Loader: - """Unified loader class for loading data to different destinations.""" - - def __init__( - self, - client: Union[PostgresClient, SnowflakeClient], - ): - """ - Initialize the Loader. - - Args: - client: Database client instance (PostgresClient or SnowflakeClient) - If not provided, will be created from environment variables - """ - self.client = client - - def load_parquet( - self, - file_path: Union[str, Path], - schema: str, - table_name: str, - write_disposition: str = "append", - ): - """ - Load Parquet file to the configured destination using DLT. - - Args: - file_path: Path to the Parquet file - schema: Target schema name - table_name: Target table name - write_disposition: How to handle existing data ("append", "replace", "merge") - - Returns: - DLT pipeline run result - """ - # Convert Path to string if needed - if isinstance(file_path, Path): - file_path = file_path.as_posix() - - # Create filesystem source - fs_source = filesystem(bucket_url=".", file_glob=file_path) - - # Read parquet with special handling for logs table - parquet_resource = fs_source | read_parquet() - if table_name == "logs": - parquet_resource.apply_hints( - columns={"topics": {"data_type": "json", "nullable": True}} - ) - - # Create pipeline with destination-specific configuration - pipeline = dlt.pipeline( - pipeline_name="parquet_loader", - destination=self.client.get_dlt_destination(), - dataset_name=schema, - ) - - # Load data - result = pipeline.run( - parquet_resource, - table_name=table_name, - write_disposition=write_disposition, - ) - - return result - - def load_dataframe( - self, - df: pl.DataFrame, - schema: str, - table_name: str, - write_disposition: str = "append", - primary_key: list[str] = None, - **kwargs, - ): - """ - Load Polars DataFrame directly to the database using DLT. - - Args: - df: Polars DataFrame to load - schema: Target schema name - table_name: Target table name - write_disposition: How to handle existing data ("append", "replace", "merge") - primary_key: List of column names to use as primary key for merge operations. - Required when write_disposition="merge". 
- Example: ["contract_address", "chain"] - - Returns: - DLT pipeline run result - """ - # Validate merge requirements - if write_disposition == "merge" and not primary_key: - raise ValueError( - "primary_key must be specified when write_disposition='merge'. " - "Example: primary_key=['contract_address', 'chain']" - ) - - # Convert DataFrame to list of dicts for DLT - data = df.to_dicts() - - # Create a DLT resource from the data - resource = dlt.resource(data, name=table_name) - - # Apply primary key hint for merge operations - if primary_key: - resource.apply_hints(primary_key=primary_key) - - # Create pipeline with destination-specific configuration - pipeline = dlt.pipeline( - pipeline_name="dataframe_loader", - destination=self.client.get_dlt_destination(), - dataset_name=schema, - ) - - # Load data - result = pipeline.run( - resource, - table_name=table_name, - write_disposition=write_disposition, - ) - - return result diff --git a/src/onchaindata/utils/__init__.py b/src/onchaindata/utils/__init__.py deleted file mode 100644 index 8e49d52..0000000 --- a/src/onchaindata/utils/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Utility modules for database clients and helpers.""" - -from .postgres_client import PostgresClient -from .snowflake_client import SnowflakeClient - -__all__ = ["PostgresClient", "SnowflakeClient"] diff --git a/src/onchaindata/utils/base_client.py b/src/onchaindata/utils/base_client.py deleted file mode 100644 index 2c726f8..0000000 --- a/src/onchaindata/utils/base_client.py +++ /dev/null @@ -1,34 +0,0 @@ -from abc import ABC, abstractmethod -from contextlib import contextmanager -from typing import Optional, Any, Dict, List -import os - - -class BaseDatabaseClient(ABC): - """Abstract base class for database clients with common patterns.""" - - def __init__(self): - """Initialize with connection parameters.""" - self.connection_params = self._build_connection_params() - self._engine = None - - @abstractmethod - def _build_connection_params(self) -> Dict[str, Any]: - """Build connection parameters from environment or config.""" - pass - - @abstractmethod - @contextmanager - def get_connection(self): - """Context manager for database connections.""" - pass - - @abstractmethod - def get_dlt_destination(self): - """Get DLT destination for this database.""" - pass - - @staticmethod - def _get_env_var(key: str, default: str = None) -> str: - """Helper method to get environment variables.""" - return os.getenv(key, default) diff --git a/src/onchaindata/utils/chain.py b/src/onchaindata/utils/chain.py deleted file mode 100644 index 391f283..0000000 --- a/src/onchaindata/utils/chain.py +++ /dev/null @@ -1,17 +0,0 @@ -import json -from pathlib import Path -from typing import Optional - - -def get_chainid(chain: str, chainid_data: Optional[dict] = None) -> int: - """Get the chainid for a given chain name.""" - chainid_json = Path(__file__).parent.parent / "config/chainid.json" - if chainid_data is None: - with open(chainid_json, "r") as f: - chainid_data = json.load(f) - pass # Loaded chainid.json - try: - chainid = chainid_data[chain] - return chainid - except KeyError: - raise ValueError(f"Chain {chain} not found in .config/chainid.json") diff --git a/src/onchaindata/utils/postgres_client.py b/src/onchaindata/utils/postgres_client.py deleted file mode 100644 index c0cfbf9..0000000 --- a/src/onchaindata/utils/postgres_client.py +++ /dev/null @@ -1,76 +0,0 @@ -from typing import Optional, Any, Dict -import os -from contextlib import contextmanager - -import psycopg -from 
sqlalchemy import create_engine -import dlt - -from .base_client import BaseDatabaseClient - - -class PostgresClient(BaseDatabaseClient): - """Object-oriented PostgreSQL client for database operations.""" - - def __init__( - self, - host: str = None, - port: int = None, - database: str = None, - user: str = None, - password: str = None, - ): - """ - Initialize PostgresDestination with database configuration. - - Args: - host: Database host - port: Database port - database: Database name - user: Database user - password: Database password - """ - self.host = host - self.port = port - self.database = database - self.user = user - self.password = password - super().__init__() - - @classmethod - def from_env(cls) -> "PostgresClient": - """Create from environment variables""" - return cls( - host=cls._get_env_var("POSTGRES_HOST"), - port=int(cls._get_env_var("POSTGRES_PORT", "5432")), - database=cls._get_env_var("POSTGRES_DB"), - user=cls._get_env_var("POSTGRES_USER"), - password=cls._get_env_var("POSTGRES_PASSWORD"), - ) - - def _build_connection_params(self) -> Dict[str, Any]: - """Build connection parameters from instance variables.""" - return { - "host": self.host, - "port": self.port, - "dbname": self.database, - "user": self.user, - "password": self.password, - } - - @contextmanager - def get_connection(self): - """Context manager for PostgreSQL connections.""" - conn = None - try: - conn = psycopg.connect(**self.connection_params) - yield conn - finally: - if conn: - conn.close() - - def get_dlt_destination(self) -> Any: - """Return DLT destination for pipeline operations.""" - params = self.connection_params - connection_url = f"postgresql://{params['user']}:{params['password']}@{params['host']}:{params['port']}/{params['dbname']}" - return dlt.destinations.postgres(connection_url) diff --git a/uv.lock b/uv.lock index bc354ab..d0ba4e0 100644 --- a/uv.lock +++ b/uv.lock @@ -852,6 +852,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/8e/abdd3f14d735b2929290a018ecf133c901be4874b858dd1c604b9319f064/greenlet-3.2.4-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2523e5246274f54fdadbce8494458a2ebdcdbc7b802318466ac5606d3cded1f8", size = 587684 }, { url = "https://files.pythonhosted.org/packages/5d/65/deb2a69c3e5996439b0176f6651e0052542bb6c8f8ec2e3fba97c9768805/greenlet-3.2.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1987de92fec508535687fb807a5cea1560f6196285a4cde35c100b8cd632cc52", size = 1116647 }, { url = "https://files.pythonhosted.org/packages/3f/cc/b07000438a29ac5cfb2194bfc128151d52f333cee74dd7dfe3fb733fc16c/greenlet-3.2.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:55e9c5affaa6775e2c6b67659f3a71684de4c549b3dd9afca3bc773533d284fa", size = 1142073 }, + { url = "https://files.pythonhosted.org/packages/67/24/28a5b2fa42d12b3d7e5614145f0bd89714c34c08be6aabe39c14dd52db34/greenlet-3.2.4-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c9c6de1940a7d828635fbd254d69db79e54619f165ee7ce32fda763a9cb6a58c", size = 1548385 }, + { url = "https://files.pythonhosted.org/packages/6a/05/03f2f0bdd0b0ff9a4f7b99333d57b53a7709c27723ec8123056b084e69cd/greenlet-3.2.4-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:03c5136e7be905045160b1b9fdca93dd6727b180feeafda6818e6496434ed8c5", size = 1613329 }, { url = "https://files.pythonhosted.org/packages/d8/0f/30aef242fcab550b0b3520b8e3561156857c94288f0332a79928c31a52cf/greenlet-3.2.4-cp311-cp311-win_amd64.whl", hash = 
"sha256:9c40adce87eaa9ddb593ccb0fa6a07caf34015a29bf8d344811665b573138db9", size = 299100 }, { url = "https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079 }, { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997 }, @@ -861,6 +863,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586 }, { url = "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281 }, { url = "https://files.pythonhosted.org/packages/3f/c7/12381b18e21aef2c6bd3a636da1088b888b97b7a0362fac2e4de92405f97/greenlet-3.2.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:20fb936b4652b6e307b8f347665e2c615540d4b42b3b4c8a321d8286da7e520f", size = 1151142 }, + { url = "https://files.pythonhosted.org/packages/27/45/80935968b53cfd3f33cf99ea5f08227f2646e044568c9b1555b58ffd61c2/greenlet-3.2.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ee7a6ec486883397d70eec05059353b8e83eca9168b9f3f9a361971e77e0bcd0", size = 1564846 }, + { url = "https://files.pythonhosted.org/packages/69/02/b7c30e5e04752cb4db6202a3858b149c0710e5453b71a3b2aec5d78a1aab/greenlet-3.2.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:326d234cbf337c9c3def0676412eb7040a35a768efc92504b947b3e9cfc7543d", size = 1633814 }, { url = "https://files.pythonhosted.org/packages/e9/08/b0814846b79399e585f974bbeebf5580fbe59e258ea7be64d9dfb253c84f/greenlet-3.2.4-cp312-cp312-win_amd64.whl", hash = "sha256:a7d4e128405eea3814a12cc2605e0e6aedb4035bf32697f72deca74de4105e02", size = 299899 }, { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814 }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073 }, @@ -870,6 +874,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497 }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662 }, { url = 
"https://files.pythonhosted.org/packages/a2/15/0d5e4e1a66fab130d98168fe984c509249c833c1a3c16806b90f253ce7b9/greenlet-3.2.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:d25c5091190f2dc0eaa3f950252122edbbadbb682aa7b1ef2f8af0f8c0afefae", size = 1149210 }, + { url = "https://files.pythonhosted.org/packages/1c/53/f9c440463b3057485b8594d7a638bed53ba531165ef0ca0e6c364b5cc807/greenlet-3.2.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6e343822feb58ac4d0a1211bd9399de2b3a04963ddeec21530fc426cc121f19b", size = 1564759 }, + { url = "https://files.pythonhosted.org/packages/47/e4/3bb4240abdd0a8d23f4f88adec746a3099f0d86bfedb623f063b2e3b4df0/greenlet-3.2.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ca7f6f1f2649b89ce02f6f229d7c19f680a6238af656f61e0115b24857917929", size = 1634288 }, { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685 }, { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586 }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346 }, @@ -877,6 +883,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659 }, { url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355 }, { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512 }, + { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508 }, + { url = "https://files.pythonhosted.org/packages/0d/da/343cd760ab2f92bac1845ca07ee3faea9fe52bee65f7bcb19f16ad7de08b/greenlet-3.2.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:015d48959d4add5d6c9f6c5210ee3803a830dce46356e3bc326d6776bde54681", size = 1680760 }, { url = "https://files.pythonhosted.org/packages/e3/a5/6ddab2b4c112be95601c13428db1d8b6608a8b6039816f2ba09c346c08fc/greenlet-3.2.4-cp314-cp314-win_amd64.whl", hash = "sha256:e37ab26028f12dbb0ff65f29a8d3d44a765c61e729647bf2ddfbbed621726f01", size = 303425 }, ] @@ -1897,63 +1905,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/af/11/0cc63f9f321ccf63886ac203336777140011fb669e739da36d8db3c53b98/numpy-2.3.3-pp311-pypy311_pp73-win_amd64.whl", hash = 
"sha256:2e267c7da5bf7309670523896df97f93f6e469fb931161f483cd6882b3b1a5dc", size = 12971844 }, ] -[[package]] -name = "onchaindata" -version = "0.1.0" -source = { editable = "." } -dependencies = [ - { name = "dbt-core" }, - { name = "dbt-postgres" }, - { name = "dbt-snowflake" }, - { name = "dlt", extra = ["postgres"] }, - { name = "jupyter" }, - { name = "mkdocs" }, - { name = "mkdocs-material" }, - { name = "pandas" }, - { name = "polars" }, - { name = "psycopg" }, - { name = "pyarrow" }, - { name = "python-dotenv" }, - { name = "rich" }, - { name = "selenium" }, - { name = "snowflake-connector-python" }, - { name = "sqlalchemy" }, - { name = "webdriver-manager" }, -] - -[package.dev-dependencies] -dev = [ - { name = "sqlfluff" }, - { name = "sqlfluff-templater-dbt" }, -] - -[package.metadata] -requires-dist = [ - { name = "dbt-core", specifier = ">=1.10.13" }, - { name = "dbt-postgres", specifier = ">=1.9.1" }, - { name = "dbt-snowflake", specifier = ">=1.10.2" }, - { name = "dlt", extras = ["postgres"], specifier = ">=1.17.0" }, - { name = "jupyter", specifier = ">=1.1.1" }, - { name = "mkdocs", specifier = ">=1.6.1" }, - { name = "mkdocs-material", specifier = ">=9.6.22" }, - { name = "pandas", specifier = ">=2.3.3" }, - { name = "polars", specifier = ">=1.33.1" }, - { name = "psycopg", specifier = ">=3.2.10" }, - { name = "pyarrow", specifier = ">=21.0.0" }, - { name = "python-dotenv", specifier = ">=1.0.0" }, - { name = "rich", specifier = ">=14.1.0" }, - { name = "selenium", specifier = ">=4.27.1" }, - { name = "snowflake-connector-python", specifier = ">=3.17.4" }, - { name = "sqlalchemy", specifier = ">=2.0.43" }, - { name = "webdriver-manager", specifier = ">=4.0.2" }, -] - -[package.metadata.requires-dev] -dev = [ - { name = "sqlfluff", specifier = ">=3.5.0" }, - { name = "sqlfluff-templater-dbt", specifier = ">=3.5.0" }, -] - [[package]] name = "orderly-set" version = "5.5.0" @@ -3427,6 +3378,63 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a9/5c/bfd6bd0bf979426d405cc6e71eceb8701b148b16c21d2dc3c261efc61c7b/sqlparse-0.5.3-py3-none-any.whl", hash = "sha256:cf2196ed3418f3ba5de6af7e82c694a9fbdbfecccdfc72e281548517081f16ca", size = 44415 }, ] +[[package]] +name = "stables-analytics" +version = "0.1.0" +source = { virtual = "." 
} +dependencies = [ + { name = "dbt-core" }, + { name = "dbt-postgres" }, + { name = "dbt-snowflake" }, + { name = "dlt", extra = ["postgres"] }, + { name = "jupyter" }, + { name = "mkdocs" }, + { name = "mkdocs-material" }, + { name = "pandas" }, + { name = "polars" }, + { name = "psycopg" }, + { name = "pyarrow" }, + { name = "python-dotenv" }, + { name = "rich" }, + { name = "selenium" }, + { name = "snowflake-connector-python" }, + { name = "sqlalchemy" }, + { name = "webdriver-manager" }, +] + +[package.dev-dependencies] +dev = [ + { name = "sqlfluff" }, + { name = "sqlfluff-templater-dbt" }, +] + +[package.metadata] +requires-dist = [ + { name = "dbt-core", specifier = ">=1.10.13" }, + { name = "dbt-postgres", specifier = ">=1.9.1" }, + { name = "dbt-snowflake", specifier = ">=1.10.2" }, + { name = "dlt", extras = ["postgres"], specifier = ">=1.17.0" }, + { name = "jupyter", specifier = ">=1.1.1" }, + { name = "mkdocs", specifier = ">=1.6.1" }, + { name = "mkdocs-material", specifier = ">=9.6.22" }, + { name = "pandas", specifier = ">=2.3.3" }, + { name = "polars", specifier = ">=1.33.1" }, + { name = "psycopg", specifier = ">=3.2.10" }, + { name = "pyarrow", specifier = ">=21.0.0" }, + { name = "python-dotenv", specifier = ">=1.0.0" }, + { name = "rich", specifier = ">=14.1.0" }, + { name = "selenium", specifier = ">=4.27.1" }, + { name = "snowflake-connector-python", specifier = ">=3.17.4" }, + { name = "sqlalchemy", specifier = ">=2.0.43" }, + { name = "webdriver-manager", specifier = ">=4.0.2" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "sqlfluff", specifier = ">=3.5.0" }, + { name = "sqlfluff-templater-dbt", specifier = ">=3.5.0" }, +] + [[package]] name = "stack-data" version = "0.6.3"