Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hemera_udf/token_holder_metrics/domains/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ class ERC20TokenTransferWithPriceD(Domain):
to_address: str
value: int
price: float
decimals: int
is_swap: bool
from_address_balance: int # balance after transfer
to_address_balance: int # balance after transfer
token_type: str
token_address: str
block_number: int
Expand All @@ -29,7 +32,7 @@ class TokenHolderMetricsD(Domain):
first_block_timestamp: int
last_swap_timestamp: int
last_transfer_timestamp: int

last_price: float = 0.0
current_balance: int = 0
max_balance: int = 0
max_balance_timestamp: int = 0
Expand Down Expand Up @@ -60,6 +63,7 @@ class TokenHolderMetricsD(Domain):
realized_pnl: float = 0.0
sell_pnl: float = 0.0
win_rate: float = 0.0
pnl_valid: bool = False


@dataclass
Expand Down
71 changes: 61 additions & 10 deletions hemera_udf/token_holder_metrics/export_token_holder_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
logger = logging.getLogger(__name__)

MAX_SAFE_VALUE = 2**255
MIN_BALANCE_THRESHOLD = 1e-4


class ExportTokenHolderMetricsJob(ExtensionJob):
Expand Down Expand Up @@ -52,6 +53,24 @@ def _process(self, **kwargs):
)
logger.info(f"Filtered non-meme tokens in {time.time() - t2:.2f}s")

self._block_address_token_values = {}
self._block_address_token_balances = {}
for transfer in transfers:
block_number = transfer.block_number
from_key = (block_number, transfer.from_address, transfer.token_address)
to_key = (block_number, transfer.to_address, transfer.token_address)

if from_key not in self._block_address_token_values:
self._block_address_token_values[from_key] = 0
self._block_address_token_values[from_key] -= transfer.value

if to_key not in self._block_address_token_values:
self._block_address_token_values[to_key] = 0
self._block_address_token_values[to_key] += transfer.value

self._block_address_token_balances[from_key] = transfer.from_address_balance
self._block_address_token_balances[to_key] = transfer.to_address_balance

t3 = time.time()
address_token_pairs = set()
for transfer in transfers:
Expand Down Expand Up @@ -84,7 +103,7 @@ def _process(self, **kwargs):
continue

token = self.tokens[transfer.token_address]
amount_usd = transfer.value * transfer.price / 10 ** token["decimals"]
amount_usd = transfer.value * transfer.price / 10 ** (token["decimals"] or 0)

# Process "from" address
self._update_holder_metrics(
Expand All @@ -94,6 +113,7 @@ def _process(self, **kwargs):
transfer,
"out",
amount_usd,
transfer.price,
token,
)

Expand All @@ -105,10 +125,15 @@ def _process(self, **kwargs):
transfer,
"in",
amount_usd,
transfer.price,
token,
)


for metrics in current_metrics.values():
metrics.current_balance = self._block_address_token_balances.get((metrics.block_number, metrics.holder_address, metrics.token_address), 0)

logger.info(f"Metrics update completed in {time.time() - t6:.2f}s")


