Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
e7f945e
stash
will0x0909 Nov 13, 2024
c1ddfbb
stash
will0x0909 Nov 14, 2024
a544580
stash
will0x0909 Nov 14, 2024
35b489e
stash
will0x0909 Nov 14, 2024
c9879f3
stash
will0x0909 Nov 14, 2024
6a3ee5c
stash
will0x0909 Nov 14, 2024
0abcbdb
stash
will0x0909 Nov 15, 2024
3410012
stash
will0x0909 Nov 15, 2024
95dfbc3
stash
will0x0909 Nov 15, 2024
840aad0
update dependency lock file
xuzh2024 Nov 15, 2024
aedb974
a new pg service
will0x0909 Nov 15, 2024
f1ee4db
a new pg service
will0x0909 Nov 15, 2024
a7f2f83
a new pg service
will0x0909 Nov 15, 2024
7fb38f9
a new pg service
will0x0909 Nov 15, 2024
d3e508d
a new pg service
will0x0909 Nov 15, 2024
c01f0a1
a new pg service
will0x0909 Nov 15, 2024
00b198f
a new pg service
will0x0909 Nov 15, 2024
4a34f43
a new pg service
will0x0909 Nov 15, 2024
95c96cb
Merge remote-tracking branch 'origin/multi_processor' into multi_proc…
xuzh2024 Nov 15, 2024
b4d36d6
a new pg service
will0x0909 Nov 15, 2024
532b5d7
make job execute concurrently
xuzh2024 Nov 15, 2024
dca7472
make format
xuzh2024 Nov 15, 2024
3148a37
a new pg service
will0x0909 Nov 16, 2024
44356fd
a new pg service
will0x0909 Nov 16, 2024
8bbc88a
a new pg service
will0x0909 Nov 18, 2024
6951e46
a new pg service
will0x0909 Nov 18, 2024
6c08780
a new pg service
will0x0909 Nov 18, 2024
c01c261
a new pg service
will0x0909 Nov 18, 2024
51be78a
a new pg service
will0x0909 Nov 18, 2024
30f4d7a
a new pg service
will0x0909 Nov 18, 2024
d819f77
a new pg service
will0x0909 Nov 18, 2024
4e011a2
minimize runnable version
xuzh2024 Nov 19, 2024
b78d713
refactor for multi-processing
xuzh2024 Nov 21, 2024
91f9b64
modify export_tokens_and_transfers_job for multiprocessing
xuzh2024 Nov 21, 2024
ce027e0
modify error code
xuzh2024 Nov 21, 2024
0eaccee
bug fix
xuzh2024 Nov 21, 2024
7516983
update buffer default parameter
xuzh2024 Nov 21, 2024
62a4119
fix transaction collecting
xuzh2024 Nov 21, 2024
f02efba
shutdown gracefully
xuzh2024 Nov 21, 2024
e0ad0fa
update async sync-record submit and failure job record
xuzh2024 Nov 21, 2024
39316bb
add failure records table scripts
xuzh2024 Nov 21, 2024
37e99e9
file rename
xuzh2024 Nov 22, 2024
7972725
update file sync recorder for multiprocessing
xuzh2024 Nov 22, 2024
40bf986
disable progress bar
xuzh2024 Nov 22, 2024
1755bb5
enable daemon
xuzh2024 Nov 22, 2024
9b2bc5e
make format
xuzh2024 Nov 22, 2024
827b01f
parameter tuning
xuzh2024 Nov 25, 2024
7d74f5e
running log
xuzh2024 Nov 25, 2024
f6b7ec7
modify log
xuzh2024 Nov 25, 2024
e593748
modify log
xuzh2024 Nov 25, 2024
85df4b4
rebuild exception detail while missing context
xuzh2024 Nov 26, 2024
f424fe4
compatible change for reorg
xuzh2024 Nov 26, 2024
961688a
make format
xuzh2024 Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_version():


@click.group()
@click.version_option(version=get_version())
# @click.version_option(version=get_version())
@click.pass_context
def cli(ctx):
pass
Expand Down
85 changes: 40 additions & 45 deletions cli/reorg.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from indexer.utils.exception_recorder import ExceptionRecorder
from indexer.utils.logging_utils import configure_logging, configure_signals
from indexer.utils.provider import get_provider_from_uri
from indexer.utils.reorg import check_reorg
from indexer.utils.rpc_utils import pick_random_provider_uri
from indexer.utils.thread_local_proxy import ThreadLocalProxy

