diff --git a/README.md b/README.md index d9dae46..a0e0053 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,11 @@ configs explanation: }, "withholdPercent": 0.005, // coinbase reward withhold percent(0.5% by default), used for tx fee mainly + "rewardEnabled": true, // enabled by default "rewardInterval": 600, // update miner balances every this many seconds "confirmationTime": 30600, // 510m by default, you can decrease this if your payment addresses have enough balance + "paymentEnabled": true, // enabled by default "minPaymentCoins": "3.5", // minimum number of coins that a miner must earn before sending payment "paymentInterval": 600, // send payment every this many seconds diff --git a/config.json b/config.json index 10151ed..1d5329e 100644 --- a/config.json +++ b/config.json @@ -44,9 +44,11 @@ }, "withholdPercent": 0.005, + "rewardEnabled": true, "rewardInterval": 600, "confirmationTime": 30600, + "paymentEnabled": true, "minPaymentCoins": "0.5", "paymentInterval": 3600, "txConfirmations": { diff --git a/lib/jobManager.js b/lib/jobManager.js index e58e354..e86b19f 100644 --- a/lib/jobManager.js +++ b/lib/jobManager.js @@ -141,7 +141,7 @@ function JobManager(jobExpiryPeriod){ difficulty: difficulty, error: error[1] }); - return {error: error, result: null}; + return {error: error}; }; var job = _this.validJobs.getJob(params.jobId); @@ -214,7 +214,7 @@ function JobManager(jobExpiryPeriod){ foundBlock: foundBlock }); - return {result: true, error: null, blockHash: hash}; + return {error: null}; }; }; JobManager.prototype.__proto__ = events.EventEmitter.prototype; diff --git a/lib/paymentProcessor.js b/lib/paymentProcessor.js index 8feeab4..6bfd971 100644 --- a/lib/paymentProcessor.js +++ b/lib/paymentProcessor.js @@ -578,10 +578,12 @@ var PaymentProcessor = module.exports = function PaymentProcessor(config, logger } this.start = function(){ - checkAddress(config.addresses); - loadPublicKey(config.wallet, function(){ - setTimeout(payment, config.paymentInterval * 1000); - }); + if (config.paymentEnabled){ + checkAddress(config.addresses); + loadPublicKey(config.wallet, function(){ + setTimeout(payment, config.paymentInterval * 1000); + }); + } } function loadPublicKey(walletConfig, callback){ diff --git a/lib/pool.js b/lib/pool.js index 872298b..a610f45 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -100,7 +100,6 @@ var pool = module.exports = function pool(config, logger){ share_difficulty: shareData.shareDiff, ip: shareData.ip })) - _this.shareProcessor.handleShare(shareData); if (shareData.foundBlock){ logger.info('Found block for chainIndex: ' + chainIndex + ', hash: ' + shareData.blockHash + @@ -114,6 +113,7 @@ var pool = module.exports = function pool(config, logger){ } }); } + _this.shareProcessor.handleShare(shareData); }) } @@ -204,14 +204,14 @@ var pool = module.exports = function pool(config, logger){ _this.varDiff.manageClient(client); client.on('submit', function(params, resultCallback){ - var result =_this.jobManager.processShare( + var result = _this.jobManager.processShare( params, client.previousDifficulty, client.difficulty, client.remoteAddress, client.socket.localPort ); - resultCallback(result.error, result.result ? true : null); + resultCallback(result.error); }).on('malformedMessage', function (message) { logger.warn('Malformed message from ' + client.getLabel() + ': ' + message); diff --git a/lib/shareProcessor.js b/lib/shareProcessor.js index 2f16643..211e82c 100644 --- a/lib/shareProcessor.js +++ b/lib/shareProcessor.js @@ -6,6 +6,7 @@ const { Pool } = require('pg'); var ShareProcessor = module.exports = function ShareProcessor(config, logger){ var confirmationTime = config.confirmationTime * 1000; var rewardPercent = 1 - config.withholdPercent; + var shareExpiryPeriod = 15; var _this = this; this.redisClient = new Redis(config.redis.port, config.redis.host); @@ -71,11 +72,11 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ createTables(_this.db); _this.handleShare = function(share){ persistShare(_this.db, share); - _this._handleShare(share); + _this._handleShare(share, _ => {}); } } else { - _this.handleShare = share => _this._handleShare(share); + _this.handleShare = share => _this._handleShare(share, _ => {}); } this.currentRoundKey = function(fromGroup, toGroup){ @@ -91,28 +92,50 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ var hashrateKey = 'hashrate'; var balancesKey = 'balances'; - this._handleShare = function(share){ - var redisTx = _this.redisClient.multi(); - var currentMs = Date.now(); + this.shareCacheKey = function(fromGroup, toGroup, hash){ + return fromGroup + ':' + toGroup + ':hashes:' + hash; + } + + this._handleShare = function(share, callback){ var fromGroup = share.job.fromGroup; var toGroup = share.job.toGroup; + var blockHash = share.blockHash; var currentRound = _this.currentRoundKey(fromGroup, toGroup); - redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); + var hashKey = _this.shareCacheKey(fromGroup, toGroup, blockHash); - var currentTs = Math.floor(currentMs / 1000); - redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); + _this.redisClient.set(hashKey, true, 'EX', shareExpiryPeriod, 'NX', function(error, result){ + if (error){ + logger.error('Check share duplicated failed, error: ' + error); + callback(error); + return; + } - if (share.foundBlock){ - var blockHash = share.blockHash; - var newKey = _this.roundKey(fromGroup, toGroup, blockHash); - var blockWithTs = blockHash + ':' + currentMs.toString(); + if (result == null){ + logger.error('Ignore duplicated share, key: ' + hashKey); + callback('duplicated share'); + return; + } - redisTx.rename(currentRound, newKey); - redisTx.sadd(pendingBlocksKey, blockWithTs); - redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) - } - redisTx.exec(function(error, _){ - if (error) logger.error('Handle share failed, error: ' + error); + var redisTx = _this.redisClient.multi(); + redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); + + var currentMs = Date.now(); + var currentTs = Math.floor(currentMs / 1000); + redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); + + if (share.foundBlock){ + var blockHash = share.blockHash; + var newKey = _this.roundKey(fromGroup, toGroup, blockHash); + var blockWithTs = blockHash + ':' + currentMs.toString(); + + redisTx.rename(currentRound, newKey); + redisTx.sadd(pendingBlocksKey, blockWithTs); + redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) + } + redisTx.exec(function(error, _){ + if (error) logger.error('Handle share failed, error: ' + error); + callback(error); + }); }); } @@ -265,6 +288,8 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ } this.start = function(){ - setInterval(scanBlocks, config.rewardInterval * 1000); + if (config.rewardEnabled){ + setInterval(scanBlocks, config.rewardInterval * 1000); + } } } diff --git a/lib/stratum.js b/lib/stratum.js index 0a82724..1ba5d45 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -73,15 +73,13 @@ var StratumClient = function(params){ function handleSubmit(message){ _this.emit('submit', message.params, - function(error, result){ - if (!error && result){ - _this.emit('submitAccepted'); - } - if (!considerBan(result)){ + function(error){ + var accepted = !error; + if (!considerBan(accepted)){ sendJson({ id: message.id, method: 'mining.submit_result', - result: result, + result: accepted, error: error }); } diff --git a/test/shareProcessorTest.js b/test/shareProcessorTest.js index 9e2f3cd..8acb452 100644 --- a/test/shareProcessorTest.js +++ b/test/shareProcessorTest.js @@ -26,6 +26,56 @@ describe('test share processor', function(){ }) }) + it('should ignore duplicated shares', function(done){ + var shareProcessor = new ShareProcessor(test.config, test.logger); + shareProcessor.redisClient = redisClient; + + var share1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 1, workerAddress: 'miner1'}; + var share2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash2', difficulty: 2, workerAddress: 'miner1'}; + var invalidShare1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'}; + var invalidShare2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner2'}; + + var checkState = function(roundKey, callback){ + redisClient + .multi() + .hget(roundKey, 'miner1') + .hget(roundKey, 'miner2') + .exec(function(error, results){ + if (error) assert.fail('Test failed: ' + error); + var difficulty1 = results[0][1]; + var difficulty2 = results[1][1]; + callback(difficulty1, difficulty2); + }); + }; + + var currentRoundKey = shareProcessor.currentRoundKey(0, 1); + var key1 = shareProcessor.shareCacheKey(0, 1, share1.blockHash); + var key2 = shareProcessor.shareCacheKey(0, 1, share2.blockHash); + util.executeForEach([share1, share2, invalidShare1, invalidShare2], (share, callback) => { + shareProcessor._handleShare(share, callback); + }, function(){ + checkState(currentRoundKey, function(diff1, diff2){ + expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty); + expect(diff2).equal(null); + + var blockShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: true, blockHash: 'hash3', difficulty: 3, workerAddress: 'miner1'}; + shareProcessor._handleShare(blockShare, function(){ + var roundKey = shareProcessor.roundKey(0, 1, blockShare.blockHash); + checkState(roundKey, function(diff1, diff2){ + expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty); + expect(diff2).equal(null); + + redisClient.exists(key1, key2, function(error, result){ + if (error) assert.fail('Test failed: ' + error); + expect(result).equal(2); + done(); + }); + }); + }); + }); + }); + }) + it('should process shares', function(done){ var shareProcessor = new ShareProcessor(test.config, test.logger); shareProcessor.redisClient = redisClient; @@ -38,44 +88,47 @@ describe('test share processor', function(){ foundBlock: false }; - shareProcessor.handleShare(shareData); - var currentRoundKey = shareProcessor.currentRoundKey( - shareData.job.fromGroup, - shareData.job.toGroup - ); - - redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ - if (error) assert.fail('Test failed: ' + error); - expect(parseFloat(res)).equal(shareData.difficulty); - - shareData.foundBlock = true; - var blockHashHex = '0011'; - shareData.blockHash = blockHashHex; - shareProcessor.handleShare(shareData); + shareProcessor._handleShare(shareData, function(){ - var roundKey = shareProcessor.roundKey( + var currentRoundKey = shareProcessor.currentRoundKey( shareData.job.fromGroup, - shareData.job.toGroup, - blockHashHex + shareData.job.toGroup ); - redisClient - .multi() - .hget(roundKey, shareData.workerAddress) - .smembers('pendingBlocks') - .hget('foundBlocks', blockHashHex) - .exec(function(error, result){ - if (error) assert.fail('Test failed: ' + error); - var difficulty = result[0][1]; - var pendingBlocks = result[1][1]; - var blockMiner = result[2][1]; + redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ + if (error) assert.fail('Test failed: ' + error); + expect(parseFloat(res)).equal(shareData.difficulty); - expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); - expect(pendingBlocks.length).equal(1); - expect(pendingBlocks[0].startsWith(blockHashHex)); - expect(blockMiner).equal(shareData.workerAddress); - done(); + shareData.foundBlock = true; + var blockHashHex = '0011'; + shareData.blockHash = blockHashHex; + shareProcessor._handleShare(shareData, function(){ + + var roundKey = shareProcessor.roundKey( + shareData.job.fromGroup, + shareData.job.toGroup, + blockHashHex + ); + + redisClient + .multi() + .hget(roundKey, shareData.workerAddress) + .smembers('pendingBlocks') + .hget('foundBlocks', blockHashHex) + .exec(function(error, result){ + if (error) assert.fail('Test failed: ' + error); + var difficulty = result[0][1]; + var pendingBlocks = result[1][1]; + var blockMiner = result[2][1]; + + expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); + expect(pendingBlocks.length).equal(1); + expect(pendingBlocks[0].startsWith(blockHashHex)); + expect(blockMiner).equal(shareData.workerAddress); + done(); + }); }); + }); }); }) diff --git a/test/stratumTest.js b/test/stratumTest.js index e87b81a..26becbb 100644 --- a/test/stratumTest.js +++ b/test/stratumTest.js @@ -155,7 +155,7 @@ describe('test stratum server', function(){ stratumClient.on('submit', function(params, callback){ expect(params).equal(submitMessage.params); - callback(null, false); + callback('invalid share'); }); });