Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
13 changes: 13 additions & 0 deletions indexer/modules/custom/test_job/domain/test_job_domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from indexer.domain import FilterData

@dataclass
class TestJobDomain(FilterData):
    """One token transfer captured by TestJob.

    Emitted once per ERC721 transfer, once per ERC1155 single transfer, and
    once per id within an ERC1155 batch transfer (see TestJob._udf).
    """

    from_address: str  # decoded "from" field of the transfer event
    to_address: str  # decoded "to" field of the transfer event
    value: int  # token id being transferred (ERC721 tokenId / ERC1155 id), not an amount
    token_address: str  # address of the contract that emitted the log (log.address)
    block_timestamp: int  # copied from log.block_timestamp
    block_number: int  # copied from log.block_number
    transaction_hash: str  # copied from log.transaction_hash
    log_index: int  # copied from log.log_index
42 changes: 42 additions & 0 deletions indexer/modules/custom/test_job/models/test_job_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from common.models import HemeraModel, general_converter

from sqlalchemy import Column, Index, PrimaryKeyConstraint, func, text
from sqlalchemy.dialects.postgresql import BIGINT, BOOLEAN, BYTEA, INTEGER, NUMERIC, TIMESTAMP

class TestJobModel(HemeraModel):
    """ORM model persisting TestJobDomain records (token transfers).

    Rows are produced from TestJobDomain by general_converter via
    model_domain_mapping() and upserted (conflict_do_update) on the
    (token_address, log_index) primary key.
    """

    __tablename__ = 'test_job_table'

    from_address = Column(BYTEA)
    to_address = Column(BYTEA)
    # Token ids are uint256 on-chain and exceed BIGINT's 2**63 - 1 maximum;
    # NUMERIC(100) holds the full uint256 range without overflow.
    value = Column(NUMERIC(100))
    token_address = Column(BYTEA)
    block_timestamp = Column(TIMESTAMP)
    block_number = Column(BIGINT)
    transaction_hash = Column(BYTEA)
    log_index = Column(BIGINT)

    # Bookkeeping columns.
    create_time = Column(TIMESTAMP, server_default=func.now())
    update_time = Column(TIMESTAMP, server_default=func.now())
    reorg = Column(BOOLEAN, server_default=text("false"))

    # NOTE(review): (token_address, log_index) is not globally unique --
    # log_index repeats across transactions/blocks, so distinct transfers of
    # the same token can collide and overwrite each other under
    # conflict_do_update. Consider adding transaction_hash to the key.
    __table_args__ = (
        PrimaryKeyConstraint('token_address', 'log_index'),
    )

    @staticmethod
    def model_domain_mapping():
        """Register the TestJobDomain -> TestJobModel conversion for the framework."""
        return [
            {
                "domain": "TestJobDomain",
                "conflict_do_update": True,
                "update_strategy": None,
                "converter": general_converter,
            },
        ]


# NOTE(review): redundant -- the primary key already creates a unique index
# on exactly these columns; kept only to preserve the declared schema name.
Index(
    "test_job_table_address_id_index",
    TestJobModel.token_address,
    TestJobModel.log_index,
)

87 changes: 87 additions & 0 deletions indexer/modules/custom/test_job/test_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import List, Union
from indexer.jobs.base_job import Collector, FilterTransactionDataJob
from indexer.domain.log import Log
from indexer.domain.token_transfer import TokenTransfer
from indexer.modules.custom.test_job.domain.test_job_domain import TestJobDomain
from indexer.utils.abi_setting import ERC20_TRANSFER_EVENT, ERC721_TRANSFER_EVENT, ERC1155_SINGLE_TRANSFER_EVENT, ERC1155_BATCH_TRANSFER_EVENT
from indexer.specification.specification import TopicSpecification, TransactionFilterByLogs

