diff --git a/cli/__init__.py b/cli/__init__.py
index 7d4b9a178..cd4676727 100644
--- a/cli/__init__.py
+++ b/cli/__init__.py
@@ -4,6 +4,7 @@
 from cli.api import api
 from cli.init_db import init_db
 from cli.reorg import reorg
+from cli.schedule import schedule
 from cli.stream import stream
 from indexer.utils.logging_utils import logging_basic_config
 
@@ -28,3 +29,4 @@ def cli(ctx):
 cli.add_command(aggregates, "aggregates")
 cli.add_command(reorg, "reorg")
 cli.add_command(init_db, "init_db")
+cli.add_command(schedule, "schedule")
diff --git a/cli/aggregates.py b/cli/aggregates.py
index c2bdb347d..403dcc8bc 100644
--- a/cli/aggregates.py
+++ b/cli/aggregates.py
@@ -1,12 +1,31 @@
 import click
 
 from common.services.postgresql_service import PostgreSQLService
+from indexer.aggr_jobs.job_list_generator import JobListGenerator
 from indexer.aggr_jobs.utils import DateType, check_data_completeness, get_yesterday_date
 from indexer.controller.aggregates_controller import AggregatesController
 from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher
 
 
 @click.command(context_settings=dict(help_option_names=["-h", "--help"]))
+@click.option(
+    "-cn",
+    "--chain-name",
+    default=None,
+    show_default=True,
+    type=str,
+    help="Name of the chain to aggregate data for",
+    envvar="CHAIN_NAME",
+)
+@click.option(
+    "-jn",
+    "--job-name",
+    default=None,
+    show_default=True,
+    type=str,
+    help="Job list to aggregate data for (e.g. FBTC or EXPLORE)",
+    envvar="JOB_NAME",
+)
 @click.option(
     "-pg",
     "--postgres-url",
@@ -47,13 +66,22 @@
 @click.option(
     "-D",
     "--date-batch-size",
-    default=30,
+    default=5,
     show_default=True,
     type=int,
     envvar="DATE_BATCH_SIZE",
     help="How many DATEs to batch in single sync round",
 )
-def aggregates(postgres_url, provider_uri, start_date, end_date, date_batch_size):
+@click.option(
+    "-du",
+    "--dblink-url",
+    default=None,
+    show_default=True,
+    type=str,
+    envvar="DBLINK_URL",
+    help="dblink URL for fetching token prices; may be moved elsewhere later",
+)
+def aggregates(chain_name, job_name, postgres_url, provider_uri, start_date, end_date, date_batch_size, dblink_url):
     if not start_date and not end_date:
         start_date, end_date = get_yesterday_date()
     elif not end_date:
@@ -61,10 +89,12 @@ def aggregates(postgres_url, provider_uri, start_date, end_date, date_batch_size
 
     db_service = PostgreSQLService(postgres_url)
-    check_data_completeness(db_service, provider_uri, end_date)
+    # check_data_completeness(db_service, provider_uri, end_date)
+
+    config = {"db_service": db_service, "chain_name": chain_name, "dblink_url": dblink_url}
+    job_list = JobListGenerator(job_name)
 
-    config = {"db_service": db_service}
-    dispatcher = AggregatesDispatcher(config)
+    dispatcher = AggregatesDispatcher(config, job_list)
     controller = AggregatesController(job_dispatcher=dispatcher)
     controller.action(start_date, end_date, date_batch_size)
diff --git a/cli/schedule.py b/cli/schedule.py
new file mode 100644
index 000000000..21df48811
--- /dev/null
+++ b/cli/schedule.py
@@ -0,0 +1,83 @@
+import os
+import sys
+from datetime import datetime
+
+import click
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_crontab
+
+
+@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
+@click.option(
+    "-cn",
+    "--chain-name",
+    default=None,
+    show_default=True,
+    type=str,
+    help="Name of the chain to aggregate data for",
+    envvar="CHAIN_NAME",
+)
+@click.option(
+    "-jn",
+    "--job-name",
+    default=None,
+    show_default=True,
+    type=str,
+    help="Job list to aggregate data for (e.g. FBTC or EXPLORE)",
+    envvar="JOB_NAME",
+)
+@click.option(
+    "-pg",
+    "--postgres-url",
+    type=str,
+    required=True,
+    envvar="POSTGRES_URL",
+    help="The required postgres connection url. " "e.g. postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum",
+)
+@click.option(
+    "-du",
+    "--dblink-url",
+    default=None,
+    show_default=True,
+    type=str,
+    envvar="DBLINK_URL",
+    help="dblink URL for fetching token prices; may be moved elsewhere later",
+)
+@click.option(
+    "-st",
+    "--schedule-time",
+    default="0 1 * * *",
+    show_default=True,
+    type=str,
+    envvar="SCHEDULE_TIME",
+    help="Schedule time as a crontab expression",
+)
+def schedule(schedule_time, job_name, chain_name, postgres_url, dblink_url) -> None:
+    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", buffering=1)  # Line-buffered stdout
+    sys.stderr = os.fdopen(sys.stderr.fileno(), "w", buffering=1)  # Line-buffered stderr
+
+    parsed_crontab = parse_crontab(schedule_time)
+    minute = parsed_crontab["minute"]
+    hour = parsed_crontab["hour"]
+
+    day = parsed_crontab["day"]
+    month = parsed_crontab["month"]
+    day_of_week = parsed_crontab["day_of_week"]
+
+    scheduler = BlockingScheduler()
+    job_args = (chain_name, job_name, postgres_url, dblink_url)
+    scheduler.add_job(
+        aggregates_yesterday_job,
+        "cron",
+        hour=hour,
+        minute=minute,
+        day=day,
+        month=month,
+        day_of_week=day_of_week,
+        args=job_args,
+    )
+
+    scheduler.start()
+    # current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    # print(f'Job started {current_time}, schedule time is {hour}:{minute} daily')
diff --git a/indexer/aggr_jobs/aggr_base_job.py b/indexer/aggr_jobs/aggr_base_job.py
index 6efde17a4..61f5fe0a2 100644
--- a/indexer/aggr_jobs/aggr_base_job.py
+++ b/indexer/aggr_jobs/aggr_base_job.py
@@ -1,5 +1,5 @@
 import os
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
 
 class AggrBaseJob:
@@ -16,9 +16,31 @@ def get_sql_content(self, file_name, start_date, end_date):
         with open(file_path, "r") as f:
             sql_template = f.read()
 
-        sql = sql_template.format(start_date=start_date, end_date=end_date)
+        if file_name == "explorer_5_update_schedule_metadata.sql":
+            now = datetime.now()
+            # the aggregated data runs through the most recent midnight
+            today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
+
+            sql = sql_template.format_map(
+                {
+                    "dag_id": "update_wallet_address_stats",
+                    "execution_date": now.strftime("%Y-%m-%d %H:%M:%S"),
+                    "last_data_timestamp": today_midnight.strftime("%Y-%m-%d %H:%M:%S"),
+                }
+            )
+        else:
+            sql = sql_template.format(
+                start_date=start_date, end_date=end_date, start_date_previous=self.get_previous(start_date)
+            )
         return sql
 
+    @staticmethod
+    def get_previous(date_str):
+        date_obj = datetime.strptime(date_str, "%Y-%m-%d")
+        previous_day = date_obj - timedelta(days=1)
+        previous_day_str = previous_day.strftime("%Y-%m-%d")
+        return previous_day_str
+
     @staticmethod
     def generate_date_pairs(start_date, end_date):
         start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
diff --git a/indexer/aggr_jobs/aggr_job_scheduler.py b/indexer/aggr_jobs/aggr_job_scheduler.py
index 54b490d8c..d3c782aa7 100644
--- a/indexer/aggr_jobs/aggr_job_scheduler.py
+++ b/indexer/aggr_jobs/aggr_job_scheduler.py
@@ -4,12 +4,14 @@
 """
 from indexer.aggr_jobs.disorder_jobs.disorder_job import AggrDisorderJob
+from indexer.aggr_jobs.initialization_jobs.initialization_job import InitializationJob
 from indexer.aggr_jobs.order_jobs.order_job import AggrOrderJob
 
 
 class AggrJobScheduler:
-    def __init__(self, config):
+    def __init__(self, config, job_list):
         self.config = config
+        self.job_list = job_list
         self.jobs = self.instantiate_jobs()
 
     def run_jobs(self, start_date, end_date):
@@ -18,9 +20,8 @@ def run_jobs(self, start_date, end_date):
 
     def instantiate_jobs(self):
         jobs = []
-        for job_class in [AggrDisorderJob, AggrOrderJob]:
-            job = job_class(
-                config=self.config,
-            )
+        # InitializationJob should be executed once only
+        for job_class in [InitializationJob, AggrDisorderJob, AggrOrderJob]:
+            job = job_class(config=self.config, job_list=self.job_list)
             jobs.append(job)
         return jobs
diff --git a/indexer/aggr_jobs/disorder_jobs/daily_explore_aggregates.sql b/indexer/aggr_jobs/disorder_jobs/daily_explore_aggregates.sql
new file mode 100644
index 000000000..cac1e56fe
--- /dev/null
+++ b/indexer/aggr_jobs/disorder_jobs/daily_explore_aggregates.sql
@@ -0,0 +1,251 @@
+
+begin;
+
+delete from daily_transactions_aggregates where block_date <= '{start_date}';
+-- daily_transactions_aggregates
+INSERT INTO daily_transactions_aggregates
+SELECT
+    date_trunc('day', block_timestamp) AS block_date,
+    COUNT(*) AS cnt,
+    SUM(COUNT(*)) OVER (ORDER BY date_trunc('day', block_timestamp)) AS total_cnt,
+    SUM(CASE WHEN exist_error = true THEN 1 ELSE 0 END) AS txn_error_cnt,
+    AVG(receipt_effective_gas_price * receipt_gas_used) AS avg_transaction_fee,
+    AVG(gas_price) AS avg_gas_price,
+    MAX(gas_price) AS max_gas_price,
+    MIN(gas_price) AS min_gas_price,
+    AVG(receipt_l1_fee) AS avg_receipt_l1_fee,
+    MAX(receipt_l1_fee) AS max_receipt_l1_fee,
+    MIN(receipt_l1_fee) AS min_receipt_l1_fee,
+    AVG(receipt_l1_gas_used) AS avg_receipt_l1_gas_used,
+    MAX(receipt_l1_gas_used) AS max_receipt_l1_gas_used,
+    MIN(receipt_l1_gas_used) AS min_receipt_l1_gas_used,
+    AVG(receipt_l1_gas_price) AS avg_receipt_l1_gas_price,
+    MAX(receipt_l1_gas_price) AS max_receipt_l1_gas_price,
+    MIN(receipt_l1_gas_price) AS min_receipt_l1_gas_price,
+    AVG(receipt_l1_fee_scalar) AS avg_receipt_l1_fee_scalar,
+    MAX(receipt_l1_fee_scalar) AS max_receipt_l1_fee_scalar,
+    MIN(receipt_l1_fee_scalar) AS min_receipt_l1_fee_scalar
+FROM
+    transactions
+where date_trunc('day', block_timestamp) <= '{start_date}'
+GROUP BY
+    date_trunc('day', block_timestamp)
+ORDER BY
+    date_trunc('day', block_timestamp);
+commit;
+
+begin;
+
+delete from daily_blocks_aggregates where block_date <= '{start_date}';
+
+-- daily_blocks_aggregates
+WITH block_data AS (
+    SELECT
+        size,
+        gas_limit::numeric AS gas_limit,
+        gas_used::numeric AS gas_used,
+        transactions_count,
+        EXTRACT(EPOCH FROM (lead(timestamp) OVER (ORDER BY timestamp) - timestamp)) AS block_interval,
+        timestamp
+    FROM
+        blocks
+    where date_trunc('day', timestamp) <= '{start_date}'
+), daily_aggregates AS (
+    SELECT
+        date_trunc('day', timestamp) AS block_date,
+        COUNT(*) AS cnt,
+        AVG(size) AS avg_size,
+        AVG(gas_limit) AS avg_gas_limit,
+        AVG(gas_used) AS avg_gas_used,
+        SUM(gas_used::bigint) AS total_gas_used,
+        AVG((gas_used / gas_limit) * 100) AS avg_gas_used_percentage,
+        AVG(transactions_count::numeric) AS avg_txn_cnt,
+        SUM(COUNT(*)) OVER (ORDER BY date_trunc('day', timestamp)) AS total_cnt,
+        AVG(block_interval) AS block_interval
+    FROM
+        block_data
+    GROUP BY
+        block_date
+)
+INSERT INTO daily_blocks_aggregates (
+    block_date,
+    cnt,
+    avg_size,
+    avg_gas_limit,
+    avg_gas_used,
+    total_gas_used,
+    avg_gas_used_percentage,
+    avg_txn_cnt,
+    total_cnt,
+    block_interval
+)
+select * from daily_aggregates
+ORDER BY block_date;
+commit;
+
+
+begin;
+delete from daily_addresses_aggregates where block_date <= '{start_date}';
+
+-- daily-address
+INSERT INTO daily_addresses_aggregates
+WITH daily_address_stats AS (
+    SELECT
+        date_trunc('day', block_timestamp) AS block_date,
+        from_address,
+        to_address,
+        CASE WHEN ROW_NUMBER() OVER (PARTITION BY from_address ORDER BY block_timestamp) = 1 THEN 1 ELSE 0 END AS is_new_sender,
+        CASE WHEN ROW_NUMBER() OVER (PARTITION BY to_address ORDER BY block_timestamp) = 1 THEN 1 ELSE 0 END AS is_new_receiver
+    FROM
+        transactions
+    where date_trunc('day', block_timestamp) <= '{start_date}'
+),
+address_counts AS (
+    SELECT
+        block_date,
+        COUNT(DISTINCT from_address) AS sender_count,
+        COUNT(DISTINCT to_address) AS receiver_count,
+        COUNT(DISTINCT COALESCE(from_address, to_address)) AS active_count,
+        SUM(is_new_sender) + SUM(is_new_receiver) AS new_address_count
+    FROM
+        daily_address_stats
+    GROUP BY
+        block_date
+),
+cumulative_counts AS (
+    SELECT
+        block_date,
+        SUM(new_address_count) OVER (ORDER BY block_date) AS total_address_count
+    FROM
+        address_counts
+)
+SELECT
+    ac.block_date,
+    ac.active_count AS active_address_cnt,
+    ac.receiver_count AS receiver_address_cnt,
+    ac.sender_count AS sender_address_cnt,
+    cc.total_address_count AS total_address_cnt,
+    ac.new_address_count AS new_address_cnt
+FROM
+    address_counts ac
+JOIN
+    cumulative_counts cc ON ac.block_date = cc.block_date
+ORDER BY
+    ac.block_date;
+commit;
+
+begin;
+delete from daily_tokens_aggregates where block_date <= '{start_date}';
+INSERT INTO daily_tokens_aggregates
+WITH erc20_stats AS (
+    SELECT
+        date_trunc('day', block_timestamp) AS block_date,
+        COUNT(*) AS transfer_count,
+        COUNT(DISTINCT from_address) + COUNT(DISTINCT to_address) AS active_address_count
+    FROM
+        erc20_token_transfers
+    where date_trunc('day', block_timestamp) <= '{start_date}'
+    GROUP BY
+        date_trunc('day', block_timestamp)
+),
+erc721_stats AS (
+    SELECT
+        date_trunc('day', block_timestamp) AS block_date,
+        COUNT(*) AS transfer_count,
+        COUNT(DISTINCT from_address) + COUNT(DISTINCT to_address) AS active_address_count
+    FROM
+        erc721_token_transfers
+    where date_trunc('day', block_timestamp) <= '{start_date}'
+    GROUP BY
+        date_trunc('day', block_timestamp)
+),
+erc1155_stats AS (
+    SELECT
+        date_trunc('day', block_timestamp) AS block_date,
+        COUNT(*) AS transfer_count,
+        COUNT(DISTINCT from_address) + COUNT(DISTINCT to_address) AS active_address_count
+    FROM
+        erc1155_token_transfers
+    where date_trunc('day', block_timestamp) <= '{start_date}'
+    GROUP BY
+        date_trunc('day', block_timestamp)
+),
+all_dates AS (
+    SELECT DISTINCT block_date FROM (
+        SELECT block_date FROM erc20_stats
+        UNION
+        SELECT block_date FROM erc721_stats
+        UNION
+        SELECT block_date FROM erc1155_stats
+    ) AS all_dates
+),
+cumulative_counts AS (
+    SELECT
+        ad.block_date,
+        SUM(COALESCE(e20.transfer_count, 0)) OVER (ORDER BY ad.block_date) AS erc20_total_transfer_cnt,
+        SUM(COALESCE(e721.transfer_count, 0)) OVER (ORDER BY ad.block_date) AS erc721_total_transfer_cnt,
+        SUM(COALESCE(e1155.transfer_count, 0)) OVER (ORDER BY ad.block_date) AS erc1155_total_transfer_cnt
+    FROM all_dates ad
+    LEFT JOIN erc20_stats e20 ON ad.block_date = e20.block_date
+    LEFT JOIN erc721_stats e721 ON ad.block_date = e721.block_date
+    LEFT JOIN erc1155_stats e1155 ON ad.block_date = e1155.block_date
+)
+SELECT
+    cc.block_date,
+    COALESCE(e20.active_address_count, 0) AS erc20_active_address_cnt,
+    cc.erc20_total_transfer_cnt,
+    COALESCE(e721.active_address_count, 0) AS erc721_active_address_cnt,
+    cc.erc721_total_transfer_cnt,
+    COALESCE(e1155.active_address_count, 0) AS erc1155_active_address_cnt,
+    cc.erc1155_total_transfer_cnt
+FROM
+    cumulative_counts cc
+LEFT JOIN
+    erc20_stats e20 ON cc.block_date = e20.block_date
+LEFT JOIN
+    erc721_stats e721 ON cc.block_date = e721.block_date
+LEFT JOIN
+    erc1155_stats e1155 ON cc.block_date = e1155.block_date
+ORDER BY
+    cc.block_date;
+commit;
+
+
+WITH token_transfers AS (
+    SELECT
+        to_address AS token_address,
+        from_address AS address
+    FROM
+        transactions
+    WHERE
+        to_address IN (SELECT address FROM tokens)
+    UNION
+    SELECT
+        to_address AS token_address,
+        to_address AS address
+    FROM
+        transactions
+    WHERE
+        to_address IN (SELECT address FROM tokens)
+),
+token_stats AS (
+    SELECT
+        token_address,
+        COUNT(DISTINCT address) AS holder_count,
+        COUNT(*) AS transfer_count
+    FROM
+        token_transfers
+    GROUP BY
+        token_address
+)
+UPDATE tokens
+SET
+    holder_count = ts.holder_count,
+    transfer_count = ts.transfer_count,
+    update_time = CURRENT_TIMESTAMP
+FROM
+    token_stats ts
+WHERE
+    tokens.address = ts.token_address;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/disorder_jobs/disorder_job.py b/indexer/aggr_jobs/disorder_jobs/disorder_job.py
index a168dc5fd..dcde1ca87 100644
--- a/indexer/aggr_jobs/disorder_jobs/disorder_job.py
+++ b/indexer/aggr_jobs/disorder_jobs/disorder_job.py
@@ -9,8 +9,10 @@ class AggrDisorderJob(AggrBaseJob):
 
     def __init__(self, **kwargs):
         config = kwargs["config"]
+        job_list = kwargs["job_list"]
+        self.job_list = job_list.get_disordered_jobs()
         self.db_service = config["db_service"]
-        self._batch_work_executor = BatchWorkExecutor(5, 5)
+        self._batch_work_executor = BatchWorkExecutor(10, 10)
 
     def run(self, **kwargs):
         start_date = kwargs["start_date"]
@@ -20,11 +22,14 @@ def run(self, **kwargs):
         date_pairs = self.generate_date_pairs(start_date, end_date)
         for date_pair in date_pairs:
             start_date, end_date = date_pair
-            sql_content = self.get_sql_content("daily_wallet_addresses_aggregates", start_date, end_date)
-            execute_sql_list.append(sql_content)
+            # Could be replaced with automatic job selection later
+            for sql_name in self.job_list:
+                sql_content = self.get_sql_content(sql_name, start_date, end_date)
+                execute_sql_list.append(sql_content)
 
         self._batch_work_executor.execute(execute_sql_list, self.execute_sql, total_items=len(execute_sql_list))
         self._batch_work_executor.wait()
+        print(f"finished disorder job {start_date}, total {len(self.job_list)} jobs")
 
     def execute_sql(self, sql_contents):
         session = self.db_service.Session()
diff --git a/indexer/aggr_jobs/disorder_jobs/test.sql b/indexer/aggr_jobs/disorder_jobs/test.sql
new file mode 100644
index 000000000..9e13a3eff
--- /dev/null
+++ b/indexer/aggr_jobs/disorder_jobs/test.sql
@@ -0,0 +1 @@
+select 1;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/initialization_jobs/__init__.py b/indexer/aggr_jobs/initialization_jobs/__init__.py
new file mode 100644
index 000000000..68a965939
--- /dev/null
+++ b/indexer/aggr_jobs/initialization_jobs/__init__.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Time 2024/10/22 16:27
+# @Author will
+# @File __init__.py
+# @Brief
diff --git a/indexer/aggr_jobs/initialization_jobs/initialization_job.py b/indexer/aggr_jobs/initialization_jobs/initialization_job.py
new file mode 100644
index 000000000..b95057fa1
--- /dev/null
+++ b/indexer/aggr_jobs/initialization_jobs/initialization_job.py
@@ -0,0 +1,68 @@
+import time
+
+from sqlalchemy import text
+
+
+class InitializationJob:
+    def __init__(self, **kwargs):
+        config = kwargs["config"]
+        job_list = kwargs["job_list"]
+        self.job_list = job_list.get_initialization_jobs()
+
+        self.db_service = config["db_service"]
+        self.dblink_url = config["dblink_url"]
+
+    def init_token_price(self):
+        token_price_sql_template = """
+        CREATE EXTENSION if not exists dblink;
+
+        DELETE FROM token_price WHERE timestamp >= :start_date;
+
+        INSERT INTO token_price
+        SELECT * FROM dblink(:dblink_url,
+            'SELECT * FROM w3w_commons.token_hourly_prices WHERE timestamp >= ''{start_date}'' ')
+            AS t(symbol varchar, timestamp timestamp, price numeric);
+        """
+
+        sql = token_price_sql_template.format(start_date=self.start_date)
+
+        session = self.db_service.Session()
+
+        start_time = time.time()
+
+        session.execute(text(sql), {"start_date": self.start_date, "dblink_url": self.dblink_url})
+
+        session.commit()
+        execution_time = time.time() - start_time
+        print(f"----------- executed in {execution_time:.2f} seconds: init token price")
+
+        session.close()
+
+    def init_period_address_token_balance(self):
+        session = self.db_service.Session()
+
+        sql_template = """
+        delete from period_address_token_balances where period_date >= :start_date;
+        """
+        start_time = time.time()
+        session.execute(text(sql_template), {"start_date": self.start_date})
+
+        session.commit()
+        execution_time = time.time() - start_time
+        print(f"----------- executed in {execution_time:.2f} seconds: init period_address_token_balances")
+
+        session.close()
+
+    def run(self, **kwargs):
+        self.start_date = kwargs["start_date"]
+        self.end_date = kwargs["end_date"]
+
+        # self.init_token_price()
+        # self.init_period_address_token_balance()
+
+        for function_name in self.job_list:
+            func = getattr(self, function_name, None)
+            if callable(func):
+                func()
+            else:
+                print(f"Function {function_name} does not exist")
diff --git a/indexer/aggr_jobs/job_list_generator.py b/indexer/aggr_jobs/job_list_generator.py
new file mode 100644
index 000000000..a54386589
--- /dev/null
+++ b/indexer/aggr_jobs/job_list_generator.py
@@ -0,0 +1,58 @@
+class JobListGenerator(object):
+    def __init__(self, job_name):
+        self.job_name = job_name
+
+    def get_initialization_jobs(self):
+        job_list = []
+        if self.job_name == "FBTC":
+            job_list = ["init_token_price", "init_period_address_token_balance"]
+        return job_list
+
+    def get_disordered_jobs(self):
+        job_list = []
+
+        if self.job_name == "FBTC":
+            job_list = [
+                "daily_feature_holding_balance_staked_fbtc_detail.sql",
+                "daily_feature_holding_balance_uniswap_v3.sql",
+                "daily_address_token_balances",
+                "daily_feature_erc20_token_supply_records.sql",
+                # 'daily_feature_erc1155_token_holdings.sql',
+                # 'daily_feature_erc1155_token_supply_records.sql'
+            ]
+        elif self.job_name == "EXPLORE":
+            job_list = [
+                "test.sql",
+                "daily_explore_aggregates.sql",
+            ]
+
+        return job_list
+
+    def get_order_jobs(self):
+        job_list = []
+
+        if self.job_name == "FBTC":
+            job_list = [
+                "period_address_token_balances",
+                "period_feature_holding_balance_uniswap_v3.sql",
+                "period_feature_staked_fbtc_detail_records.sql",
+                "period_feature_holding_balance_staked_fbtc_detail.sql",
+                # 'period_feature_erc1155_token_holdings.sql',
+                "period_feature_erc1155_token_supply_records.sql",
+                "period_feature_holding_balance_merchantmoe.sql",
+                "period_feature_erc20_token_supply_records.sql",
+                "period_feature_holding_balance_dodo.sql",
+                "period_feature_holding_balance_lendle.sql",
+                "period_feature_defi_wallet_fbtc_aggregates.py",
+            ]
+        elif self.job_name == "EXPLORE":
+            job_list = [
+                "test.sql",
"explorer_1_update_address_txn_stats.sql", + "explorer_2_update_address_token_transfer_stats.sql", + "explorer_3_addresses.sql", + "explorer_4_agg_address_stats.sql", + "explorer_5_update_schedule_metadata.sql", + ] + + return job_list diff --git a/indexer/aggr_jobs/order_jobs/explorer_1_update_address_txn_stats.sql b/indexer/aggr_jobs/order_jobs/explorer_1_update_address_txn_stats.sql new file mode 100644 index 000000000..bfc57c4ae --- /dev/null +++ b/indexer/aggr_jobs/order_jobs/explorer_1_update_address_txn_stats.sql @@ -0,0 +1,87 @@ +-- Handle outgoing transactions including errors +WITH out_txn AS ( + SELECT + from_address AS address, + DATE(block_timestamp) AS block_date, + COUNT(DISTINCT hash) AS txn_out_cnt, + SUM(value) AS txn_out_value, + SUM(CASE WHEN receipt_status = 0 THEN 1 ELSE 0 END) AS txn_out_error_cnt + FROM transactions + WHERE block_timestamp >= '{start_date}' AND block_timestamp < '{start_date}'::DATE + INTERVAL '1 DAY' + and from_address is not null + GROUP BY from_address, DATE(block_timestamp) +) + +INSERT INTO daily_wallet_address_stats +(address, block_date, txn_out_cnt, txn_out_value, txn_out_error_cnt) +SELECT + address, + block_date, + txn_out_cnt, + txn_out_value, + txn_out_error_cnt +FROM out_txn + +ON CONFLICT (address, block_date) + DO UPDATE SET + txn_out_cnt = EXCLUDED.txn_out_cnt, + txn_out_value = EXCLUDED.txn_out_value, + txn_out_error_cnt = EXCLUDED.txn_out_error_cnt; + +-- Handle incoming transactions including errors +WITH in_txn AS ( + SELECT + to_address AS address, + DATE(block_timestamp) AS block_date, + COUNT(DISTINCT hash) AS txn_in_cnt, + SUM(value) AS txn_in_value, + SUM(CASE WHEN receipt_status = 0 THEN 1 ELSE 0 END) AS txn_in_error_cnt + FROM transactions + WHERE block_timestamp >= '{start_date}' AND block_timestamp < '{start_date}'::DATE + INTERVAL '1 DAY' + and to_address is not null + GROUP BY to_address, DATE(block_timestamp) +) + +INSERT INTO daily_wallet_address_stats +(address, block_date, txn_in_cnt, txn_in_value, txn_in_error_cnt) +SELECT + address, + block_date, + txn_in_cnt, + txn_in_value, + txn_in_error_cnt +FROM in_txn + +ON CONFLICT (address, block_date) + DO UPDATE SET + txn_in_cnt = EXCLUDED.txn_in_cnt, + txn_in_value = EXCLUDED.txn_in_value, + txn_in_error_cnt = EXCLUDED.txn_in_error_cnt; + + +-- Handle self transactions including errors +WITH self_txn AS ( + SELECT + from_address AS address, + DATE(block_timestamp) AS block_date, + COUNT(DISTINCT hash) AS txn_self_cnt, + SUM(CASE WHEN receipt_status = 0 THEN 1 ELSE 0 END) AS txn_self_error_cnt + FROM transactions + WHERE block_timestamp >= '{start_date}' AND block_timestamp < '{start_date}'::DATE + INTERVAL '1 DAY' + and from_address = to_address and from_address is not null + GROUP BY from_address, DATE(block_timestamp) +) + +INSERT INTO daily_wallet_address_stats +(address, block_date, txn_self_cnt, txn_self_error_cnt) +SELECT + address, + block_date, + txn_self_cnt, + txn_self_error_cnt +FROM self_txn + +ON CONFLICT (address, block_date) + DO UPDATE SET + txn_self_cnt = EXCLUDED.txn_self_cnt, + txn_self_error_cnt = EXCLUDED.txn_self_error_cnt; \ No newline at end of file diff --git a/indexer/aggr_jobs/order_jobs/explorer_2_update_address_token_transfer_stats.sql b/indexer/aggr_jobs/order_jobs/explorer_2_update_address_token_transfer_stats.sql new file mode 100644 index 000000000..952adadf8 --- /dev/null +++ b/indexer/aggr_jobs/order_jobs/explorer_2_update_address_token_transfer_stats.sql @@ -0,0 +1,134 @@ +WITH erc20_in AS ( + SELECT + to_address AS address, + 
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc20_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY to_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc20_transfer_in_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc20_in
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc20_transfer_in_cnt = EXCLUDED.erc20_transfer_in_cnt;
+
+
+WITH erc20_out AS (
+    SELECT
+        from_address AS address,
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc20_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY from_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc20_transfer_out_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc20_out
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc20_transfer_out_cnt = EXCLUDED.erc20_transfer_out_cnt;
+
+
+WITH erc721_in AS (
+    SELECT
+        to_address AS address,
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc721_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY to_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc721_transfer_in_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc721_in
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc721_transfer_in_cnt = EXCLUDED.erc721_transfer_in_cnt;
+
+
+WITH erc721_out AS (
+    SELECT
+        from_address AS address,
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc721_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY from_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc721_transfer_out_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc721_out
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc721_transfer_out_cnt = EXCLUDED.erc721_transfer_out_cnt;
+
+WITH erc1155_in AS (
+    SELECT
+        to_address AS address,
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc1155_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY to_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc1155_transfer_in_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc1155_in
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc1155_transfer_in_cnt = EXCLUDED.erc1155_transfer_in_cnt;
+
+WITH erc1155_out AS (
+    SELECT
+        from_address AS address,
+        DATE(block_timestamp) AS block_date,
+        COUNT(1) AS cnt
+    FROM erc1155_token_transfers
+    WHERE DATE(block_timestamp) = DATE '{start_date}'
+    GROUP BY from_address, DATE(block_timestamp)
+)
+
+INSERT INTO daily_wallet_address_stats
+(address, block_date, erc1155_transfer_out_cnt)
+SELECT
+    address,
+    block_date,
+    cnt
+FROM erc1155_out
+
+ON CONFLICT (address, block_date)
+    DO UPDATE SET
+        erc1155_transfer_out_cnt = EXCLUDED.erc1155_transfer_out_cnt;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/order_jobs/explorer_3_addresses.sql b/indexer/aggr_jobs/order_jobs/explorer_3_addresses.sql
new file mode 100644
index 000000000..e02936b6d
--- /dev/null
+++ b/indexer/aggr_jobs/order_jobs/explorer_3_addresses.sql
@@ -0,0 +1,61 @@
+BEGIN;
+
+INSERT INTO daily_addresses_aggregates (block_date, active_address_cnt, receiver_address_cnt, sender_address_cnt)
+SELECT
+    DATE('{start_date}') AS block_date,
+    COUNT(DISTINCT CASE WHEN txn_cnt > 0 THEN address END) AS active_address_cnt,
+    COUNT(DISTINCT CASE WHEN txn_in_cnt > 0 THEN address END) AS receiver_address_cnt,
+    COUNT(DISTINCT CASE WHEN txn_out_cnt > 0 THEN address END) AS sender_address_cnt
+FROM
+    daily_wallet_address_stats
+WHERE
+    block_date = '{start_date}'
+GROUP BY
+    block_date
+ON CONFLICT (block_date) DO UPDATE
+    SET
+        active_address_cnt = EXCLUDED.active_address_cnt,
+        receiver_address_cnt = EXCLUDED.receiver_address_cnt,
+        sender_address_cnt = EXCLUDED.sender_address_cnt;
+
+WITH FirstAppearance AS (
+    SELECT
+        address,
+        MIN(block_date) AS first_seen_date
+    FROM
+        daily_wallet_address_stats
+    WHERE
+        block_date <= '{start_date}'
+    GROUP BY
+        address
+),
+NewAddresses AS (
+    SELECT
+        date('{start_date}') AS block_date,
+        COUNT(*) AS new_address_cnt
+    FROM
+        FirstAppearance
+    WHERE
+        first_seen_date = '{start_date}'
+)
+UPDATE daily_addresses_aggregates
+SET new_address_cnt = na.new_address_cnt
+FROM NewAddresses na
+WHERE daily_addresses_aggregates.block_date = '{start_date}' and na.block_date = '{start_date}';
+
+UPDATE daily_addresses_aggregates
+SET total_address_cnt = sub.cumulative_addresses
+FROM (
+    SELECT
+        block_date,
+        SUM(new_address_cnt) OVER (ORDER BY block_date) AS cumulative_addresses
+    FROM
+        daily_addresses_aggregates
+    WHERE
+        block_date <= '{start_date}'
+    ) AS sub
+WHERE daily_addresses_aggregates.block_date = '{start_date}' and sub.block_date = '{start_date}';
+
+-- debug check, result is discarded when run through the job runner:
+-- SELECT * FROM daily_addresses_aggregates WHERE block_date = '{start_date}';
+
+COMMIT;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/order_jobs/explorer_4_agg_address_stats.sql b/indexer/aggr_jobs/order_jobs/explorer_4_agg_address_stats.sql
new file mode 100644
index 000000000..c1543ac1e
--- /dev/null
+++ b/indexer/aggr_jobs/order_jobs/explorer_4_agg_address_stats.sql
@@ -0,0 +1,116 @@
+WITH aggregated_stats AS (
+    SELECT address,
+        SUM(txn_in_cnt) AS total_txn_in_cnt,
+        SUM(txn_out_cnt) AS total_txn_out_cnt,
+        SUM(txn_in_value) AS total_txn_in_value,
+        SUM(txn_out_value) AS total_txn_out_value,
+        SUM(internal_txn_in_cnt) AS total_internal_txn_in_cnt,
+        SUM(internal_txn_out_cnt) AS total_internal_txn_out_cnt,
+        SUM(internal_txn_in_value) AS total_internal_txn_in_value,
+        SUM(internal_txn_out_value) AS total_internal_txn_out_value,
+        SUM(erc20_transfer_in_cnt) AS total_erc20_transfer_in_cnt,
+        SUM(erc20_transfer_out_cnt) AS total_erc20_transfer_out_cnt,
+        SUM(erc721_transfer_in_cnt) AS total_erc721_transfer_in_cnt,
+        SUM(erc721_transfer_out_cnt) AS total_erc721_transfer_out_cnt,
+        SUM(erc1155_transfer_in_cnt) AS total_erc1155_transfer_in_cnt,
+        SUM(erc1155_transfer_out_cnt) AS total_erc1155_transfer_out_cnt,
+        SUM(txn_in_error_cnt) AS total_txn_in_error_cnt,
+        SUM(txn_out_error_cnt) AS total_txn_out_error_cnt,
+        SUM(txn_self_cnt) AS total_txn_self_cnt,
+        SUM(txn_self_error_cnt) AS total_txn_self_error_cnt,
+        SUM(gas_in_used) AS total_gas_in_used,
+        SUM(l2_txn_in_fee) AS total_l2_txn_in_fee,
+        SUM(l1_txn_in_fee) AS total_l1_txn_in_fee,
+        SUM(txn_in_fee) AS total_txn_in_fee,
+        SUM(gas_out_used) AS total_gas_out_used,
+        SUM(l2_txn_out_fee) AS total_l2_txn_out_fee,
+        SUM(l1_txn_out_fee) AS total_l1_txn_out_fee,
+        SUM(txn_out_fee) AS total_txn_out_fee
+    FROM daily_wallet_address_stats
+    GROUP BY address
+)
+
+INSERT INTO wallet_addresses
+(address,
+ txn_in_cnt,
+ txn_out_cnt,
+ txn_in_value,
+ txn_out_value,
+ internal_txn_in_cnt,
+ internal_txn_out_cnt,
+ internal_txn_in_value,
+ internal_txn_out_value,
+ erc20_transfer_in_cnt,
+ erc20_transfer_out_cnt,
+ erc721_transfer_in_cnt,
+ erc721_transfer_out_cnt,
+ erc1155_transfer_in_cnt,
+ erc1155_transfer_out_cnt,
+ txn_in_error_cnt,
+ txn_out_error_cnt,
+ txn_self_cnt,
+ txn_self_error_cnt,
+ gas_in_used,
+ l2_txn_in_fee,
+ l1_txn_in_fee,
+ txn_in_fee,
+ gas_out_used,
+ l2_txn_out_fee,
+ l1_txn_out_fee,
+ txn_out_fee)
+SELECT address,
+    total_txn_in_cnt,
+    total_txn_out_cnt,
+    total_txn_in_value,
+    total_txn_out_value,
+    total_internal_txn_in_cnt,
+    total_internal_txn_out_cnt,
+    total_internal_txn_in_value,
+    total_internal_txn_out_value,
+    total_erc20_transfer_in_cnt,
+    total_erc20_transfer_out_cnt,
+    total_erc721_transfer_in_cnt,
+    total_erc721_transfer_out_cnt,
+    total_erc1155_transfer_in_cnt,
+    total_erc1155_transfer_out_cnt,
+    total_txn_in_error_cnt,
+    total_txn_out_error_cnt,
+    total_txn_self_cnt,
+    total_txn_self_error_cnt,
+    total_gas_in_used,
+    total_l2_txn_in_fee,
+    total_l1_txn_in_fee,
+    total_txn_in_fee,
+    total_gas_out_used,
+    total_l2_txn_out_fee,
+    total_l1_txn_out_fee,
+    total_txn_out_fee
+
+FROM aggregated_stats
+
+ON CONFLICT (address)
+    DO UPDATE SET
+        txn_in_cnt = EXCLUDED.txn_in_cnt,
+        txn_out_cnt = EXCLUDED.txn_out_cnt,
+        txn_in_value = EXCLUDED.txn_in_value,
+        txn_out_value = EXCLUDED.txn_out_value,
+        internal_txn_in_cnt = EXCLUDED.internal_txn_in_cnt,
+        internal_txn_out_cnt = EXCLUDED.internal_txn_out_cnt,
+        internal_txn_in_value = EXCLUDED.internal_txn_in_value,
+        internal_txn_out_value = EXCLUDED.internal_txn_out_value,
+        erc20_transfer_in_cnt = EXCLUDED.erc20_transfer_in_cnt,
+        erc20_transfer_out_cnt = EXCLUDED.erc20_transfer_out_cnt,
+        erc721_transfer_in_cnt = EXCLUDED.erc721_transfer_in_cnt,
+        erc721_transfer_out_cnt = EXCLUDED.erc721_transfer_out_cnt,
+        erc1155_transfer_in_cnt = EXCLUDED.erc1155_transfer_in_cnt,
+        erc1155_transfer_out_cnt = EXCLUDED.erc1155_transfer_out_cnt,
+        txn_in_error_cnt = EXCLUDED.txn_in_error_cnt,
+        txn_out_error_cnt = EXCLUDED.txn_out_error_cnt,
+        txn_self_cnt = EXCLUDED.txn_self_cnt,
+        txn_self_error_cnt = EXCLUDED.txn_self_error_cnt,
+        gas_in_used = EXCLUDED.gas_in_used,
+        l2_txn_in_fee = EXCLUDED.l2_txn_in_fee,
+        l1_txn_in_fee = EXCLUDED.l1_txn_in_fee,
+        txn_in_fee = EXCLUDED.txn_in_fee,
+        gas_out_used = EXCLUDED.gas_out_used,
+        l2_txn_out_fee = EXCLUDED.l2_txn_out_fee,
+        l1_txn_out_fee = EXCLUDED.l1_txn_out_fee,
+        txn_out_fee = EXCLUDED.txn_out_fee;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/order_jobs/explorer_5_update_schedule_metadata.sql b/indexer/aggr_jobs/order_jobs/explorer_5_update_schedule_metadata.sql
new file mode 100644
index 000000000..9dbb46209
--- /dev/null
+++ b/indexer/aggr_jobs/order_jobs/explorer_5_update_schedule_metadata.sql
@@ -0,0 +1 @@
+INSERT INTO scheduled_wallet_count_metadata (dag_id, execution_date, last_data_timestamp) VALUES ('{dag_id}', '{execution_date}', '{last_data_timestamp}');
\ No newline at end of file
diff --git a/indexer/aggr_jobs/order_jobs/order_job.py b/indexer/aggr_jobs/order_jobs/order_job.py
index ea9449346..356a2b017 100644
--- a/indexer/aggr_jobs/order_jobs/order_job.py
+++ b/indexer/aggr_jobs/order_jobs/order_job.py
@@ -1,24 +1,54 @@
+import time
+
 from sqlalchemy import text
 
 from indexer.aggr_jobs.aggr_base_job import AggrBaseJob
 
+# from indexer.aggr_jobs.order_jobs.py_jobs.period_feature_defi_wallet_fbtc_aggregates import (
+#     PeriodFeatureDefiWalletFbtcAggregates,
+# )
+
 
 class AggrOrderJob(AggrBaseJob):
     sql_folder = "order_jobs"
 
     def __init__(self, **kwargs):
         config = kwargs["config"]
+        job_list = kwargs["job_list"]
+        self.job_list = job_list.get_order_jobs()
         self.db_service = config["db_service"]
+        self.chain_name = config["chain_name"]
+
+    def run_py_job(self, name, start_date, end_date):
+        if name == "period_feature_defi_wallet_fbtc_aggregates.py":
+            # period_feature_defi_wallet_fbtc_aggregates_job = PeriodFeatureDefiWalletFbtcAggregates(
+            #     self.chain_name, self.db_service, start_date
+            # )
+            # period_feature_defi_wallet_fbtc_aggregates_job.run()
+            pass
 
     def run(self, **kwargs):
-        start_date = kwargs["start_date"]
-        end_date = kwargs["end_date"]
+        start_date_limit = kwargs["start_date"]
+        end_date_limit = kwargs["end_date"]
 
         session = self.db_service.Session()
 
-        date_pairs = self.generate_date_pairs(start_date, end_date)
+        date_pairs = self.generate_date_pairs(start_date_limit, end_date_limit)
         for date_pair in date_pairs:
             start_date, end_date = date_pair
-            sql_content = self.get_sql_content("period_wallet_addresses_aggregates", start_date, end_date)
-            session.execute(text(sql_content))
-            session.commit()
+
+            for job_name in self.job_list:
+                start_time = time.time()
+                if job_name.endswith(".py"):
+                    self.run_py_job(job_name, start_date, end_date)
+                else:
+                    sql_content = self.get_sql_content(job_name, start_date, end_date)
+                    session.execute(text(sql_content))
+                    session.commit()
+                execution_time = time.time() - start_time
+                print(f"----------- executed in {execution_time:.2f} seconds: JOB {job_name}")
+
+            print(f"======== finished date {start_date}")
+
+        session.close()
+        print(f"finished order job {start_date_limit}")
diff --git a/indexer/aggr_jobs/order_jobs/test.sql b/indexer/aggr_jobs/order_jobs/test.sql
new file mode 100644
index 000000000..9e13a3eff
--- /dev/null
+++ b/indexer/aggr_jobs/order_jobs/test.sql
@@ -0,0 +1 @@
+select 1;
\ No newline at end of file
diff --git a/indexer/aggr_jobs/utils.py b/indexer/aggr_jobs/utils.py
index 8a6302909..f6bcdf5ea 100644
--- a/indexer/aggr_jobs/utils.py
+++ b/indexer/aggr_jobs/utils.py
@@ -14,7 +14,7 @@ def get_yesterday_date():
     today_str = now.strftime("%Y-%m-%d")
     yesterday_str = yesterday_datetime.strftime("%Y-%m-%d")
 
-    return today_str, yesterday_str
+    return yesterday_str, today_str
 
 
 class DateType(click.ParamType):
diff --git a/indexer/controller/aggregates_controller.py b/indexer/controller/aggregates_controller.py
index 714c0b176..75be281b0 100644
--- a/indexer/controller/aggregates_controller.py
+++ b/indexer/controller/aggregates_controller.py
@@ -7,7 +7,11 @@ class AggregatesController(BaseController):
     def __init__(self, job_dispatcher):
         self.job_dispatcher = job_dispatcher
 
-    def action(self, start_date, end_date, date_batch_size):
+    def action(self, start_date, end_date, date_batch_size=None):
+        # Without a batch size, run the whole date range in a single pass.
+        if date_batch_size is None:
+            self.job_dispatcher.run(start_date, end_date)
+            return
         date_batches = self.split_date_range(start_date, end_date, date_batch_size)
         for date_batch in date_batches:
             start_date, end_date = date_batch
diff --git a/indexer/controller/dispatcher/aggregates_dispatcher.py b/indexer/controller/dispatcher/aggregates_dispatcher.py
index 975605814..51b687681 100644
--- a/indexer/controller/dispatcher/aggregates_dispatcher.py
+++ b/indexer/controller/dispatcher/aggregates_dispatcher.py
@@ -3,9 +3,9 @@
 
 
 class AggregatesDispatcher(BaseDispatcher):
-    def __init__(self, config):
+    def __init__(self, config, job_list):
         super().__init__()
-        self._job_scheduler = AggrJobScheduler(config=config)
+        self._job_scheduler = AggrJobScheduler(config=config, job_list=job_list)
 
     def run(self, start_date, end_date):
         self._job_scheduler.run_jobs(start_date=start_date, end_date=end_date)
diff --git a/indexer/schedule_jobs/__init__.py b/indexer/schedule_jobs/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/indexer/schedule_jobs/aggregates_jobs.py b/indexer/schedule_jobs/aggregates_jobs.py
new file mode 100644
index 000000000..f31737b20
--- /dev/null
+++ b/indexer/schedule_jobs/aggregates_jobs.py
@@ -0,0 +1,32 @@
+from common.services.postgresql_service import PostgreSQLService
+from indexer.aggr_jobs.job_list_generator import JobListGenerator
+from indexer.aggr_jobs.utils import get_yesterday_date
+from indexer.controller.aggregates_controller import AggregatesController
+from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher
+
+
+def parse_crontab(expression):
+    fields = expression.split()
+
+    if len(fields) != 5:
+        raise ValueError("Invalid crontab expression, it must contain 5 fields.")
+
+    minute, hour, day, month, day_of_week = fields
+
+    parsed_fields = {"minute": minute, "hour": hour, "day": day, "month": month, "day_of_week": day_of_week}
+
+    return parsed_fields
+
+
+def aggregates_yesterday_job(chain_name, job_name, postgres_url, dblink_url):
+    print("---executing aggregates job---")
+    start_date, end_date = get_yesterday_date()
+    db_service = PostgreSQLService(postgres_url)
+
+    config = {"db_service": db_service, "chain_name": chain_name, "dblink_url": dblink_url}
+    job_list = JobListGenerator(job_name)
+
+    dispatcher = AggregatesDispatcher(config, job_list)
+
+    controller = AggregatesController(job_dispatcher=dispatcher)
+    controller.action(start_date, end_date)
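
Usage sketch (not part of the patch; chain name and connection URL below are hypothetical placeholders):
the new `schedule` command simply feeds the fields returned by parse_crontab() into APScheduler's
"cron" trigger and registers aggregates_yesterday_job. The same wiring can be reproduced outside the
CLI, for example when testing the scheduling behavior on its own:

    from apscheduler.schedulers.blocking import BlockingScheduler

    from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_crontab

    # parse_crontab() returns {"minute", "hour", "day", "month", "day_of_week"} keys, which
    # match the keyword arguments accepted by APScheduler's cron trigger.
    fields = parse_crontab("0 1 * * *")  # the command's default: daily at 01:00

    scheduler = BlockingScheduler()
    scheduler.add_job(
        aggregates_yesterday_job,
        "cron",
        args=(
            "ethereum",  # chain_name (hypothetical placeholder)
            "EXPLORE",   # job_name: "EXPLORE" or "FBTC", per JobListGenerator
            "postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum",  # placeholder URL
            None,        # dblink_url: only needed by the FBTC init_token_price job
        ),
        **fields,
    )
    scheduler.start()  # blocks; each trigger recomputes yesterday's aggregates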