From 8a712beb324b4601449fb34ab3fa0929d0380051 Mon Sep 17 00:00:00 2001 From: Aarav Date: Wed, 25 Dec 2024 12:41:49 +0530 Subject: [PATCH] feat: Add degen wallet tracker module - Add DegenWallet domain model - Add DegenWalletHoldings database model - Implement DegenTrackerJob for monitoring wallet activities - Update config template with degen tracker settings --- config/indexer-config-template.yaml | 10 +- .../modules/custom/degen_tracker/__init__.py | 3 + .../custom/degen_tracker/degen_tracker_job.py | 119 ++++++++++++++++++ .../degen_tracker/domain/degen_wallet.py | 17 +++ .../models/degen_wallet_holdings.py | 24 ++++ 5 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 indexer/modules/custom/degen_tracker/__init__.py create mode 100644 indexer/modules/custom/degen_tracker/degen_tracker_job.py create mode 100644 indexer/modules/custom/degen_tracker/domain/degen_wallet.py create mode 100644 indexer/modules/custom/degen_tracker/models/degen_wallet_holdings.py diff --git a/config/indexer-config-template.yaml b/config/indexer-config-template.yaml index 76392084b..4e0d6dccb 100644 --- a/config/indexer-config-template.yaml +++ b/config/indexer-config-template.yaml @@ -1,4 +1,8 @@ chain_id: 1 -demo_job: - contract_address: - - "0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d" \ No newline at end of file +degen_tracker_job: + tracked_addresses: + - "0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d" # Example degen address + tracked_tokens: + - "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2" # WETH + - "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" # WBTC + - "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" # USDC \ No newline at end of file diff --git a/indexer/modules/custom/degen_tracker/__init__.py b/indexer/modules/custom/degen_tracker/__init__.py new file mode 100644 index 000000000..deb939fd6 --- /dev/null +++ b/indexer/modules/custom/degen_tracker/__init__.py @@ -0,0 +1,3 @@ +from indexer.modules.custom.degen_tracker.degen_tracker_job import DegenTrackerJob + +__all__ = ['DegenTrackerJob'] diff --git a/indexer/modules/custom/degen_tracker/degen_tracker_job.py b/indexer/modules/custom/degen_tracker/degen_tracker_job.py new file mode 100644 index 000000000..5c4345e9a --- /dev/null +++ b/indexer/modules/custom/degen_tracker/degen_tracker_job.py @@ -0,0 +1,119 @@ +import logging +from datetime import datetime +from typing import List, Optional, Set, Dict + +from sqlalchemy import select + +from common.models import db +from common.models.logs import Logs +from common.models.tokens import Tokens +from common.models.token_hourly_price import TokenHourlyPrices +from common.utils.format_utils import bytes_to_hex_str, hex_str_to_bytes +from indexer.domain.log import Log +from indexer.jobs.base_job import BaseJob +from indexer.modules.custom.degen_tracker.models.degen_wallet_holdings import DegenWalletHoldings +from indexer.modules.custom.degen_tracker.domain.degen_wallet import DegenWallet + +logger = logging.getLogger(__name__) + +class DegenTrackerJob(BaseJob): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.degen_addresses = set() + self.tracked_tokens = set() + self._load_config() + + def _load_config(self): + """Load degen addresses and tracked tokens from config""" + config = self._config.get("degen_tracker_job", {}) + addresses = config.get("tracked_addresses", []) + tokens = config.get("tracked_tokens", []) + + self.degen_addresses = {hex_str_to_bytes(addr.lower()) for addr in addresses} + self.tracked_tokens = {hex_str_to_bytes(token.lower()) for token in tokens} + + def process_logs(self, logs: List[Log]) -> None: + """Process logs to track degen wallet activities""" + if not logs: + return + + # Get relevant token transfers + transfer_logs = [ + log for log in logs + if (log.address in self.tracked_tokens or not self.tracked_tokens) and + (log.topic0 == hex_str_to_bytes("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")) # Transfer event + ] + + # Extract unique addresses and tokens + addresses = {log.topic1[12:] for log in transfer_logs}.union({log.topic2[12:] for log in transfer_logs}) + addresses = addresses.intersection(self.degen_addresses) if self.degen_addresses else addresses + + if not addresses: + return + + # Update holdings for each address + for address in addresses: + self._update_holdings(address) + + def _update_holdings(self, address: bytes) -> None: + """Update holdings for a specific address""" + # Get token balances + token_balances = self._get_token_balances(address) + + # Get token details and prices + token_details = self._get_token_details(list(token_balances.keys())) + + # Update database + for token_address, balance in token_balances.items(): + if token_address not in token_details: + continue + + token = token_details[token_address] + value_usd = self._get_token_value_usd(token["symbol"], balance, token["decimals"]) + + holding = DegenWalletHoldings( + address=address, + token_address=token_address, + balance=balance, + token_symbol=token["symbol"], + token_decimals=token["decimals"], + last_transaction_time=datetime.now(), + total_value_usd=value_usd, + block_number=self._current_block_number, + block_timestamp=datetime.now() + ) + + db.session.merge(holding) + + db.session.commit() + + def _get_token_balances(self, address: bytes) -> Dict[bytes, float]: + """Get token balances for an address""" + # Implementation depends on your specific needs + # You might want to use Web3 calls or your existing database + pass + + def _get_token_details(self, token_addresses: List[bytes]) -> Dict[bytes, Dict]: + """Get token details from database""" + tokens = db.session.query(Tokens).filter( + Tokens.address.in_(token_addresses) + ).all() + + return { + token.address: { + "symbol": token.symbol, + "decimals": token.decimals + } + for token in tokens + } + + def _get_token_value_usd(self, symbol: str, amount: float, decimals: int) -> float: + """Get USD value of token amount""" + price = db.session.query(TokenHourlyPrices).filter( + TokenHourlyPrices.symbol == symbol + ).order_by(TokenHourlyPrices.timestamp.desc()).first() + + if not price: + return 0.0 + + return float(price.price) * (amount / (10 ** decimals)) diff --git a/indexer/modules/custom/degen_tracker/domain/degen_wallet.py b/indexer/modules/custom/degen_tracker/domain/degen_wallet.py new file mode 100644 index 000000000..479cf5b55 --- /dev/null +++ b/indexer/modules/custom/degen_tracker/domain/degen_wallet.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +@dataclass +class DegenWallet: + address: bytes + token_address: bytes + balance: float + token_symbol: str + token_decimals: int + last_transaction_time: datetime + total_value_usd: float + block_number: int + block_timestamp: datetime + create_time: Optional[datetime] = None + update_time: Optional[datetime] = None diff --git a/indexer/modules/custom/degen_tracker/models/degen_wallet_holdings.py b/indexer/modules/custom/degen_tracker/models/degen_wallet_holdings.py new file mode 100644 index 000000000..f487b0478 --- /dev/null +++ b/indexer/modules/custom/degen_tracker/models/degen_wallet_holdings.py @@ -0,0 +1,24 @@ +from sqlalchemy import Column, Index, TIMESTAMP, func, text +from sqlalchemy.dialects.postgresql import BIGINT, BYTEA, FLOAT, INTEGER, VARCHAR + +from common.models import HemeraModel + +class DegenWalletHoldings(HemeraModel): + __tablename__ = "degen_wallet_holdings" + + address = Column(BYTEA, primary_key=True) + token_address = Column(BYTEA, primary_key=True) + balance = Column(FLOAT) + token_symbol = Column(VARCHAR) + token_decimals = Column(INTEGER) + last_transaction_time = Column(TIMESTAMP) + total_value_usd = Column(FLOAT) + block_number = Column(BIGINT) + block_timestamp = Column(TIMESTAMP) + create_time = Column(TIMESTAMP, server_default=func.now()) + update_time = Column(TIMESTAMP, server_default=func.now()) + +# Create indexes for efficient querying +Index('degen_wallet_holdings_address_idx', DegenWalletHoldings.address) +Index('degen_wallet_holdings_token_address_idx', DegenWalletHoldings.token_address) +Index('degen_wallet_holdings_value_idx', DegenWalletHoldings.total_value_usd.desc())