Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from cli.api import api
from cli.init_db import init_db
from cli.reorg import reorg
from cli.schedule import schedule
from cli.stream import stream
from indexer.utils.logging_utils import logging_basic_config

Expand All @@ -28,3 +29,4 @@ def cli(ctx):
cli.add_command(aggregates, "aggregates")
cli.add_command(reorg, "reorg")
cli.add_command(init_db, "init_db")
cli.add_command(schedule, "schedule")
40 changes: 35 additions & 5 deletions cli/aggregates.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
import click

from common.services.postgresql_service import PostgreSQLService
from indexer.aggr_jobs.job_list_generator import JobListGenerator
from indexer.aggr_jobs.utils import DateType, check_data_completeness, get_yesterday_date
from indexer.controller.aggregates_controller import AggregatesController
from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
"-cn",
"--chain-name",
default=None,
show_default=True,
type=str,
help="The chain name of the chain to aggregate data for",
envvar="CHAIN_NAME",
)
@click.option(
"-jn",
"--job-name",
default=None,
show_default=True,
type=str,
help="Job list to aggregate data for",
envvar="JOB_NAME",
)
@click.option(
"-pg",
"--postgres-url",
Expand Down Expand Up @@ -47,24 +66,35 @@
@click.option(
"-D",
"--date-batch-size",
default=30,
default=5,
show_default=True,
type=int,
envvar="DATE_BATCH_SIZE",
help="How many DATEs to batch in single sync round",
)
def aggregates(postgres_url, provider_uri, start_date, end_date, date_batch_size):
@click.option(
"-du",
"--dblink-url",
default=None,
show_default=True,
type=str,
envvar="DBLINK_URL",
help="dblink to take token price, maybe moved to other replace later",
)
def aggregates(chain_name, job_name, postgres_url, provider_uri, start_date, end_date, date_batch_size, dblink_url):
    """Aggregate data for the given chain/job list over the requested date window.

    When no window is supplied, defaults to yesterday; when only the start is
    supplied, defaults the end to yesterday.
    """
    if not start_date and not end_date:
        start_date, end_date = get_yesterday_date()
    elif not end_date:
        _, end_date = get_yesterday_date()

    db_service = PostgreSQLService(postgres_url)

    # check_data_completeness(db_service, provider_uri, end_date)

    job_list = JobListGenerator(job_name)
    job_config = {
        "db_service": db_service,
        "chain_name": chain_name,
        "dblink_url": dblink_url,
    }

    dispatcher = AggregatesDispatcher(job_config, job_list)
    AggregatesController(job_dispatcher=dispatcher).action(start_date, end_date, date_batch_size)
83 changes: 83 additions & 0 deletions cli/schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import sys
from datetime import datetime

import click
from apscheduler.schedulers.blocking import BlockingScheduler

from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_crontab


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
    "-cn",
    "--chain-name",
    default=None,
    show_default=True,
    type=str,
    help="The chain name of the chain to aggregate data for",
    envvar="CHAIN_NAME",
)
@click.option(
    "-jn",
    "--job-name",
    default=None,
    show_default=True,
    type=str,
    help="Job list to aggregate data for",
    envvar="JOB_NAME",
)
@click.option(
    "-pg",
    "--postgres-url",
    type=str,
    required=True,
    envvar="POSTGRES_URL",
    help="The required postgres connection url." "e.g. postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum",
)
@click.option(
    "-du",
    "--dblink-url",
    default=None,
    show_default=True,
    type=str,
    envvar="DBLINK_URL",
    help="dblink to take token price, maybe moved to other replace later",
)
@click.option(
    "-st",
    "--schedule-time",
    default="0 1 * * *",
    show_default=True,
    type=str,
    envvar="SCHEDULE_TIME",
    help="schedule time by crontab expression: default: 0 1 * * *",
)
def schedule(schedule_time, job_name, chain_name, postgres_url, dblink_url) -> None:
    """Run aggregates_yesterday_job on a cron schedule.

    Parses *schedule_time* as a 5-field crontab expression, registers the job
    with an APScheduler BlockingScheduler, and blocks until interrupted.
    """
    # Line-buffer stdio so scheduler/job output is flushed promptly when the
    # process runs under a log collector (e.g. inside a container).
    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", buffering=1)
    sys.stderr = os.fdopen(sys.stderr.fileno(), "w", buffering=1)

    parsed_crontab = parse_crontab(schedule_time)

    scheduler = BlockingScheduler()
    job_args = (chain_name, job_name, postgres_url, dblink_url)
    scheduler.add_job(
        aggregates_yesterday_job,
        "cron",
        minute=parsed_crontab["minute"],
        hour=parsed_crontab["hour"],
        day=parsed_crontab["day"],
        month=parsed_crontab["month"],
        day_of_week=parsed_crontab["day_of_week"],
        args=job_args,
    )

    # Announce the schedule BEFORE start(): BlockingScheduler.start() blocks
    # until shutdown, so anything placed after it would never run (the original
    # commented-out status message was unreachable dead code).
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Scheduler started at {current_time}, crontab expression: {schedule_time}")
    scheduler.start()
26 changes: 24 additions & 2 deletions indexer/aggr_jobs/aggr_base_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone


class AggrBaseJob:
Expand All @@ -16,9 +16,31 @@ def get_sql_content(self, file_name, start_date, end_date):

with open(file_path, "r") as f:
sql_template = f.read()
sql = sql_template.format(start_date=start_date, end_date=end_date)
if file_name == "explorer_5_update_schedule_metadata.sql":
now = datetime.now()
tomorrow = now + timedelta(days=0)
tomorrow_midnight = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)

sql = sql_template.format_map(
{
"dag_id": "update_wallet_address_stats",
"execution_date": now.strftime("%Y-%m-%d %H:%M:%S"),
"last_data_timestamp": tomorrow_midnight.strftime("%Y-%m-%d %H:%M:%S"),
}
)
else:
sql = sql_template.format(
start_date=start_date, end_date=end_date, start_date_previous=self.get_previous(start_date)
)
return sql

@staticmethod
def get_previous(date_str):
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
previous_day = date_obj - timedelta(days=1)
previous_day_str = previous_day.strftime("%Y-%m-%d")
return previous_day_str

@staticmethod
def generate_date_pairs(start_date, end_date):
start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
Expand Down
11 changes: 6 additions & 5 deletions indexer/aggr_jobs/aggr_job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
"""

from indexer.aggr_jobs.disorder_jobs.disorder_job import AggrDisorderJob
from indexer.aggr_jobs.initialization_jobs.initialization_job import InitializationJob
from indexer.aggr_jobs.order_jobs.order_job import AggrOrderJob


class AggrJobScheduler:
def __init__(self, config):
def __init__(self, config, job_list):
    # Shared job configuration (e.g. db_service, chain_name, dblink_url —
    # see the CLI entry points that build this dict); passed to every job.
    self.config = config
    # Job selection forwarded to each job instance; presumably produced by
    # JobListGenerator — confirm against callers.
    self.job_list = job_list
    # Instantiate all job objects up front, in execution order.
    self.jobs = self.instantiate_jobs()

def run_jobs(self, start_date, end_date):
Expand All @@ -18,9 +20,8 @@ def run_jobs(self, start_date, end_date):

def instantiate_jobs(self):
    """Build one instance of each aggregation job class, in execution order."""
    # InitializationJob should be executed once only
    job_classes = (InitializationJob, AggrDisorderJob, AggrOrderJob)
    return [job_class(config=self.config, job_list=self.job_list) for job_class in job_classes]
Loading