Comments (9)
I just realized immediately after writing this issue what the problem is and what exactly is going on with my code.
The problem is that the event handlers for both 'error' and 'data' events on the socket are being registered to the socket before any response from the socket. Therefore, when the first response from 'method1' comes back, both of the functions attach to the socket get invoked (and removed because they're attached with the .once EventEmitter method). When the response from 'method2' comes around, there are no event handlers to handle it, thus it passes by silently.
So the issue is apparent. However, the solution isn't so clear to me yet. It seems like what I would need to do is generate a unique ID for each payload, and then invoke the appropriate callback using this ID.
Or, maybe I should just create a new socket object for every request. My only concern here is with efficiency and memory leaks. Also, would this even solve the issue; wouldn't each new socket get the same 'data' events written to it?
from rabbit.js.
The answers will come back in the order that you sent them, so one thing you could do is keep a queue (array) of the callbacks, and dequeue (shift
) every time a message comes in.
from rabbit.js.
That doesn't work because the order doesn't come back the same. If a process on the exchange takes longer than others, then it will come in after other messages. I'll need an ID system.
/**
* Message Queue (RabbitMQ)
*/
global.mqContext = require('rabbit.js').createContext('amqp://localhost');
global.mqSockets = { // Cache for all the sockets
'backend': 'REQ'
};
global.mqContext.on('ready', function(){
for (var key in global.mqSockets)
{
if (global.mqSockets.hasOwnProperty(key));
{
if (['REQ', 'SUB'].indexOf(global.mqSockets[key]) !== -1)
{
var mqsock = global.mqContext.socket(global.mqSockets[key]);
mqsock.mqCallbacks = [];
mqsock.connect(key, function(){
global.mqSockets[key] = mqsock;
});
mqsock.on('error', function(err){
var cb = mqsock.mqCallbacks.shift();
cb(err);
})
mqsock.on('data', function(data){
var cb = mqsock.mqCallbacks.shift();
var json = JSON.parse(data);
if (json.error)
return cb(json.error);
cb(null, json);
});
}
}
}
})
global.MQ = function(queue){
var mqsock = global.mqSockets[queue];
function rtnFun(method, info, cb){
if (typeof info === 'function')
{
var cb = info;
var info = {};
}
mqsock.mqCallbacks.push(cb);
var payload = JSON.stringify({
method: method,
info: info
});
mqsock.write(payload, 'utf8');
return rtnFun;
};
return rtnFun;
}
That's my implementation. When I do something like this:
mqBackend('exchange.getPriceHighByDate', wait(getPriceHighByDateHandler));
mqBackend('ads.getAdViewsByDate', wait(getAdViewsByDateHandle));
mqBackend('banks.getCreditsCreatedByDate', wait(getCreditsCreatedByDateHandler));
mqBackend('banks.getCreditsConsumedByDate', wait(getCreditsConsumedByDateHandler));
The second one triggers after all of them. This causes it to get the wrong handler, wait(getCreditsConsumedByDateHandler)
, instead of the right handler, wait(getAdViewsByDateHandle)
.
from rabbit.js.
Yes, an ID system would work fine. The discussion here is apposite: #48
from rabbit.js.
@samholmes How did you get on?
from rabbit.js.
I went with a ID system.
/**
* Message Queue (RabbitMQ)
*/
global.mqContext = require('rabbit.js').createContext('amqp://localhost');
global.mqSockets = { // Cache for all the sockets
'backend': 'REQ'
};
global.mqContext.on('ready', function(){
for (var key in global.mqSockets)
{
if (global.mqSockets.hasOwnProperty(key));
{
if (['REQ', 'SUB'].indexOf(global.mqSockets[key]) !== -1)
{
var mqsock = global.mqContext.socket(global.mqSockets[key]);
mqsock.mqCallbacks = {};
mqsock.connect(key, function(){
global.mqSockets[key] = mqsock;
});
mqsock.on('data', function(payload){
var json = JSON.parse(payload);
var cb = mqsock.mqCallbacks[json.callbackId];
if (typeof cb !== 'function') return;
delete mqsock.mqCallbacks[json.callbackId];
if (json.error)
return cb(json.error);
cb(null, json.info);
});
}
}
}
})
global.MQ = function(queue){
var mqsock = global.mqSockets[queue];
function rtnFun(method, info, cb){
if (typeof info === 'function')
{
var cb = info;
var info = {};
}
var callbackId = Math.random();
mqsock.mqCallbacks[callbackId] = cb;
var payload = JSON.stringify({
callbackId: callbackId,
method: method,
info: info
});
mqsock.write(payload, 'utf8');
return rtnFun;
};
return rtnFun;
}
from rabbit.js.
@samholmes, just checking that this is working for you. Do you think it's worth me including something in rabbit.js to make this easier? It doesn't fit in the socket-style API, but perhaps it could be a utility.
from rabbit.js.
This is working fine for me. I'm not sure how it fits with the style of rabbit.js.
from rabbit.js.
One thing that needs modifying with my code is that it'll fail if I try to use the MQ function before the socket connects. I might modify this into a module that will queue up method invocations in that case.
from rabbit.js.
Related Issues (20)
- update dependencies and fix verion warnings HOT 1
- cannot connect to SUB HOT 1
- context.close() does not emit "close" event
- How to make a durable named queue?
- [SOLVED] Cant connect to remote host HOT 1
- Identifying the Queue name HOT 2
- Queue durability, exclusivity and consumerTag HOT 1
- Allow control over exchange creation HOT 4
- How to get the contents of header?
- bump amqp.node version to 0.4.4 (Node 4.x) HOT 1
- rabbit.js within AngularJS HOT 6
- How best to implement distributed semaphores? HOT 3
- Running REQ/REP as an RPC
- Advantage of using over plain AMQPLIB HOT 1
- Connect callback not optional
- List queues?
- message in queue impact by another message
- How to set queue x-max-length parameter HOT 3
- example error HOT 1
- Prefetch option can only be set through setsockopt and not as part of socket argument.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rabbit.js.