class TestJob(FilterTransactionDataJob):
    """Demo filter job indexing the four standard token-transfer events.

    ERC20 transfers become TokenTransfer records; ERC721 and ERC1155
    transfers become TestJobDomain records whose ``value`` field carries the
    transferred token id (ERC721 ``tokenId`` / ERC1155 ``id``), not an amount.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def get_filter(self):
        """Subscribe to logs whose topic0 matches any of the four transfer events."""
        return TransactionFilterByLogs(
            [
                TopicSpecification(topics=[ERC20_TRANSFER_EVENT.get_signature()]),
                TopicSpecification(topics=[ERC721_TRANSFER_EVENT.get_signature()]),
                TopicSpecification(topics=[ERC1155_SINGLE_TRANSFER_EVENT.get_signature()]),
                TopicSpecification(topics=[ERC1155_BATCH_TRANSFER_EVENT.get_signature()]),
            ]
        )

    @staticmethod
    def _transfer_domain(log: Log, decoded_data: dict, token_id) -> TestJobDomain:
        """Build one TestJobDomain for a single transferred token id of a decoded log."""
        return TestJobDomain(
            from_address=decoded_data["from"],
            to_address=decoded_data["to"],
            value=token_id,
            token_address=log.address,
            block_timestamp=log.block_timestamp,
            block_number=log.block_number,
            transaction_hash=log.transaction_hash,
            log_index=log.log_index,
        )

    def _udf(self, logs: List[Log], output: Collector[Union[TokenTransfer, TestJobDomain]]):
        """Decode transfer logs and collect TokenTransfer / TestJobDomain records.

        Args:
            logs: Log objects, already narrowed by get_filter to the four transfer topics.
            output: Collector receiving the decoded domain objects.
        """
        # Hoist the loop-invariant event signatures; the original recomputed
        # up to four get_signature() calls on every iteration.
        erc20_sig = ERC20_TRANSFER_EVENT.get_signature()
        erc721_sig = ERC721_TRANSFER_EVENT.get_signature()
        erc1155_single_sig = ERC1155_SINGLE_TRANSFER_EVENT.get_signature()
        erc1155_batch_sig = ERC1155_BATCH_TRANSFER_EVENT.get_signature()

        token_transfers = []
        test_job_domains = []

        for log in logs:
            if log.topic0 == erc20_sig:
                decoded_data = ERC20_TRANSFER_EVENT.decode_log(log)
                token_transfers.append(
                    TokenTransfer(
                        from_address=decoded_data["from"],
                        to_address=decoded_data["to"],
                        value=decoded_data["value"],
                        token_address=log.address,
                        block_timestamp=log.block_timestamp,
                        block_number=log.block_number,
                        transaction_hash=log.transaction_hash,
                        log_index=log.log_index,
                    )
                )
            elif log.topic0 == erc721_sig:
                decoded_data = ERC721_TRANSFER_EVENT.decode_log(log)
                test_job_domains.append(self._transfer_domain(log, decoded_data, decoded_data["tokenId"]))
            elif log.topic0 == erc1155_single_sig:
                # NOTE(review): only the token id is recorded; the ERC1155
                # transfer amount is ignored -- confirm this is intended.
                decoded_data = ERC1155_SINGLE_TRANSFER_EVENT.decode_log(log)
                test_job_domains.append(self._transfer_domain(log, decoded_data, decoded_data["id"]))
            elif log.topic0 == erc1155_batch_sig:
                decoded_data = ERC1155_BATCH_TRANSFER_EVENT.decode_log(log)
                # NOTE(review): every id of a batch shares the parent log's
                # log_index, so the records collide on the model's
                # (token_address, log_index) primary key and only one row per
                # batch survives the upsert -- confirm intent.
                for token_id in decoded_data["ids"]:
                    test_job_domains.append(self._transfer_domain(log, decoded_data, token_id))

        output.collect_domains(token_transfers)
        output.collect_domains(test_job_domains)
Empty file.
13 changes: 13 additions & 0 deletions indexer/modules/custom/test_udf/domain/test_udf_domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from indexer.domain import FilterData

@dataclass
class TestUdfDomain(FilterData):
    """One ERC20 Transfer event captured by TestUDFJob.

    Built in TestUDFJob._udf from the decoded ERC20 Transfer log.
    """

    from_address: str  # decoded "from" field of the Transfer event
    to_address: str  # decoded "to" field of the Transfer event
    value: int  # decoded "value" field -- the transferred token amount
    token_address: str  # address of the contract that emitted the log (log.address)
    block_timestamp: int  # copied from log.block_timestamp
    block_number: int  # copied from log.block_number
    transaction_hash: str  # copied from log.transaction_hash
    log_index: int  # copied from log.log_index
Empty file.
20 changes: 20 additions & 0 deletions indexer/modules/custom/test_udf/model/test_udf_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from sqlalchemy import BigInteger, Column, Integer, PrimaryKeyConstraint, String, TIMESTAMP
from sqlalchemy.dialects.postgresql import BYTEA, NUMERIC

from common.models import HemeraModel

class TestUdfModel(HemeraModel):
    """ORM model for rows derived from TestUdfDomain (ERC20 transfers).

    Column types are aligned with the sibling TestJobModel: addresses are
    stored as BYTEA (the original mixed String addresses with a BYTEA
    transaction hash), and ``value`` uses NUMERIC(100) because ERC20 amounts
    are uint256 and overflow BigInteger's 2**63 - 1 maximum.
    """

    __tablename__ = 'test_udf_table'

    from_address = Column(BYTEA)
    to_address = Column(BYTEA)
    value = Column(NUMERIC(100))
    token_address = Column(BYTEA)
    block_timestamp = Column(TIMESTAMP)
    block_number = Column(BigInteger)
    transaction_hash = Column(BYTEA)
    log_index = Column(Integer)

    # NOTE(review): unlike TestJobModel, no model_domain_mapping() is defined,
    # so TestUdfDomain records may never be persisted into this table --
    # verify against the framework's converter registration. Also,
    # (token_address, log_index) is not globally unique (log_index repeats
    # across blocks); consider including transaction_hash in the key.
    __table_args__ = (
        PrimaryKeyConstraint('token_address', 'log_index'),
    )

44 changes: 44 additions & 0 deletions indexer/modules/custom/test_udf/test_udf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List
from indexer.jobs.base_job import Collector, FilterTransactionDataJob
from indexer.domain.log import Log
from indexer.utils.abi_setting import ERC20_TRANSFER_EVENT
from indexer.specification.specification import TopicSpecification, TransactionFilterByLogs
from indexer.modules.custom.test_udf.domain.test_udf_domain import TestUdfDomain

class TestUDFJob(FilterTransactionDataJob):
    """Minimal example job: index ERC20 Transfer events as TestUdfDomain records."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def get_filter(self):
        """Only pull transactions that carry ERC20 Transfer logs."""
        return TransactionFilterByLogs(
            [TopicSpecification(topics=[ERC20_TRANSFER_EVENT.get_signature()])]
        )

    def _udf(self, logs: List[Log], output: Collector[TestUdfDomain]):
        """Decode ERC20 Transfer logs and collect TestUdfDomain records.

        Args:
            logs: Log objects, pre-filtered by get_filter to the Transfer topic.
            output: Collector receiving the TestUdfDomain objects.
        """
        # Hoisted loop invariant: the original called get_signature() once per log.
        transfer_sig = ERC20_TRANSFER_EVENT.get_signature()

        token_transfers = []
        for log in logs:
            # Guard clause: skip any log that is not an ERC20 Transfer.
            if log.topic0 != transfer_sig:
                continue
            decoded_data = ERC20_TRANSFER_EVENT.decode_log(log)
            token_transfers.append(
                TestUdfDomain(
                    from_address=decoded_data["from"],
                    to_address=decoded_data["to"],
                    value=decoded_data["value"],
                    token_address=log.address,
                    block_timestamp=log.block_timestamp,
                    block_number=log.block_number,
                    transaction_hash=log.transaction_hash,
                    log_index=log.log_index,
                )
            )

        output.collect_domains(token_transfers)