Skip to content

feat: added google pubsub example #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions examples/gcp_pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
## INTRODUCTION
This example demonstrates an implementation of SolVanityCL with google pub/sub. To run this project you will need to create a gcp project and enable google pub/sub on that. Google Pub/Sub has a very generous free tier plan which you can check [here](https://cloud.google.com/pubsub/pricing).

## SETTING UP GCP PUB/SUB TOPICS AND SUBSCRIPTIONS
1. Create topics - There are three topics that we need to create, listed below:
- **generate**: To send the generate events
- **on_generated**: To send the generated key response
- **on_error**: To send the error events

2. Create subscription - There are three subscriptions we need to create, listed below:
- **generate-sub**: To subscribe and listen to generate topic
- **on_generated-sub**: To subscribe and listen to responses generated by the generate-sub subscription on generate topic
- **on_error-sub**: To subscribe and listen to errors generated by the generate-sub subscription on generate topic.

## UPDATING CREDENTIALS AND PROJECT ID
1. **CREDENTIALS**: Once all the topics and subscriptions are set-up, we need to provision a service account so that we can authenticate with google pub/sub. After creating the service account, we need to create a JSON key and paste it in the `examples/gcp_pubsub/credentials.json` file
2. **PROJECT_ID**: Update the `PROJECT_ID` variable in both `main.py` and `publisher.py` file

## INSTALLATION
1. Create a virtual environment and activate it
```
python3 -m venv venv
source venv/bin/activate
```

2. Install the dependencies
```
pip install -r requirements.txt
```

## RUN THE PROJECT
1. Run the generator
```
python3 main.py
```

2. Publish a generate event
```
python3 publisher.py
```

182 changes: 182 additions & 0 deletions examples/gcp_pubsub/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import json
import logging
from google.cloud import pubsub_v1
from google.auth import jwt
import asyncio
import multiprocessing
from multiprocessing.pool import Pool
import numpy as np
from typing import List
from base58 import b58decode, b58encode
from nacl.signing import SigningKey

import sys
sys.path.append('../../')
from core.searcher import multi_gpu_init, save_result
from core.utils.helpers import check_character, load_kernel_source
from core.config import HostSetting
from core.opencl.manager import (
get_all_gpu_devices,
get_chosen_devices,
)

logging.basicConfig(level="INFO", format="[%(levelname)s %(asctime)s] %(message)s")

PROJECT_ID="<YOUR_PROJECT_ID>"
CREDENTIALS = json.load(open("./credentials.json"))

# subscriber authentication
audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials = jwt.Credentials.from_service_account_info(CREDENTIALS, audience=audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)

# publisher authentication
publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = credentials.with_claims(audience=publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)


generate_topic = 'projects/{project_id}/topics/{topic}'.format(
project_id=PROJECT_ID,
topic='on_generated',
)

error_topic = 'projects/{project_id}/topics/{topic}'.format(
project_id=PROJECT_ID,
topic='on_error',
)

topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=PROJECT_ID,
topic='generate',
)

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=PROJECT_ID,
sub='generate-sub',
)

def get_result(outputs: List):
private_key, pub_key = "", ""
result_count = 0
for output in outputs:
if not output[0]:
continue
result_count += 1
pv_bytes = bytes(output[1:])
pv = SigningKey(pv_bytes)
pb_bytes = bytes(pv.verify_key)
pub_key = b58encode(pb_bytes).decode()
decoded_private_key = b58encode(pv_bytes).decode()
private_hex_string = ''.join(format(x, '02x') for x in list(pv_bytes + pb_bytes))
private_key = b58encode(bytes.fromhex(private_hex_string)).decode()

logging.info(f"Found pub key: {pub_key}")
logging.info(f"Private key: {private_key}")
return private_key, pub_key


def generate_address(prefix, suffix, jobId, is_case_sensitive):
starts_with = prefix
ends_with = suffix
count = 1
select_device = False
iteration_bits = 24
if not starts_with and not ends_with:
click.echo("Please provide at least one of --starts-with or --ends-with.")
check_character("starts_with", starts_with)
check_character("ends_with", ends_with)

chosen_devices: Optional[Tuple[int, List[int]]] = None
if select_device:
chosen_devices = get_chosen_devices()
gpu_counts = len(chosen_devices[1])
else:
gpu_counts = len(get_all_gpu_devices())