exception_recorder = ExceptionRecorder()
# exception_recorder = ExceptionRecorder()


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
Expand Down Expand Up @@ -46,18 +47,6 @@
envvar="POSTGRES_URL",
help="The required postgres connection url." "e.g. postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum",
)
@click.option(
"-v",
"--db-version",
default="head",
show_default=True,
type=str,
envvar="DB_VERSION",
help="The database version to initialize the database. using the alembic script's revision ID to "
"specify a version."
" e.g. head, indicates the latest version."
"or base, indicates the empty database without any table.",
)
@click.option(
"-b",
"--batch-size",
Expand All @@ -77,6 +66,7 @@
)
@click.option(
"--block-number",
default=None,
show_default=True,
type=int,
envvar="BLOCK_NUMBER",
Expand All @@ -85,12 +75,20 @@
@click.option(
"-r",
"--ranges",
default=1000,
default=10,
show_default=True,
type=int,
envvar="RANGES",
help="Specify the range limit for data fixing.",
)
@click.option(
"--check-ranges",
default=None,
show_default=True,
type=int,
envvar="CHECK_RANGES",
help="Specify the range for block continuous checking.",
)
@click.option(
"--log-file",
default=None,
Expand All @@ -109,14 +107,6 @@
envvar="MULTI_CALL_ENABLE",
)
@click.option("--cache", default=None, show_default=True, type=str, envvar="CACHE", help="Cache")
@click.option(
"--auto-upgrade-db",
default=True,
show_default=True,
type=bool,
envvar="AUTO_UPGRADE_DB",
help="Whether to automatically run database migration scripts to update the database to the latest version.",
)
@click.option(
"--log-level",
default="INFO",
Expand All @@ -131,14 +121,13 @@ def reorg(
postgres_url,
block_number,
ranges,
check_ranges,
batch_size,
debug_batch_size,
db_version="head",
multicall=True,
log_file=None,
cache=None,
config_file=None,
auto_upgrade_db=True,
log_level="INFO",
):
configure_logging(log_level=log_level, log_file=log_file)
Expand All @@ -151,9 +140,9 @@ def reorg(

# build postgresql service
if postgres_url:
service = PostgreSQLService(postgres_url, db_version=db_version, init_schema=auto_upgrade_db)
config = {"db_service": service}
exception_recorder.init_pg_service(service)
service = PostgreSQLService(postgres_url)
config = {"db_service": postgres_url}
# exception_recorder.init_pg_service(service)
else:
logging.error("No postgres url provided. Exception recorder will not be useful.")
exit(1)
Expand All @@ -177,9 +166,9 @@ def reorg(
output_types = list(generate_output_types(entity_types))

job_scheduler = ReorgScheduler(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
batch_web3_debug_provider=ThreadLocalProxy(lambda: get_provider_from_uri(debug_provider_uri, batch=True)),
item_exporters=PostgresItemExporter(config["db_service"]),
web3_provider_uri=provider_uri,
web3_debug_provider_uri=debug_provider_uri,
item_exporters=PostgresItemExporter(service_url=postgres_url),
batch_size=batch_size,
debug_batch_size=debug_batch_size,
required_output_types=output_types,
Expand All @@ -192,23 +181,29 @@ def reorg(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=False)),
job_scheduler=job_scheduler,
ranges=ranges,
config=config,
service=service,
)

job = None
current_block = controller.get_current_block_number()
if check_ranges:
check_begin = current_block - check_ranges
check_reorg(service, check_begin)
else:
check_reorg(service)

while True:
if job:
controller.action(
job_id=job.job_id,
block_number=job.last_fixed_block_number - 1,
remains=job.remain_process,
)
else:
if block_number:
controller.action(block_number=block_number)

job = controller.wake_up_next_job()
if job:
logging.info(f"Waking up uncompleted job: {job.job_id}.")
else:
logging.info("No more uncompleted jobs to wake-up, reorg process will terminate.")
break
job = controller.wake_up_next_job()
if job:
logging.info(f"Waking up uncompleted job: {job.job_id}.")

controller.action(
job_id=job.job_id,
block_number=job.last_fixed_block_number - 1,
remains=job.remain_process,
)
else:
logging.info("No more uncompleted jobs to wake-up, reorg process will terminate.")
break
34 changes: 28 additions & 6 deletions cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,22 @@ def wrapper(*args, **kwargs):
envvar="BLOCK_BATCH_SIZE",
help="How many blocks to batch in single sync round",
)
@click.option(
"-P",
"--max-processors",
default=1,
show_default=True,
type=int,
help="How many sync round to concurrently execute.",
envvar="MAX_PROCESSOR",
)
@click.option(
"-w",
"--max-workers",
default=5,
show_default=True,
type=int,
help="The number of workers",
help="The number of workers during a request to rpc.",
envvar="MAX_WORKERS",
)
@click.option(
Expand Down Expand Up @@ -325,6 +334,7 @@ def stream(
batch_size=10,
debug_batch_size=1,
block_batch_size=1,
max_processors=1,
max_workers=5,
log_file=None,
pid_file=None,
Expand All @@ -346,6 +356,12 @@ def stream(
debug_provider_uri = pick_random_provider_uri(debug_provider_uri)
logging.getLogger("ROOT").info("Using provider " + provider_uri)
logging.getLogger("ROOT").info("Using debug provider " + debug_provider_uri)
logging.getLogger("ROOT").info(
f"Indexer will run in {'multi' if max_processors > 1 else 'single'}-process mode "
f"{'with ' if max_processors > 1 else ''}"
f"{max_processors if max_processors > 1 else ''}"
f" {'processor' if max_processors > 1 else ''} "
)

# parameter logic checking
if source_path:
Expand All @@ -361,7 +377,7 @@ def stream(

if postgres_url:
service = PostgreSQLService(postgres_url, db_version=db_version, init_schema=auto_upgrade_db)
config["db_service"] = service
config["db_service"] = postgres_url
exception_recorder.init_pg_service(service)
else:
logging.getLogger("ROOT").warning("No postgres url provided. Exception recorder will not be useful.")
Expand Down Expand Up @@ -397,14 +413,14 @@ def stream(
output_types = list(
set(generate_dataclass_type_list_from_parameter(output_types, "output") + output_types_by_entity_type)
)
output_types.sort(key=lambda x: x.type())

if source_path and source_path.startswith("postgresql://"):
source_types = generate_dataclass_type_list_from_parameter(source_types, "source")

job_scheduler = JobScheduler(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
batch_web3_debug_provider=ThreadLocalProxy(lambda: get_provider_from_uri(debug_provider_uri, batch=True)),
item_exporters=create_item_exporters(output, config),
web3_provider_uri=provider_uri,
web3_debug_provider_uri=debug_provider_uri,
batch_size=batch_size,
debug_batch_size=debug_batch_size,
max_workers=max_workers,
Expand All @@ -414,12 +430,16 @@ def stream(
cache=cache,
auto_reorg=auto_reorg,
multicall=multicall,
multiprocess=max_processors > 1,
force_filter_mode=force_filter_mode,
)

controller = StreamController(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=False)),
job_scheduler=job_scheduler,
max_processors=max_processors,
scheduled_jobs=job_scheduler.get_scheduled_jobs(),
item_exporters=create_item_exporters(output, config),
required_output_types=output_types,
sync_recorder=create_recorder(sync_recorder, config),
limit_reader=create_limit_reader(
source_path, ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=False))
Expand All @@ -435,3 +455,5 @@ def stream(
period_seconds=period_seconds,
pid_file=pid_file,
)

controller.shutdown()
16 changes: 16 additions & 0 deletions common/models/failure_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import BIGINT, JSON, TIMESTAMP, VARCHAR

from common.models import HemeraModel


class FailureRecords(HemeraModel):
    """Persistent record of a failed indexing job run.

    One row is written per crash so that the block range covered by the
    failed job can be retried or audited later. Backing table:
    ``failure_records``.
    """

    __tablename__ = "failure_records"
    # Auto-incrementing surrogate primary key for the failure record.
    record_id = Column(BIGINT, primary_key=True, autoincrement=True)
    # Identifier of the mission/job that failed.
    # NOTE(review): exact format not visible here — presumably a signature
    # derived from the job configuration; confirm against the writer.
    mission_sign = Column(VARCHAR)
    # Output dataclass types the failed job was producing (serialized as text).
    output_types = Column(VARCHAR)
    # Inclusive block range the job was processing when it crashed.
    start_block_number = Column(BIGINT)
    end_block_number = Column(BIGINT)
    # Pipeline stage in which the exception was raised (e.g. collect/export).
    # NOTE(review): stage vocabulary not visible from this file — verify.
    exception_stage = Column(VARCHAR)
    # Structured exception detail (JSON payload, presumably message + traceback).
    exception = Column(JSON)
    # Timestamp of when the failure occurred.
    crash_time = Column(TIMESTAMP)
Loading