Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hemera/indexer/domains/token_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ def handle_withdraw_event(log: Log) -> List[TokenTransfer]:


def handle_transfer_event(log: Log) -> List[TokenTransfer]:
decode_data = ERC20_TRANSFER_EVENT.decode_log_ignore_indexed(log)
try:
decode_data = ERC20_TRANSFER_EVENT.decode_log_ignore_indexed(log)
except Exception as e:
logger.error(f"Error decoding transfer event: {e}, log: {log}")
return []

from_address = decode_data.get("from").lower()
to_address = decode_data.get("to").lower()
Expand Down
23 changes: 22 additions & 1 deletion hemera/indexer/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from hemera.indexer.domains.token_balance import TokenBalance
from hemera.indexer.domains.token_transfer import ERC20TokenTransfer
from hemera.indexer.exporters.base_exporter import BaseExporter
from hemera_udf.token_holder_metrics.domains.metrics import TokenHolderMetricsCurrentD, TokenHolderMetricsHistoryD
from hemera_udf.token_holder_metrics.domains.metrics import (
ERC20TokenTransferWithPriceD,
TokenHolderMetricsCurrentD,
TokenHolderMetricsHistoryD,
)
from hemera_udf.token_price.domains import DexBlockTokenPrice
from hemera_udf.uniswap_v2 import UniswapV2SwapEvent
from hemera_udf.uniswap_v3 import UniswapV3SwapEvent
Expand All @@ -39,6 +43,22 @@ def __init__(self, output, max_retries=5, ack_mode="all", timeout=30):
self.max_retries = max_retries
self.timeout = timeout
self.producer = None
if os.environ.get("KAFKA_ACK_MODE", None):
ack_mode = os.environ.get("KAFKA_ACK_MODE")
# Convert string ack_mode to appropriate format
if ack_mode is not None:
if ack_mode.lower() == "all":
ack_mode = -1
else:
try:
ack_mode = int(ack_mode)
except ValueError:
# Handle invalid values (not a number or "all")
logger.warning(f"Invalid KAFKA_ACK_MODE: {ack_mode}, defaulting to 1")
ack_mode = 1
else:
# Default value if not set
ack_mode = 1 # Or whatever default you prefer
self._create_producer(ack_mode)

def _create_producer(self, ack_mode):
Expand Down Expand Up @@ -180,6 +200,7 @@ def domain_mapping(self, item):
ERC20TokenTransfer,
TokenHolderMetricsCurrentD,
TokenHolderMetricsHistoryD,
ERC20TokenTransferWithPriceD,
),
):
return data
Expand Down
Loading