logging.info(
f"Searching Solana pubkey with starts_with='{starts_with}', ends_with='{ends_with}', case_sensitive={'on' if is_case_sensitive else 'off'}"
)
logging.info(f"Using {gpu_counts} OpenCL device(s)")

result_count = 0
results = []

with multiprocessing.Manager() as manager:
with Pool(processes=gpu_counts) as pool:
kernel_source = load_kernel_source(
starts_with, ends_with, is_case_sensitive
)
lock = manager.Lock()
while result_count < count:
stop_flag = manager.Value("i", 0)
partial_results = pool.starmap(
multi_gpu_init,
[
(
x,
HostSetting(kernel_source, iteration_bits),
gpu_counts,
stop_flag,
lock,
chosen_devices,
)
for x in range(gpu_counts)
],
)
result_count += 1
for partial_result in partial_results:
if isinstance(partial_result, dict) and 'error' in partial_result:
logging.error(f"Error in worker process: {partial_result['error']}")
continue
if isinstance(partial_result, np.ndarray) and partial_result.size > 0 and partial_result[0]:
results.append(partial_result)
elif isinstance(partial_result, list) and len(partial_result) > 0 and partial_result[0]:
results.append(partial_result)



logging.info("Generation finished, making private and public key")
privateKey, pubKey = get_result(results)
logging.info("Pub/secret key made, emitting result")
json_result = json.dumps({"privateKey": privateKey, "pubKey": pubKey, "jobId": jobId})

future = publisher.publish(generate_topic, bytes(json_result, "utf-8"))
future.result()
logging.info("Result emitted")
return 'Generated'


def generate(message):
data = message.data.decode("utf-8")
body = json.loads(data)
prefix = body["prefix"]
suffix = body["suffix"]
jobId = body["jobId"]
isCaseSensitive = body["isCaseSensitive"]
message.ack()
try:
logging.info("Generation started")
generate_address(prefix, suffix, jobId, bool(isCaseSensitive == 'True'))
except Exception as e:
print(e)
logging.info("Error while generation, sending error event")
json_result = json.dumps({"error": str(e), "jobId":jobId})
future = publisher.publish(error_topic, bytes(json_result, "utf-8"))
future.result()

future = subscriber.subscribe(subscription_name, generate)


def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = subscriber.subscribe(subscription_name, generate)
logging.info("Listening for generate event, generator is live")
loop.run_forever()

if __name__ == "__main__":
# important because we are using multiprocessing and pyopencl context runs in isolation
# According to the Python documentation, spawn is the default on Windows and macOS. So sub/pub works fine on macOS but not on linux.
multiprocessing.set_start_method("spawn")
main()
66 changes: 66 additions & 0 deletions examples/gcp_pubsub/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
from google.cloud import pubsub_v1
import json
from google.auth import jwt

PROJECT_ID="<YOUR_PROJECT_ID>"
CREDENTIALS = json.load(open("./credentials.json"))

audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials = jwt.Credentials.from_service_account_info(CREDENTIALS, audience=audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)

publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = credentials.with_claims(audience=publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)

TOPIC="generate"

topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=PROJECT_ID,
topic=TOPIC
)

on_generated_subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=PROJECT_ID,
sub='on_generated-sub',
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sub name should be on_generate-sub, not generated @mechaadi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be on_generated-sub, because the publisher is waiting for the on-generated event @WincerChan

)

on_error_subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=PROJECT_ID,
sub='on_error-sub',
)

payload = {
'prefix': 'so',
'suffix': '',
'jobId': '1234',
'isCaseSensitive': 'False'
}

encoded_payload = json.dumps(payload).encode('utf-8')

publisher.publish(topic_name, encoded_payload)

print(f"Published {encoded_payload} to {topic_name}")

def on_generated(message):
data = message.data.decode("utf-8")
print(f"Received message: {data}")
message.ack()

def on_error(message):
data = message.data.decode("utf-8")
print(f"Received error: {data}")
message.ack()

on_generated_future = subscriber.subscribe(on_generated_subscription_name, on_generated)
on_error_future = subscriber.subscribe(on_error_subscription_name, on_error)


try:
on_generated_future.result()
on_error_future.result()
except KeyboardInterrupt:
on_generated_future.cancel()
on_error_future.result()
8 changes: 8 additions & 0 deletions examples/gcp_pubsub/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
click>=8.1.0
pyopencl
base58
PyNaCl
numpy==1.26.4
google-cloud-pubsub
siphash24
asyncio