Skip to content
This repository was archived by the owner on Sep 18, 2024. It is now read-only.

Add a way to use the subnet entities in an API only mode. #59

Merged
merged 3 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/helper-scripts/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
WALLET_PASSWORD=
59 changes: 59 additions & 0 deletions packages/helper-scripts/faucet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import re
import pexpect
import os
import argparse
import logging
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Define the password
PASSWORD = os.getenv('WALLET_PASSWORD', '')

# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')

# Define the faucet function


def run_faucet(wallet_name):
logging.info(f"Running faucet for wallet: {wallet_name}")
# Define the command without the environment variable part
command = f"btcli wallet faucet --wallet.name {wallet_name} --subtensor.chain_endpoint ws://127.0.0.1:9946"
# Prepare the environment
env = os.environ.copy()
env["KMP_DUPLICATE_LIB_OK"] = "TRUE"
try:
# Start the command with the modified environment
child = pexpect.spawn(command, encoding='utf-8', timeout=300, env=env)
yn_prompt = re.compile(r'\[y/n\]')
child.expect(yn_prompt, timeout=120)
child.sendline('y')
# Wait for the password prompt and respond with the password
child.expect('Enter password to unlock key:', timeout=120)
child.sendline(PASSWORD)
# Wait for the process to complete. This waits until the child process exits.
child.expect(pexpect.EOF)
logging.info(f"Command executed successfully for {wallet_name}")
except pexpect.exceptions.TIMEOUT:
logging.error(
f"A timeout occurred while waiting for a prompt for {wallet_name}. The command might have taken too long or failed unexpectedly.")
except pexpect.exceptions.EOF:
logging.error(
f"Unexpected end of file for {wallet_name}. Check if the command executed correctly.")
# Close the Pexpect instance
child.close()


if __name__ == '__main__':
# Parse command-line arguments
parser = argparse.ArgumentParser(
description='Run the faucet command for a wallet.')
parser.add_argument('--wallet', type=str, required=True,
help='The name of the wallet to run the faucet for.')
args = parser.parse_args()

# Run the faucet for the specified wallet
run_faucet(args.wallet)
7 changes: 7 additions & 0 deletions packages/miner-cloudflare/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def get_config() -> "bt.Config":
default=False,
)

parser.add_argument(
"--api_only",
action="store_true",
help="Bypass connection to metagraph and subtensor and only starts the akeru API layer",
default=False,
)

# Adds subtensor specific arguments i.e. --subtensor.chain_endpoint ... --subtensor.network ...
bt.subtensor.add_args(parser)

Expand Down
101 changes: 66 additions & 35 deletions packages/miner-cloudflare/stream_miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import threading
import traceback
from abc import ABC, abstractmethod
from urllib.parse import urlencode, urljoin
import uuid


import bittensor as bt
from typing import Dict, Tuple
Expand All @@ -19,6 +22,10 @@


class StreamMiner(ABC):
@property
def subtensor_connected(self):
return hasattr(self, 'subtensor') and self.subtensor is not None

def __init__(self, config=None, axon=None, wallet=None, subtensor=None):
# load env variables
load_dotenv()
Expand All @@ -41,59 +48,83 @@ def __init__(self, config=None, axon=None, wallet=None, subtensor=None):

self.prompt_cache: Dict[str, Tuple[str, int]] = {}

# Activating Bittensor's logging with the set configurations.
bt.logging(config=self.config, logging_dir=self.config.full_path)
bt.logging.info("Setting up bittensor objects.")
if self.config.api_only != True:
# Activating Bittensor's logging with the set configurations.
bt.logging(config=self.config, logging_dir=self.config.full_path)
bt.logging.info("Setting up bittensor objects.")

# Wallet holds cryptographic information, ensuring secure transactions and communication.
self.wallet = wallet or bt.wallet(config=self.config)
bt.logging.info(f"Wallet {self.wallet}")
# Wallet holds cryptographic information, ensuring secure transactions and communication.
self.wallet = wallet or bt.wallet(config=self.config)
bt.logging.info(f"Wallet {self.wallet}")

# subtensor manages the blockchain connection, facilitating interaction with the Bittensor blockchain.
self.subtensor = subtensor or bt.subtensor(config=self.config)
bt.logging.info(f"Subtensor: {self.subtensor}")
bt.logging.info(
f"Running miner for subnet: {self.config.netuid} on network: {self.subtensor.chain_endpoint} with config:"
)
# subtensor manages the blockchain connection, facilitating interaction with the Bittensor blockchain.
self.subtensor = subtensor or bt.subtensor(config=self.config)
bt.logging.info(f"Subtensor: {self.subtensor}")
bt.logging.info(
f"Running miner for subnet: {self.config.netuid} on network: {self.subtensor.chain_endpoint} with config:"
)

# metagraph provides the network's current state, holding state about other participants in a subnet.
self.metagraph = self.subtensor.metagraph(self.config.netuid)
bt.logging.info(f"Metagraph: {self.metagraph}")
# metagraph provides the network's current state, holding state about other participants in a subnet.
self.metagraph = self.subtensor.metagraph(self.config.netuid)
bt.logging.info(f"Metagraph: {self.metagraph}")

