Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 288 additions & 31 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"license": "GPL-3.0-or-later",
"dependencies": {
"@sentry/node": "^5.1.0",
"amqplib": "^0.8.0",
"bcryptjs": "^2.4.3",
"body-parser": "^1.18.2",
"chai-uuid": "^1.0.6",
Expand All @@ -50,6 +51,7 @@
"morgan": "^1.9.1",
"nodemon": "^2.0.4",
"pg": "^8.5.1",
"rascal": "^13.0.3",
"uuid": "^8.2.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion server/models/Wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Wallet{
this._id = idOrJSON.id;
this._JSON = idOrJSON;
}else{
throw new HttpError(500);
throw new HttpError(500, "Wrong constructor arg for wallet");
}
const WalletService = require("../services/WalletService");
this.walletRepository = new WalletRepository(session);
Expand Down
12 changes: 12 additions & 0 deletions server/repositories/TransferRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ class TransferRepository extends BaseRepository{
state: Transfer.STATE.pending,
});
}

async getTokensById(id){

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming this to getTokenAndCaptureIds(transactionId) would be verbose but more clear. Right now, it is not clear the parameter id is referring the transaction/transfer id or token id.

Another lesser preference would be to rename it to getTokens(transactionId)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good, I agree, I will change it

return await this._session.getDB().raw(
`
SELECT token_id, capture_id FROM "transaction" tr
LEFT JOIN "token" t
ON tr.token_id = t.id
WHERE transfer_id = ?
`,
[id],
);
}
}

module.exports = TransferRepository;
20 changes: 20 additions & 0 deletions server/repositories/TransferRepository.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const knex = require("../database/knex");
const tracker = mockKnex.getTracker();
const Session = require("../models/Session");
const uuid = require('uuid');
const sinon = require('sinon');

describe("TransferRepository", () => {
let transferRepository;
Expand Down Expand Up @@ -74,5 +75,24 @@ describe("TransferRepository", () => {
expect(result).lengthOf(1);
});

it("getTokensById", async () => {
const data = [{
capture_id: "c",
token_id: "t",
}];
tracker.uninstall();
tracker.install();
tracker.on('query', function sendResult(query, step) {
[
function firstQuery() {
expect(query.sql).match(/capture_id/);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is match here a regex kind? I feel either we should do a robust match by verifying the exact query if possible.
Even if we did this test seems to be verifying just the method signature and knex but nothing significant in our code. I would prefer to skip the unit test for this Repository class if you ask me. It would be beneficial and reduce our developer time spent on maintaining it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe you are right, I don't realize that I added some tests maybe look not so useful, yes, it might be unnessarary, I will think it over, and another reason lead to this result is that I wrote the test first before I wrote the implementation, so when I link this model to the real system, I don't need to worry too much about that it's some problematic model, it makes integrating the code easier.

query.response(data);
},
][step - 1]();
});
const result = await transferRepository.getTokensById(1);
sinon.assert.match(result, data);
});

});