self._collect_items(TokenHolderMetricsCurrentD.type(), list(current_metrics.values()))
history_metrics = [TokenHolderMetricsHistoryD(**asdict(metrics)) for metrics in current_metrics.values()]
Expand Down Expand Up @@ -147,7 +172,7 @@ def _get_address_token_holder_metrics_batch(
query = text(
f"""
SELECT *
FROM af_token_holder_metrics_current_all_p{partition_idx}
FROM af_token_holder_metrics_current_p{partition_idx}
WHERE (holder_address, token_address) IN :pairs
"""
)
Expand Down Expand Up @@ -210,6 +235,7 @@ def _get_address_token_holder_metrics_batch(
realized_pnl=float(metrics.realized_pnl or 0),
sell_pnl=float(metrics.sell_pnl or 0),
win_rate=float(metrics.win_rate or 0),
pnl_valid=bool(metrics.pnl_valid or False),
)

session.close()
Expand All @@ -223,6 +249,7 @@ def _update_holder_metrics(
transfer,
transfer_action: str,
amount_usd: float,
token_price: float,
token: dict,
):
key = (holder_address, token_address)
Expand All @@ -236,6 +263,7 @@ def _update_holder_metrics(
first_block_timestamp=transfer.block_timestamp,
last_swap_timestamp=transfer.block_timestamp,
last_transfer_timestamp=transfer.block_timestamp,
pnl_valid=False,
)

metrics = current_metrics[key]
Expand All @@ -245,12 +273,24 @@ def _update_holder_metrics(
metrics.block_number = transfer.block_number
metrics.block_timestamp = transfer.block_timestamp

set_pnl_valid_block_number = 0

new_current_balance = self._block_address_token_balances.get((transfer.block_number, holder_address, token_address), 0) or 0

if not metrics.pnl_valid:
block_key = (transfer.block_number, holder_address, token_address)
total_value = self._block_address_token_values.get(block_key, 0)

if abs(total_value - new_current_balance) < MIN_BALANCE_THRESHOLD or new_current_balance < MIN_BALANCE_THRESHOLD:
metrics.pnl_valid = True
set_pnl_valid_block_number = transfer.block_number

# buy
# update balance
# update total buy count, amount, usd
# update current average buy price
# sell
# set average buy price to 0 when balance is less than 0.00001
# set average buy price to 0 when balance is less than MIN_BALANCE_THRESHOLD
# calculate pnl
# update balance
# update total sell count, amount, usd
Expand All @@ -260,7 +300,7 @@ def _update_holder_metrics(
# update win rate
if transfer_action == "in":
new_balance = metrics.current_balance + transfer.value
if new_balance / 10 ** token["decimals"] > 0.00001:
if new_balance / 10 ** token["decimals"] > MIN_BALANCE_THRESHOLD:
new_average_buy_price = (
amount_usd + metrics.current_balance * metrics.current_average_buy_price / 10 ** token["decimals"]
) / ((transfer.value + metrics.current_balance) / 10 ** token["decimals"])
Expand All @@ -274,19 +314,18 @@ def _update_holder_metrics(
metrics.current_average_buy_price = new_average_buy_price
else:
sell_amount = transfer.value
sell_price = amount_usd

if metrics.current_balance > 0:
sell_pnl = (sell_price - metrics.current_average_buy_price) * sell_amount / 10 ** token["decimals"]
sell_pnl = (token_price - metrics.current_average_buy_price) * sell_amount / 10 ** token["decimals"]
metrics.sell_pnl += sell_pnl

metrics.realized_pnl = (
metrics.total_sell_usd
- metrics.total_buy_usd
+ metrics.current_balance * sell_price / 10 ** token["decimals"]
+ metrics.current_balance * token_price / 10 ** token["decimals"]
)

if sell_price > metrics.current_average_buy_price:
if token_price > metrics.current_average_buy_price:
metrics.success_sell_count += 1
else:
metrics.fail_sell_count += 1
Expand All @@ -296,7 +335,7 @@ def _update_holder_metrics(
metrics.win_rate = metrics.success_sell_count / total_sells

metrics.current_balance -= sell_amount
if metrics.current_balance / 10 ** token["decimals"] < 0.00001:
if metrics.current_balance / 10 ** token["decimals"] < MIN_BALANCE_THRESHOLD:
metrics.current_average_buy_price = 0

metrics.total_sell_count += 1
Expand All @@ -315,6 +354,7 @@ def _update_holder_metrics(
metrics.sell_50_timestamp = metrics.block_timestamp

metrics.last_transfer_timestamp = metrics.block_timestamp
metrics.last_price = token_price
if transfer.is_swap:
metrics.last_swap_timestamp = metrics.block_timestamp

Expand All @@ -326,3 +366,14 @@ def _update_holder_metrics(
metrics.swap_sell_count += 1
metrics.swap_sell_amount += transfer.value
metrics.swap_sell_usd += amount_usd

if not metrics.pnl_valid or metrics.block_number == set_pnl_valid_block_number:
metrics.sell_pnl = 0
metrics.realized_pnl = 0
metrics.success_sell_count = 0
metrics.fail_sell_count = 0
metrics.win_rate = 0
metrics.current_average_buy_price = 0
if metrics.block_number == set_pnl_valid_block_number and new_current_balance > MIN_BALANCE_THRESHOLD:
metrics.current_average_buy_price = token_price

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from sqlalchemy import text

from hemera.common.utils.format_utils import bytes_to_hex_str, hex_str_to_bytes
from hemera.indexer.domains.current_token_balance import CurrentTokenBalance
from hemera.indexer.domains.token_balance import TokenBalance
from hemera.indexer.domains.token_transfer import ERC20TokenTransfer
from hemera.indexer.jobs.base_job import ExtensionJob
from hemera_udf.token_holder_metrics.domains.metrics import ERC20TokenTransferWithPriceD
Expand All @@ -16,7 +18,7 @@


class ExportTokenTransferWithPriceJob(ExtensionJob):
dependency_types = [ERC20TokenTransfer, DexBlockTokenPrice, UniswapV2SwapEvent, UniswapV3SwapEvent]
dependency_types = [ERC20TokenTransfer, DexBlockTokenPrice, UniswapV2SwapEvent, UniswapV3SwapEvent, TokenBalance]
output_types = [ERC20TokenTransferWithPriceD]
able_to_reorg = True

Expand All @@ -32,6 +34,9 @@ def _process(self, **kwargs):
self._init_history_token_prices(kwargs["start_block"])
self._init_token_dex_prices_batch(kwargs["start_block"], kwargs["end_block"])
transfers = self._data_buff[ERC20TokenTransfer.type()]
token_balance = {}
for balance in self._data_buff[TokenBalance.type()]:
token_balance[f"{balance.token_address}_{balance.address}_{balance.block_number}"] = balance.balance

swaps = self._data_buff[UniswapV2SwapEvent.type()] + self._data_buff[UniswapV3SwapEvent.type()]
swap_txs = {swap.transaction_hash: swap for swap in swaps}
Expand All @@ -48,8 +53,28 @@ def _process(self, **kwargs):
):
is_swap = True

decimals = 0
token = self.tokens.get(transfer.token_address)
if token:
decimals = token["decimals"]

price = self._get_token_dex_price(transfer.token_address, transfer.block_number)
to_export.append(ERC20TokenTransferWithPriceD(**asdict(transfer), price=price, is_swap=is_swap))
from_address_balance = token_balance.get(
f"{transfer.token_address}_{transfer.from_address}_{transfer.block_number}", 0
)
to_address_balance = token_balance.get(
f"{transfer.token_address}_{transfer.to_address}_{transfer.block_number}", 0
)
to_export.append(
ERC20TokenTransferWithPriceD(
**asdict(transfer),
price=price,
is_swap=is_swap,
from_address_balance=from_address_balance,
to_address_balance=to_address_balance,
decimals=decimals,
)
)
self._collect_items(ERC20TokenTransferWithPriceD.type(), to_export)
self._update_history_token_prices()

Expand Down
14 changes: 10 additions & 4 deletions hemera_udf/token_holder_metrics/models/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TokenHolderMetricsCurrent(HemeraModel):

last_transfer_timestamp = Column(TIMESTAMP)
last_swap_timestamp = Column(TIMESTAMP)

last_price = Column(NUMERIC)
success_sell_count = Column(BIGINT)
fail_sell_count = Column(BIGINT)

Expand All @@ -50,6 +50,7 @@ class TokenHolderMetricsCurrent(HemeraModel):
realized_pnl = Column(NUMERIC)
sell_pnl = Column(NUMERIC)
win_rate = Column(NUMERIC)
pnl_valid = Column(BOOLEAN)

first_block_timestamp = Column(TIMESTAMP)

Expand Down Expand Up @@ -102,6 +103,7 @@ class TokenHolderMetricsHistory(HemeraModel):

last_transfer_timestamp = Column(TIMESTAMP)
last_swap_timestamp = Column(TIMESTAMP)
last_price = Column(NUMERIC)

success_sell_count = Column(BIGINT)
fail_sell_count = Column(BIGINT)
Expand All @@ -111,6 +113,7 @@ class TokenHolderMetricsHistory(HemeraModel):
realized_pnl = Column(NUMERIC)
sell_pnl = Column(NUMERIC)
win_rate = Column(NUMERIC)
pnl_valid = Column(BOOLEAN)

first_block_timestamp = Column(TIMESTAMP)

Expand Down Expand Up @@ -141,25 +144,28 @@ class ERC20TokenTransfersWithPrice(HemeraModel):
token_address = Column(BYTEA)
value = Column(NUMERIC(100))
price = Column(NUMERIC)
decimals = Column(NUMERIC(100))
is_swap = Column(BOOLEAN)
from_address_balance = Column(NUMERIC(100))
to_address_balance = Column(NUMERIC(100))

block_number = Column(BIGINT)
block_hash = Column(BYTEA, primary_key=True)
block_timestamp = Column(TIMESTAMP)
block_timestamp = Column(TIMESTAMP, primary_key=True)

create_time = Column(TIMESTAMP, server_default=func.now())
update_time = Column(TIMESTAMP, server_default=func.now())
reorg = Column(BOOLEAN, server_default=text("false"))

__table_args__ = (PrimaryKeyConstraint("transaction_hash", "block_hash", "log_index"),)
__table_args__ = (PrimaryKeyConstraint("transaction_hash", "block_hash", "log_index", "block_timestamp"),)
__query_order__ = [block_number, log_index]

@staticmethod
def model_domain_mapping():
return [
{
"domain": ERC20TokenTransferWithPriceD,
"conflict_do_update": False,
"conflict_do_update": True,
"update_strategy": None,
"converter": general_converter,
}
Expand Down