if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
bt.logging.error(
f"\nYour validator: {self.wallet} if not registered to chain connection: {self.subtensor} \nRun btcli register and try again. "
)
exit()
else:
# Each miner gets a unique identity (UID) in the network for differentiation.
self.my_subnet_uid = self.metagraph.hotkeys.index(
self.wallet.hotkey.ss58_address
)
if self.wallet.hotkey.ss58_address not in self.metagraph.hotkeys:
bt.logging.error(
f"\nYour validator: {self.wallet} if not registered to chain connection: {self.subtensor} \nRun btcli register and try again. "
)
exit()
else:
# Each miner gets a unique identity (UID) in the network for differentiation.
self.my_subnet_uid = self.metagraph.hotkeys.index(
self.wallet.hotkey.ss58_address
)

# identify to the edge compute network service discovery

# TODO replace with hosted endpoint of service map
url = os.getenv('SERVICE_MESH_URL')
secret = os.getenv('SECRET_KEY')
# for now miners are allow listed manually and given a secret key to identify
headers = {'Content-Type': 'application/json',
'Authorization': f'Bearer {secret}'}

# identify to the edge compute network service discovery
service_map_dict = {
# must map the netuid for validation by validators later
"netuid": self.my_subnet_uid,
"hotkey": self.wallet.hotkey.ss58_address,
**self.miner_services
}

# TODO replace with hosted endpoint of service map
# send to the service map
post(f'{url}/api/miner',
data=json.dumps(service_map_dict), headers=headers)

bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")
else:
self.uuid = os.getenv('UUID') or uuid.uuid4()
url = os.getenv('SERVICE_MESH_URL')
secret = os.getenv('SECRET_KEY')
# for now miners are allow listed manually and given a secret key to identify
headers = {'Content-Type': 'application/json',
'Authorization': f'Bearer {secret}'}

print(self.wallet.hotkey.ss58_address)

service_map_dict = {
# must map the netuid for validation by validators later
"netuid": self.my_subnet_uid,
"hotkey": self.wallet.hotkey.ss58_address,
"uid": str(self.uuid),
**self.miner_services
}

# send to the service map
post(f'{url}/api/miner',
data=json.dumps(service_map_dict), headers=headers)
# Base URL
base_url = urljoin(url, '/api/miner')

bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")
# Query parameters
params = {'api-only': 'true'}

# Construct the full URL with query parameters
full_url = f"{base_url}?{urlencode(params)}"
data = json.dumps(service_map_dict)
# send to the service map
post(full_url, data=data, headers=headers)

# Instantiate runners
self.should_exit: bool = False
Expand Down
51 changes: 49 additions & 2 deletions packages/service-discovery/api/miner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,48 @@ type MinerService = {
address: string;
};

// this is the data format when we are not running the edge network alongside the subnet.
type MinerServiceApiOnly = {
uid: string;
type: string;
models: string[];
address: string;
};

export async function POST(request: Request) {
const authHeader = request.headers.get("Authorization");
const expectedAuthHeader = `Bearer ${process.env.SECRET_KEY}`;

const url = new URL(request.url);
const params = new URLSearchParams(url.search);

if (!authHeader || authHeader !== expectedAuthHeader) {
return new Response("Unauthorized", { status: 401 });
}

const apiOnly = params.has("api-only")
? Boolean(params.get("api-only"))
: false;

// if we run with API only mode, we will not register bittensor specific properties in the data model
if (apiOnly) {
const miner = (await request.json()) as MinerServiceApiOnly;
const pipe = redis
.pipeline()
.hset(`apionly:miner:${String(miner.uid)}`, miner)
.sadd(`apionly:miners:${miner.type}`, miner.uid)
.set(`apionly:address:${miner.uid}`, miner.address)
.set(`apionly:miner:uid:${miner.uid}:address`, miner.address);

miner.models.forEach((modelName) => {
pipe.sadd(`apionly:${modelName}`, miner.uid);
});

await pipe.exec();

return new Response(`ok`);
}

const miner = (await request.json()) as MinerService;

const pipe = redis
Expand All @@ -47,6 +81,17 @@ export async function GET(request: Request) {
const url = new URL(request.url);
const params = new URLSearchParams(url.search);

const authHeader = request.headers.get("Authorization");
const expectedAuthHeader = `Bearer ${process.env.SECRET_KEY}`;

if (!authHeader || authHeader !== expectedAuthHeader) {
return new Response("Unauthorized", { status: 401 });
}

const apiOnly = params.has("api-only")
? Boolean(params.get("api-only"))
: false;

const model = params.get("model");

if (!model) {
Expand All @@ -55,7 +100,9 @@ export async function GET(request: Request) {
});
}

const minersUidForModel = await redis.smembers(model);
const minersUidForModel = apiOnly
? await redis.smembers(`apionly:${model}`)
: await redis.smembers(model);

// If the model set does not exist, return an error response
if (minersUidForModel.length === 0) {
Expand All @@ -67,7 +114,7 @@ export async function GET(request: Request) {
const pipe = redis.pipeline();

minersUidForModel.forEach((uid) => {
pipe.hgetall(`miner:${uid}`);
pipe.hgetall(apiOnly ? `apionly:miner:${uid}` : `miner:${uid}`);
});

const miners = await pipe.exec();
Expand Down
Loading
Loading