22 changes: 16 additions & 6 deletions server/routes/transferRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ transferRouter.post(
const walletLogin = await walletService.getById(res.locals.wallet_id);
const walletSender = await walletService.getByIdOrName(req.body.sender_wallet);
const walletReceiver = await walletService.getByIdOrName(req.body.receiver_wallet);
const transferService = new TransferService(session);
// check if this transfer is a claim (claim == not transferrrable tokens)
const claim = req.body.claim;

Expand All @@ -76,7 +77,15 @@ transferRouter.post(
// TODO: get only transferrable tokens
result = await walletLogin.transferBundle(walletSender, walletReceiver, req.body.bundle.bundle_size, claim);
}
const transferService = new TransferService(session);

// send message
if (result.state === Transfer.STATE.completed) {
await transferService.sendMessage(result.id);
}

await session.commitTransaction();

// response
result = await transferService.convertToResponse(result);
if (result.state === Transfer.STATE.completed) {
res.status(201).json(result);
Expand All @@ -88,7 +97,6 @@ transferRouter.post(
} else {
throw new Error(`Unexpected state ${result.state}`);
}
await session.commitTransaction();
}catch(e){
if(e instanceof HttpError && !e.shouldRollback()){
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -127,8 +135,9 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await transferService.sendMessage(transferJson.id);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -167,8 +176,8 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -207,8 +216,8 @@ transferRouter.delete(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -280,8 +289,9 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await transferService.sendMessage(transferJson.id);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down
5 changes: 5 additions & 0 deletions server/routes/transferRouter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ describe("transferRouter", () => {
id: tokenId,
state: Transfer.STATE.completed,
});
sinon.stub(TransferService.prototype, "sendMessage");
const res = await request(app)
.post('/')
.send({
Expand Down Expand Up @@ -188,6 +189,7 @@ describe("transferRouter", () => {
id: transferId,
state: Transfer.STATE.completed,
});
const sendMessage = sinon.stub(TransferService.prototype, "sendMessage");
const res = await request(app)
.post('/')
.send({
Expand All @@ -196,6 +198,9 @@ describe("transferRouter", () => {
receiver_wallet: wallet2Id,
});
expect(res).property('statusCode').eq(201);

// should send message to queue
expect(sendMessage).calledWith(transferId);
});

// //TODO: test for case 1: with trust relationship, tokens specified
Expand Down
27 changes: 27 additions & 0 deletions server/services/MQConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module.exports = {
config: {
"vhosts": {
"test": {
"connection": {
"url": process.env.RABBIT_MQ_URL,
"socketOptions": {
"timeout": 1000
}
},
"exchanges": ["field-data"],

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably need a new exchange named wallet-service-ex with a queue binding like the following. The consumer of your publication is web-map-query-service. "field-data-events" queue are meant for events generated by field-data-service.

     "exchanges": [
           "wallet-service-ex"
     ],
      "queues": [
        "token-transfer:events"
      ],
      "bindings": [
        "wallet-service-ex[token.transfer] -> token-transfer: events"
      ],
      "publications": {
        "token-assigned": {
          "exchange": "wallet-service-ex",
          "routingKey": "token.transfer"
        }
      }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do we have something like a dashboard/panel to check and manage the queue? RabbitMQ I mean? How can I know what kind of exchanges are existing?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do. I will ping you the url and credentials separately. Also, if the rabbitmq doesn't have an exchange and queues that you have specified in the config, it gets created when app starts running.

Copy link
Collaborator Author

@dadiorchen dadiorchen Jun 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @arunbakt for your helpful configuration. I have added the exchange and queue, now the message has been sent to the queue,
Screen Shot 2021-06-21 at 11 45 00 AM

"queues": ["field-data-events", "field-data:verifications"],
"publications": {
"raw-capture-created": {
"exchange": "field-data"
}
},
"subscriptions": {
"admin-verification": {
"queue": "field-data:verifications"
}
}
}
}
}
}

37 changes: 37 additions & 0 deletions server/services/MQService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const Broker = require('rascal').BrokerAsPromised;
const config = require("./MQConfig").config;
const HttpError = require("../utils/HttpError");
const log = require("loglevel");

class MQService{

constructor(session){
this._settsion = session;
}

sendMessage(payload){
return new Promise((resolve, reject) => {
const broker = Broker.create(config);
// TODO
Promise.resolve(broker)
.then(broker => {
broker.publish("raw-capture-created", payload, "field-data.capture.creation")
.then(publication => {
publication
.on("success", () => resolve(true))
.on("error", (err, messageId)=> {
const error = `Error with id ${messageId} ${err.message}`;
log.error(error);
reject(new HttpError(500, error));
});
})
.catch(err => {
log.error(err);
reject(new HttpError(500, `Error publishing message ${err}`));
})
});
});
}
}

module.exports = MQService;
62 changes: 62 additions & 0 deletions server/services/MQService.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const MQService = require("./MQService");
const Broker = require('rascal').BrokerAsPromised;
const sinon = require("sinon");
const {expect} = require("chai");
const jestExpect = require("expect");

describe("MQService", () => {

afterEach(() => {
sinon.restore();
});

it("send message successfully", async () => {
const broker = {
publish: async () => {
console.log("publish");
return {
on(event, handler){
// mock the success event
if(event === "success"){
setImmediate(handler);
}
return this;
}
}
}
};
sinon.spy(broker, "publish");
sinon.stub(Broker, "create").resolves(broker);
const mqService = new MQService();

const payload = {a:1};
const result = await mqService.sendMessage(payload);
expect(result).eq(true);
sinon.assert.calledWith(broker.publish, "raw-capture-created", payload, "field-data.capture.creation");

});

it("send message with problem", async () => {
sinon.stub(Broker, "create").returns({
publish: async () => {
console.log("publish");
return {
on(event, handler){
// mock the error event
if(event === "error"){
setImmediate(() => handler(new Error("Message sending wrong"), "No.1"));
}
return this;
}
}
}
});
const mqService = new MQService();

await jestExpect(async () => {
await mqService.sendMessage({a:1});
}).rejects.toThrow(/Message sending wrong/);

});

});
44 changes: 40 additions & 4 deletions server/services/TransferService.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
const WalletService = require('./WalletService');
const MQService = require("./MQService");
const log = require("loglevel");
const TransferRepository = require('../repositories/TransferRepository');

class TransferService {
constructor(session) {
this._session = session;
this.walletService = new WalletService(session);
this._walletService = new WalletService(session);
this._mqService = new MQService();
this._transferRepository = new TransferRepository(session);
}

async convertToResponse(transferObject) {
Expand All @@ -14,25 +19,56 @@ class TransferService {
} = transferObject;
const result = { ...transferObject };
{
const wallet = await this.walletService.getById(originator_wallet_id);
const wallet = await this._walletService.getById(originator_wallet_id);
const json = await wallet.toJSON();
result.originating_wallet = json.name;
delete result.originator_wallet_id;
}
{
const wallet = await this.walletService.getById(source_wallet_id);
const wallet = await this._walletService.getById(source_wallet_id);
const json = await wallet.toJSON();
result.source_wallet = await json.name;
delete result.source_wallet_id;
}
{
const wallet = await this.walletService.getById(destination_wallet_id);
const wallet = await this._walletService.getById(destination_wallet_id);
const json = await wallet.toJSON();
result.destination_wallet = await json.name;
delete result.destination_wallet_id;
}
return result;
}

/*
* Send message to queue, inform about the transfer detail, token, and
* associated tree/capture
*
{
"type": "TokensAssigned",
"wallet_name": "joeswallet",
"entries": [
{ "capture_id": "63e00bca-8eb0-11eb-8dcd-0242ac130003", "token_id": "9d7abad8-8eb0-11eb-8dcd-0242ac130003" },
{ "capture_id": "8533b704-8eb0-11eb-8dcd-0242ac130003", "token_id":"a5799d94-8eb0-11eb-8dcd-0242ac130003" } ]
}
*/
async sendMessage(transferId){
log.debug("send message");
const transfer = await this._transferRepository.getById(transferId);
const walletReceiver = await this._walletService.getById(transfer.destination_wallet_id);
const walletReceiverObj = await walletReceiver.toJSON();
const walletSender = await this._walletService.getById(transfer.source_wallet_id);
const walletSenderObj = await walletSender.toJSON();
const tokenData = await this._transferRepository.getTokensById(transferId);
const message = {
transfer_id: transferId,
type: "TokensAssigned",
wallet_name : walletReceiverObj.name,
wallet_name_sender : walletSenderObj.name,
entries: tokenData,
};
this._mqService.sendMessage(message);
}

}

module.exports = TransferService;
Loading