Skip to content

Commit 90b9da0

Browse files
authored
Feature: Pricing Services (#226)
* feat: pricing services client * feat: init of pricing services * fix: utils func to calculate compute unit as pyaleph do * feat: GPU utils func to get currents state of gpu on network * fix: mypy issue * feat: pricing aggregate.json for mocked * Feature: new DictLikeModel for CRNS * feature: new utils extract_valid_eth_address * Refactor: CrnList to use DictLikeModel instead of dit and apply logic for GPU / Fetch crn on it * Fix: linting issue * Feat: new method for pricing aggregate
1 parent 7bbeddd commit 90b9da0

File tree

8 files changed

+966
-9
lines changed

8 files changed

+966
-9
lines changed

src/aleph/sdk/client/http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from aleph.sdk.client.services.dns import DNS
3838
from aleph.sdk.client.services.instance import Instance
3939
from aleph.sdk.client.services.port_forwarder import PortForwarder
40+
from aleph.sdk.client.services.pricing import Pricing
4041
from aleph.sdk.client.services.scheduler import Scheduler
4142

4243
from ..conf import settings
@@ -135,7 +136,7 @@ async def __aenter__(self):
135136
self.crn = Crn(self)
136137
self.scheduler = Scheduler(self)
137138
self.instance = Instance(self)
138-
139+
self.pricing = Pricing(self)
139140
return self
140141

141142
async def __aexit__(self, exc_type, exc_val, exc_tb):

src/aleph/sdk/client/services/crn.py

Lines changed: 182 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,187 @@
1-
from typing import TYPE_CHECKING, Dict, Optional, Union
1+
from typing import TYPE_CHECKING, Dict, List, Optional, Union
22

33
import aiohttp
44
from aiohttp.client_exceptions import ClientResponseError
55
from aleph_message.models import ItemHash
6+
from pydantic import BaseModel
67

78
from aleph.sdk.conf import settings
89
from aleph.sdk.exceptions import MethodNotAvailableOnCRN, VmNotFoundOnHost
9-
from aleph.sdk.types import CrnExecutionV1, CrnExecutionV2, CrnV1List, CrnV2List
10-
from aleph.sdk.utils import sanitize_url
10+
from aleph.sdk.types import (
11+
CrnExecutionV1,
12+
CrnExecutionV2,
13+
CrnV1List,
14+
CrnV2List,
15+
DictLikeModel,
16+
)
17+
from aleph.sdk.utils import extract_valid_eth_address, sanitize_url
1118

1219
if TYPE_CHECKING:
1320
from aleph.sdk.client.http import AlephHttpClient
1421

1522

23+
class GPU(BaseModel):
24+
vendor: str
25+
model: str
26+
device_name: str
27+
device_class: str
28+
pci_host: str
29+
compatible: bool
30+
31+
32+
class NetworkGPUS(BaseModel):
33+
total_gpu_count: int
34+
available_gpu_count: int
35+
available_gpu_list: dict[str, List[GPU]] # str = node_url
36+
used_gpu_list: dict[str, List[GPU]] # str = node_url
37+
38+
39+
class CRN(DictLikeModel):
40+
# This Model work as dict but where we can type what we need / apply logic on top
41+
42+
# Simplify search
43+
hash: str
44+
name: str
45+
address: str
46+
47+
gpu_support: Optional[bool] = False
48+
confidential_support: Optional[bool] = False
49+
qemu_support: Optional[bool] = False
50+
51+
version: Optional[str] = "0.0.0"
52+
payment_receiver_address: Optional[str] # Can be None if not configured
53+
54+
55+
class CrnList(DictLikeModel):
56+
crns: list[CRN] = []
57+
58+
@classmethod
59+
def from_api(cls, payload: dict) -> "CrnList":
60+
raw_list = payload.get("crns", [])
61+
crn_list = [
62+
CRN.model_validate(item) if not isinstance(item, CRN) else item
63+
for item in raw_list
64+
]
65+
return cls(crns=crn_list)
66+
67+
def find_gpu_on_network(self):
68+
gpu_count: int = 0
69+
available_gpu_count: int = 0
70+
71+
compatible_gpu: Dict[str, List[GPU]] = {}
72+
available_compatible_gpu: Dict[str, List[GPU]] = {}
73+
74+
for crn_ in self.crns:
75+
if not crn_.gpu_support:
76+
continue
77+
78+
# Extracts used GPU
79+
for gpu in crn_.get("compatible_gpus", []):
80+
compatible_gpu[crn_.address] = []
81+
compatible_gpu[crn_.address].append(GPU.model_validate(gpu))
82+
gpu_count += 1
83+
84+
# Extracts available GPU
85+
for gpu in crn_.get("compatible_available_gpus", []):
86+
available_compatible_gpu[crn_.address] = []
87+
available_compatible_gpu[crn_.address].append(GPU.model_validate(gpu))
88+
gpu_count += 1
89+
available_gpu_count += 1
90+
91+
return NetworkGPUS(
92+
total_gpu_count=gpu_count,
93+
available_gpu_count=available_gpu_count,
94+
used_gpu_list=compatible_gpu,
95+
available_gpu_list=available_compatible_gpu,
96+
)
97+
98+
def filter_crn(
99+
self,
100+
latest_crn_version: bool = False,
101+
ipv6: bool = False,
102+
stream_address: bool = False,
103+
confidential: bool = False,
104+
gpu: bool = False,
105+
) -> list[CRN]:
106+
"""Filter compute resource node list, unfiltered by default.
107+
Args:
108+
latest_crn_version (bool): Filter by latest crn version.
109+
ipv6 (bool): Filter invalid IPv6 configuration.
110+
stream_address (bool): Filter invalid payment receiver address.
111+
confidential (bool): Filter by confidential computing support.
112+
gpu (bool): Filter by GPU support.
113+
Returns:
114+
list[CRN]: List of compute resource nodes. (if no filter applied, return all)
115+
"""
116+
# current_crn_version = await fetch_latest_crn_version()
117+
# Relax current filter to allow use aleph-vm versions since 1.5.1.
118+
# TODO: Allow to specify that option on settings aggregate on maybe on GitHub
119+
current_crn_version = "1.5.1"
120+
121+
filtered_crn: list[CRN] = []
122+
for crn_ in self.crns:
123+
# Check crn version
124+
if latest_crn_version and (crn_.version or "0.0.0") < current_crn_version:
125+
continue
126+
127+
# Filter with ipv6 check
128+
if ipv6:
129+
ipv6_check = crn_.get("ipv6_check")
130+
if not ipv6_check or not all(ipv6_check.values()):
131+
continue
132+
133+
if stream_address and not extract_valid_eth_address(
134+
crn_.payment_receiver_address or ""
135+
):
136+
continue
137+
138+
# Confidential Filter
139+
if confidential and not crn_.confidential_support:
140+
continue
141+
142+
# Filter with GPU / Available GPU
143+
available_gpu = crn_.get("compatible_available_gpus")
144+
if gpu and (not crn_.gpu_support or not available_gpu):
145+
continue
146+
147+
filtered_crn.append(crn_)
148+
return filtered_crn
149+
150+
# Find CRN by address
151+
def find_crn_by_address(self, address: str) -> Optional[CRN]:
152+
for crn_ in self.crns:
153+
if crn_.address == sanitize_url(address):
154+
return crn_
155+
return None
156+
157+
# Find CRN by hash
158+
def find_crn_by_hash(self, crn_hash: str) -> Optional[CRN]:
159+
for crn_ in self.crns:
160+
if crn_.hash == crn_hash:
161+
return crn_
162+
return None
163+
164+
def find_crn(
165+
self,
166+
address: Optional[str] = None,
167+
crn_hash: Optional[str] = None,
168+
) -> Optional[CRN]:
169+
"""Find CRN by address or hash (both optional, address priority)
170+
171+
Args:
172+
address (Optional[str], optional): url of the node. Defaults to None.
173+
crn_hash (Optional[str], optional): hash of the nodes. Defaults to None.
174+
175+
Returns:
176+
Optional[CRN]: CRN object or None if not found
177+
"""
178+
if address:
179+
return self.find_crn_by_address(address)
180+
if crn_hash:
181+
return self.find_crn_by_hash(crn_hash)
182+
return None
183+
184+
16185
class Crn:
17186
"""
18187
This services allow interact with CRNS API
@@ -45,7 +214,7 @@ async def get_last_crn_version(self):
45214
data = await resp.json()
46215
return data.get("tag_name")
47216

48-
async def get_crns_list(self, only_active: bool = True) -> dict:
217+
async def get_crns_list(self, only_active: bool = True) -> CrnList:
49218
"""
50219
Query a persistent VM running on aleph.im to retrieve list of CRNs:
51220
https://crns-list.aleph.sh/crns.json
@@ -72,7 +241,7 @@ async def get_crns_list(self, only_active: bool = True) -> dict:
72241
sanitize_url(settings.CRN_LIST_URL), params=params
73242
) as resp:
74243
resp.raise_for_status()
75-
return await resp.json()
244+
return CrnList.from_api(await resp.json())
76245

77246
async def get_active_vms_v2(self, crn_address: str) -> CrnV2List:
78247
endpoint = "/v2/about/executions/list"
@@ -136,3 +305,11 @@ async def update_instance_config(self, crn_address: str, item_hash: ItemHash):
136305
async with session.post(full_url) as resp:
137306
resp.raise_for_status()
138307
return await resp.json()
308+
309+
# Gpu Functions Helper
310+
async def fetch_gpu_on_network(
311+
self,
312+
only_active: bool = True,
313+
) -> NetworkGPUS:
314+
crn_list = await self.get_crns_list(only_active)
315+
return crn_list.find_gpu_on_network()

0 commit comments

Comments
 (0)