diff --git a/src/main.cu b/src/main.cu index 552346f..9bbc6d4 100644 --- a/src/main.cu +++ b/src/main.cu @@ -18,6 +18,18 @@ #include "getopt.h" #include "log.h" +#define MAX_MESSAGE_SIZE (2048 * 1024) +#define BUFFER_SIZE (MAX_MESSAGE_SIZE * chain_nums) + +typedef struct { + uv_mutex_t lock; + uint8_t buffer[BUFFER_SIZE]; + ssize_t length; +} message_buffer_t; + +message_buffer_t message_buffer; +uv_async_t message_handler; + std::atomic found_solutions{0}; typedef std::chrono::high_resolution_clock Time; @@ -56,17 +68,25 @@ void on_write_end(uv_write_t *req, int status) free(req); } +void print_block(job_t *job, uint8_t *hash) +{ + char *hex_string = bytes_to_hex(hash, 32); + LOG("new block: %d -> %d, height: %d, hash: %s\n", job->from_group, job->to_group, job->height, hex_string); + free(hex_string); +} + std::mutex write_mutex; uint8_t write_buffer[4096 * 1024]; void submit_new_block(mining_worker_t *worker) { - expire_template_for_new_block(load_worker__template(worker)); + mining_template_t *template_ptr = load_worker__template(worker); + print_block(template_ptr->job, (uint8_t *) hasher_hash(worker, true)); + expire_template_for_new_block(template_ptr); const std::lock_guard lock(write_mutex); ssize_t buf_size = write_new_block(worker, write_buffer); uv_buf_t buf = uv_buf_init((char *)write_buffer, buf_size); - print_hex("new solution", (uint8_t *) hasher_buf(worker, true), 32); uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); uint32_t buf_count = 1; @@ -199,46 +219,10 @@ void log_hashrate(uv_timer_t *timer) } } -uint8_t read_buf[2048 * 1024 * chain_nums]; -blob_t read_blob = {read_buf, 0}; -server_message_t *decode_buf(const uv_buf_t *buf, ssize_t nread) -{ - if (read_blob.len == 0) - { - read_blob.blob = (uint8_t *)buf->base; - read_blob.len = nread; - server_message_t *message = decode_server_message(&read_blob); - if (message) - { - // some bytes left - if (read_blob.len > 0) - { - memcpy(read_buf, read_blob.blob, read_blob.len); - read_blob.blob = read_buf; - } - return message; - } - else - { // no bytes consumed - memcpy(read_buf, buf->base, nread); - read_blob.blob = read_buf; - read_blob.len = nread; - return NULL; - } - } - else - { - assert(read_blob.blob == read_buf); - memcpy(read_buf + read_blob.len, buf->base, nread); - read_blob.len += nread; - return decode_server_message(&read_blob); - } -} - void connect_to_broker(); void try_to_reconnect(uv_timer_t *timer){ - read_blob.len = 0; + message_buffer.length = 0; free(uv_socket); free(uv_connect); connect_to_broker(); @@ -259,36 +243,101 @@ void on_read(uv_stream_t *server, ssize_t nread, const uv_buf_t *buf) return; } - server_message_t *message = decode_buf(buf, nread); - if (message) + uv_mutex_lock(&message_buffer.lock); + assert(message_buffer.length + nread <= BUFFER_SIZE); + memcpy(message_buffer.buffer + message_buffer.length, buf->base, nread); + message_buffer.length += nread; + uv_mutex_unlock(&message_buffer.lock); + + uv_async_send(&message_handler); + free(buf->base); +} + +uint8_t latest_job[MAX_MESSAGE_SIZE]; + +void process_message(uv_async_t* handle) { + bool found_latest_job = false; + ssize_t latest_job_offset = 0; + ssize_t latest_job_len = 0; + ssize_t offset = 0; + + uv_mutex_lock(&message_buffer.lock); + uint8_t *buf = message_buffer.buffer; + ssize_t len = message_buffer.length; + + // try to read the latest jobs message from the buffer. + while (len - offset >= 4) { - switch (message->kind) + ssize_t message_size = decode_size(buf + offset); + ssize_t total_message_size = 4 + message_size; + // ignore the submit result message for simplicity + if (total_message_size == 47) + { + offset += total_message_size; + continue; + } + if (len - offset >= total_message_size) + { + latest_job_offset = offset + 4; + latest_job_len = message_size; + offset += total_message_size; + found_latest_job = true; + } else { - case JOBS: - for (int i = 0; i < message->jobs->len; i++) - { - update_templates(message->jobs->jobs[i]); - } - start_mining_if_needed(); break; + } + } - case SUBMIT_RESULT: - char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32); - LOG( - "submitted: %d -> %d, %s: %d \n", - message->submit_result->from_group, - message->submit_result->to_group, - block_hash_hex, - message->submit_result->status - ); - free(block_hash_hex); - break; + if (found_latest_job) + { + memcpy(latest_job, buf + latest_job_offset, latest_job_len); + } + + if (offset > 0) + { + if (offset == len) + { + message_buffer.length = 0; + } + else + { + ssize_t remain = len - offset; + memmove(message_buffer.buffer, buf + offset, remain); + message_buffer.length = remain; } - free_server_message_except_jobs(message); } + uv_mutex_unlock(&message_buffer.lock); - free(buf->base); - // uv_close((uv_handle_t *) server, free_close_cb); + if (found_latest_job) + { + server_message_t *message = decode_server_message(latest_job, latest_job_len); + if (message) + { + switch (message->kind) + { + case JOBS: + for (int i = 0; i < message->jobs->len; i++) + { + update_templates(message->jobs->jobs[i]); + } + start_mining_if_needed(); + break; + + case SUBMIT_RESULT: + char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32); + LOG( + "submitted: %d -> %d, %s: %d \n", + message->submit_result->from_group, + message->submit_result->to_group, + block_hash_hex, + message->submit_result->status + ); + free(block_hash_hex); + break; + } + free_server_message_except_jobs(message); + } + } } void on_connect(uv_connect_t *req, int status) @@ -426,6 +475,9 @@ int main(int argc, char **argv) setup_gpu_worker_count(gpu_count, gpu_count * parallel_mining_works_per_gpu); loop = uv_default_loop(); + uv_mutex_init(&message_buffer.lock); + message_buffer.length = 0; + uv_async_init(loop, &message_handler, process_message); uv_timer_init(loop, &reconnect_timer); connect_to_broker(); diff --git a/src/messages.h b/src/messages.h index 871b660..1c14210 100644 --- a/src/messages.h +++ b/src/messages.h @@ -252,24 +252,9 @@ void extract_submit_result(uint8_t **bytes, submit_result_t *result) result->status = extract_bool(bytes); } -server_message_t *decode_server_message(blob_t *blob) +server_message_t *decode_server_message(uint8_t *bytes, ssize_t len) { - uint8_t *bytes = blob->blob; - ssize_t len = blob->len; - - if (len <= 4) { - return NULL; // not enough bytes for decoding - } - uint8_t *pos = bytes; - ssize_t message_size = extract_size(&pos); - assert(pos == bytes + 4); - - ssize_t message_byte_size = message_size + 4; - if (len < message_byte_size) { - return NULL; // not enough bytes for decoding - } - uint8_t version = extract_byte(&pos); if (version != mining_protocol_version) { LOG("Invalid protocol version %d, expect %d\n", version, mining_protocol_version); @@ -297,15 +282,6 @@ server_message_t *decode_server_message(blob_t *blob) LOGERR("Invalid server message kind\n"); exit(1); } - - assert(pos == (bytes + message_byte_size)); - if (message_byte_size < len) { - blob->len = len - message_byte_size; - memmove(blob->blob, pos, blob->len); - } else { - blob->len = 0; - } - return server_message; }