
feat: add a ping and model manager to allow the validator to grab the closest miner #73

Closed · 6 commits
Binary file modified bun.lockb
Binary file not shown.
55 changes: 25 additions & 30 deletions packages/service-discovery/api/miner.ts
@@ -17,7 +17,6 @@ type MinerService = {
address: string;
};

// this is the data format when we are not running the edge network alongside the subnet.
type MinerServiceApiOnly = {
uid: string;
type: string;
@@ -40,18 +39,14 @@ export async function POST(request: Request) {
? Boolean(params.get("api-only"))
: false;

// if we run with API only mode, we will not register bittensor specific properties in the data model
if (apiOnly) {
const miner = (await request.json()) as MinerServiceApiOnly;
const pipe = redis
.pipeline()
.hset(`apionly:miner:${String(miner.uid)}`, miner)
.sadd(`apionly:miners:${miner.type}`, miner.uid)
.set(`apionly:address:${miner.uid}`, miner.address)
.set(`apionly:miner:uid:${miner.uid}:address`, miner.address);
const pipe = redis.pipeline().hset(`apionly:miner:${miner.uid}`, miner);

pipe.sadd(`apionly:miners:type:${miner.type}`, miner.uid);

miner.models.forEach((modelName) => {
pipe.sadd(`apionly:${modelName}`, miner.uid);
pipe.sadd(`apionly:miners:model:${modelName}`, miner.uid);
});

await pipe.exec();
@@ -61,15 +56,12 @@ export async function POST(request: Request) {

const miner = (await request.json()) as MinerService;

const pipe = redis
.pipeline()
.hset(`miner:${String(miner.netuid)}`, miner)
.sadd(`miners:${miner.type}`, miner.netuid)
.set(`address:${miner.netuid}`, miner.address)
.set(`miner:nuid:${miner.netuid}:address`, miner.address);
const pipe = redis.pipeline().hset(`miner:${miner.netuid}`, miner);

pipe.sadd(`miners:type:${miner.type}`, miner.netuid);

miner.models.forEach((modelName) => {
pipe.sadd(modelName, miner.netuid);
pipe.sadd(`miners:model:${modelName}`, miner.netuid);
});

await pipe.exec();
@@ -94,21 +86,24 @@ export async function GET(request: Request) {

const model = params.get("model");

if (!model) {
return new Response("Error: model is missing in search params", {
status: 400,
});
}
let minersUidForModel: string[] = [];

const minersUidForModel = apiOnly
? await redis.smembers(`apionly:${model}`)
: await redis.smembers(model);
if (model) {
minersUidForModel = apiOnly
? await redis.smembers(`apionly:miners:model:${model}`)
: await redis.smembers(`miners:model:${model}`);

// If the model set does not exist, return an error response
if (minersUidForModel.length === 0) {
return new Response(`Error: no miners found for model ${model}`, {
status: 404,
});
if (minersUidForModel.length === 0) {
return new Response(`Error: no miners found for model ${model}`, {
status: 404,
});
}
} else {
const minerKeys = apiOnly
? await redis.keys("apionly:miner:*")
: await redis.keys("miner:*");

minersUidForModel = minerKeys.map((key) => key.split(":")[apiOnly ? 2 : 1]);
}

const pipe = redis.pipeline();
@@ -119,7 +114,7 @@

const miners = await pipe.exec();

return new Response(JSON.stringify(miners), {
return new Response(JSON.stringify(miners.filter(Boolean)), {
headers: { "Content-Type": "application/json" },
});
}
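
For illustration only (not part of this diff), here is a minimal Python sketch of how a client might exercise the reworked service-discovery endpoints under the new key scheme. It assumes the /api/miner route, the api-only and model query parameters, and the bearer-token header seen elsewhere in this PR; the identifier, type value, model name, port, and default URL below are hypothetical placeholders.

# Hedged sketch: register an API-only miner, then look up miners serving a model.
# SERVICE_MESH_URL and SECRET_KEY mirror the env vars used in this PR; defaults are hypothetical.
import os
import requests

base_url = os.getenv("SERVICE_MESH_URL", "http://localhost:3000")  # hypothetical default
headers = {"Authorization": f"Bearer {os.getenv('SECRET_KEY', '')}"}

# Payload mirroring the MinerServiceApiOnly shape (uid, type, models, address); values are hypothetical.
miner = {
    "uid": "miner-1",
    "type": "cloudflare",
    "models": ["llama-3-8b-instruct"],
    "address": "http://127.0.0.1:8080",
}
requests.post(f"{base_url}/api/miner", params={"api-only": "true"}, json=miner, headers=headers)

# Query miners for a given model; with this change, omitting `model` returns every registered miner.
resp = requests.get(
    f"{base_url}/api/miner",
    params={"api-only": "true", "model": "llama-3-8b-instruct"},
    headers=headers,
)
print(resp.json())
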
1 change: 1 addition & 0 deletions packages/service-discovery/package.json
@@ -1,5 +1,6 @@
{
"name": "service-discovery",
"description": "This is an internal API mapping what services are available at which address in the network.",
"module": "NodeNext",
"dependencies": {
"@upstash/redis": "^1.29.0"
2 changes: 1 addition & 1 deletion subnet/miner-cloudflare/stream_miner.py
@@ -98,8 +98,8 @@ def __init__(self, config=None, axon=None, wallet=None, subtensor=None):
# send to the service map
post(f'{url}/api/miner',
data=json.dumps(service_map_dict), headers=headers)

bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")

else:
self.uuid = os.getenv('UUID') or uuid.uuid4()
url = os.getenv('SERVICE_MESH_URL')
32 changes: 26 additions & 6 deletions subnet/validator/main.py
@@ -1,6 +1,9 @@
import argparse
import asyncio
import os
import bittensor as bt
import torch
from miner_manager import MinerManager
from validator import BaseValidatorNeuron
from fastapi import FastAPI, Request
import aiohttp
@@ -10,6 +13,18 @@
from urllib.parse import urljoin, urlencode
from dotenv import load_dotenv

load_dotenv()


api_only = os.getenv('API_ONLY')
miner_manager = MinerManager(api_only=api_only == 'True')


async def run_miner_manager():
while True:
await miner_manager.run()
await asyncio.sleep(10)


class Miner(TypedDict):
address: str
@@ -23,7 +38,6 @@ class Validator(BaseValidatorNeuron):
def __init__(self, config=None):
super(Validator, self).__init__(config=config)
bt.logging.info("load_state()")
load_dotenv()
self.load_state()

async def get_miner_with_model(self, model_name) -> Union[Miner, dict]:
@@ -68,15 +82,17 @@ async def get_miner_with_model(self, model_name) -> Union[Miner, dict]:
@app.post("/chat")
async def chat(request: Request):
data = await request.json()

model = data['model']
miner = await validator.get_miner_with_model(model_name=model)
miner_uid = miner['netuid']
test = miner_manager.get_miners()
print(test)
miner = miner_manager.get_fastest_miner_for_model(model=model)
print(miner)
miner_id = miner["id"]
prompt_len = calculate_total_message_length(data)

async with aiohttp.ClientSession() as session:
url = miner['address']
async with session.post(url, json=data) as resp:
async with session.post(f'{url}/chat', json=data) as resp:
response = await resp.json()
completion_len = len(response[-1])

@@ -85,11 +101,15 @@ async def chat(request: Request):
print(f'reward for prompt: {reward}')
if (validator.subtensor_connected):
validator.update_scores(
torch.FloatTensor([reward]), [int(miner_uid)])
torch.FloatTensor([reward]), [int(miner_id)])

return response


@app.on_event("startup")
async def startup_event():
asyncio.create_task(run_miner_manager())

# The main function parses the configuration and runs the validator.
if __name__ == "__main__":
import uvicorn
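
As an illustration of the new request flow in main.py (not part of this diff), the sketch below sends a chat request to the validator API; the validator resolves the lowest-latency miner for the model via MinerManager and forwards the request to that miner's /chat route. The messages field is an assumption inferred from calculate_total_message_length, and the host and port are hypothetical.

# Hedged sketch: call the validator's /chat endpoint added in this PR.
# Payload shape beyond "model" is assumed; host/port are hypothetical.
import requests

payload = {
    "model": "llama-3-8b-instruct",  # must match a model some miner registered
    "messages": [{"role": "user", "content": "Hello, miner!"}],  # assumed field
}
resp = requests.post("http://localhost:8000/chat", json=payload)
print(resp.json())
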
121 changes: 121 additions & 0 deletions subnet/validator/miner_manager.py
@@ -0,0 +1,121 @@
import os
from urllib.parse import urlencode
import aiohttp
import asyncio
import time
from dotenv import load_dotenv


class MinerManager:
def __init__(self, api_only):
load_dotenv()
self.miners = {}
self.api_only = api_only
self.bearer_token = os.getenv('SECRET_KEY')

async def run(self):
await self._discover_and_ping_miners()

async def _discover_and_ping_miners(self):
while True:
try:
async with aiohttp.ClientSession() as session:
url = os.getenv('SERVICE_MESH_URL')
headers = {"Authorization": f"Bearer {self.bearer_token}"}
params = {'api-only': 'true' if self.api_only else 'false'}
async with session.get(f"{url}/api/miner?{urlencode(params)}", headers=headers) as response:
if response.status == 200:
miners_data = await response.json()
for miner_data in miners_data:
miner_id = miner_data["id"] if "id" in miner_data else miner_data["netuid"]
miner_address = miner_data["address"]
# Assuming the model is optional
miner_models = miner_data.get("models")
if miner_id not in self.miners:
self.miners[miner_id] = {
"address": miner_address,
"models": miner_models,
"id": miner_id,
"last_ping": None,
"ping_time": None
}

# Ping all the miners
ping_tasks = []
for miner_id in list(self.miners.keys()):
ping_task = asyncio.create_task(
self._ping_miner(session, miner_id))
ping_tasks.append(ping_task)
self._update_model_miners()

# Wait for all the ping tasks to complete
await asyncio.gather(*ping_tasks)

else:
print(
f"Error discovering miners: {response.status}")

# Sleep for 10 minutes before the next discovery and ping iteration
await asyncio.sleep(600)

except aiohttp.ClientError as e:
# Handle any errors that occur during the request
print(f"Error discovering miners: {e}")
await asyncio.sleep(60) # Sleep for 1 minute before retrying

except Exception as e:
# Handle any other unexpected errors
print(f"Unexpected error: {e}")
await asyncio.sleep(60) # Sleep for 1 minute before retrying

async def _ping_miner(self, session, miner_id):
try:
start_time = time.time()
async with session.get(f"{self.miners[miner_id]['address']}/") as response:
if response.status == 200:
end_time = time.time()
ping_time = end_time - start_time
self.miners[miner_id]["last_ping"] = end_time
self.miners[miner_id]["ping_time"] = ping_time
else:
print(f"Error pinging miner {miner_id}: {response.status}")
# Remove the miner if the ping is unsuccessful
del self.miners[miner_id]

except aiohttp.ClientError as e:
# Handle any errors that occur during the ping request
print(f"Error pinging miner {miner_id}: {e}")
del self.miners[miner_id] # Remove the miner if the ping fails

except Exception as e:
# Handle any other unexpected errors
print(f"Unexpected error: {e}")
# Remove the miner if an unexpected error occurs
del self.miners[miner_id]

def _update_model_miners(self):
self.model_miners = {}
for miner_id, miner_data in self.miners.items():
models = miner_data.get("models")
ping_time = miner_data.get("ping_time")
if models:
for model in models:
if model not in self.model_miners:
self.model_miners[model] = []
self.model_miners[model].append({
"id": miner_id,
"ping_time": ping_time
})

def get_fastest_miner_for_model(self, model):
if model in self.model_miners:
miners = self.model_miners[model]
fastest_miner = min(miners, key=lambda x: x['ping_time'])
id = fastest_miner['id']
miner_details = self.miners[id]
return miner_details
else:
return None

def get_miners(self):
return self.miners
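
A short usage sketch of the new MinerManager (not part of this diff), mirroring how main.py drives it as a background task. It assumes SERVICE_MESH_URL and SECRET_KEY are set and the service mesh is reachable, so at least one discovery/ping pass can populate the manager; the model name is hypothetical.

# Hedged sketch: run one discovery/ping pass and pick the lowest-latency miner.
import asyncio
from miner_manager import MinerManager

async def main():
    manager = MinerManager(api_only=True)
    # run() performs the discover -> ping -> sleep loop internally; start it as a
    # background task the same way main.py does and let one cycle complete.
    task = asyncio.create_task(manager.run())
    await asyncio.sleep(5)  # rough wait for the first discovery/ping pass
    print(manager.get_miners())  # {miner_id: {"address": ..., "ping_time": ...}, ...}
    fastest = manager.get_fastest_miner_for_model("llama-3-8b-instruct")  # hypothetical model
    print(fastest)
    task.cancel()

asyncio.run(main())
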
3 changes: 2 additions & 1 deletion subnet/validator/neuron.py
@@ -1,4 +1,5 @@
import copy
import os

import bittensor as bt

@@ -72,7 +73,7 @@ def __init__(self, config=None):
else:
self.wallet = bt.wallet(config=self.config)

if self.config.api_only is False:
if os.getenv('API_ONLY') is 'False':
self.instantiate_subtensor_and_metagraph()

if self.subtensor_connected:
3 changes: 0 additions & 3 deletions subnet/validator/service_mesh.py

This file was deleted.

7 changes: 0 additions & 7 deletions subnet/validator/utils/config.py
@@ -45,13 +45,6 @@ def add_args(cls, parser):

parser.add_argument("--netuid", type=int, help="Subnet netuid", default=1)

parser.add_argument(
"--api_only",
action="store_true",
help="Bypass connection to metagraph and subtensor and only starts the akeru API layer",
default=False,
)

parser.add_argument(
"--neuron.device",
type=str,
6 changes: 3 additions & 3 deletions subnet/validator/validator.py
@@ -10,6 +10,7 @@
from neuron import BaseNeuron
from mock import MockDendrite
from utils.config import add_validator_args
from miner_manager import MinerManager


class BaseValidatorNeuron(BaseNeuron):
@@ -39,9 +40,8 @@ def __init__(self, config=None):
# Init sync with the network. Updates the metagraph.
self.sync()

# Create asyncio event loop to manage async tasks.
self.loop = asyncio.get_event_loop()

# Create asyncio event loop to manage async tasks.
self.loop = asyncio.get_event_loop()
# Instantiate runners
self.should_exit: bool = False
self.is_running: bool = False