Comments (5)
Can you please pull from my repository and test?
http://github.com/hurtonm/zeromq2/commits/multipart_messages_encoder_fix
from libzmq.
This seems to have fixed the head loss issue, however concatenation issue is still there.
Receiver program:
#include "iostream" #include "zmq.hpp" #define NUM_MSG 2048 int main (int argc, char* argv[]) { int i, num; long more = 1; size_t more_size = sizeof(more); if (argc < 2) { std::cout << "Receiver arguments: " << std::endl; return 1; } num = (argc < 3) ? NUM_MSG : atoi(argv[2]); zmq::context_t ctx (1); zmq::socket_t s (ctx, ZMQ_SUB); s.bind ("epgm://eth0;224.10.10.10:5555"); s.setsockopt (ZMQ_SUBSCRIBE, argv[1], std::strlen(argv[1])); std::cout << "Subscribed to [" << argv[1] << "], length " << std::strlen(argv[1]) << std::endl; for (i = 0; i < num; i++) { zmq::message_t hdr; zmq::message_t msg; s.recv (&hdr); s.getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more) { s.recv (&msg); std::cout << "Topic [" << ((const char *)hdr.data()) << "] : " << i << " = " << ((const char *)msg.data()) << std::endl; } else std::cout << "Topic [" << argv[1] << "] : " << i << " header only [" << ((const char *)hdr.data()) << "]" << std::endl; } return 0; }
Sender program:
#include "iostream" #include "sstream" #include "zmq.hpp" #define NUM_MSG 1 int main (int argc, char* argv[]) { int i, num; long rate = 10000; if (argc < 3) { std::cout << "Sender arguments: " << std::endl; return 1; } num = (argc < 4) ? NUM_MSG : atoi(argv[3]); zmq::context_t ctx (1); zmq::socket_t s (ctx, ZMQ_PUB); s.setsockopt (ZMQ_RATE, &rate, sizeof (rate)); s.connect ("epgm://eth0;224.10.10.10:5555"); for (i = 0; i < num; i++) { std::ostringstream oss; oss << argv[2] << "[" << i << "]"; zmq::message_t hdr (argv[1], std::strlen(argv[1]) + 1, NULL); zmq::message_t msg (oss.str().size() + 1); strcpy((char*)msg.data(), oss.str().c_str()); std::cout << "Sending '" << ((const char*)msg.data()) << "' to [" << ((const char*)hdr.data()) << "]" << std::endl; s.send (hdr, ZMQ_SNDMORE); s.send (msg); } return 0; }
Test:
Start receiver to wait for a large number of messages, then make sender transmit the same number. Once in a while this leads to the receiver not getting all of the sent messages. When this happens, restart the sender to send a single message to a different topic.
Start receiver:
[ak@linux src]$ /tmp/zrm test 16384
Start sender:
[ak@linux zmq]$ /tmp/zsm test 1111111111111111111111111111 16384 >& /dev/zero
Receiver output until it stops:
...
Topic [test] : 12436 = 11111111111111111111111111111[12436]
Topic [test] : 12437 = 11111111111111111111111111111[12437]
Topic [test] : 12438 = 11111111111111111111111111111[12438]
Then, sender again to a different topic:
[ak@linux zmq]$ /tmp/zsm testing 222222222222222222222222222 1 >& /dev/zero
Receiver:
...
Topic [test] : 12439 = testing
Topic [test] : 12440 header only [22222222222222222222222222222[0]]
Topic [testing] : 12441 = 22222222222222222222222222222[1]
from libzmq.
Thanks for elaborating. Will look into this.
from libzmq.
Hi -
I am also having this issue where the first N-1 parts of an N part multi-message are being lost on the first message sent over epgm.
Is this issue still being looked into or should I post another example? I am using zeromq version 2.1.4.
from libzmq.
Here's a basic C++ example that shows this problem. Start pgmtest2 (the subscriber), then start pgmtest1. pgmtest2 should show
01
02
03
01
02
03
but instead shows
03
01
02
03
pgmtest1.cpp (publisher)
int main (int argc, char *argv[])
{
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
try
{
publisher.connect("epgm://eth0;239.255.7.15:11142");
}
catch(std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
}
int64_t rate = 100;
publisher.setsockopt(ZMQ_RATE, &rate, sizeof(rate));
// int i = 0;
for(int i = 0, n = 2; i < n; ++i)
{
zmq_msg_t part1;
int rc = zmq_msg_init_size (&part1, 1);
assert (rc == 0);
/* Fill in message content with unsigned char 1 */
memset (zmq_msg_data (&part1), 1, 1);
/* Send the message to the socket */
rc = zmq_send (publisher, &part1, ZMQ_SNDMORE);
assert (rc == 0);
zmq_msg_t part2;
rc = zmq_msg_init_size (&part2, 1);
assert (rc == 0);
/* Fill in message content with unsigned char 2 */
memset (zmq_msg_data (&part2), 2, 1);
/* Send the message to the socket */
rc = zmq_send (publisher, &part2, ZMQ_SNDMORE);
assert (rc == 0);
zmq_msg_t part3;
rc = zmq_msg_init_size (&part3, 1);
assert (rc == 0);
/* Fill in message content with unsigned char 2 */
memset (zmq_msg_data (&part3), 3, 1);
/* Send the message to the socket */
rc = zmq_send (publisher, &part3, 0);
assert (rc == 0);
}
return 0;
}
pgmtest2.cpp (subscriber)
#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <stdio.h>
int main (int argc, char *argv[])
{
zmq::context_t context (1);
// Socket to talk to server
zmq::socket_t subscriber (context, ZMQ_SUB);
try
{
subscriber.connect("epgm://eth0;239.255.7.15:11142");
}
catch(std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
}
subscriber.setsockopt(ZMQ_SUBSCRIBE, 0, 0);
for(;;)
{
zmq::message_t update;
subscriber.recv(&update);
std::string in(static_cast<const char*>(update.data()), update.size());
std::cout << static_cast<int>(in[0]) << std::endl;
}
return 0;
}
from libzmq.
Related Issues (20)
- Is there a definitive list of security vulnerabilities against libzmq versions? HOT 2
- Platform.hpp not found issue on macOS Sonoma and Xcode 15.2 HOT 1
- NPM install failed for zeromq dependency while installing device farmer STF. NPM logs available in the description. HOT 1
- ZMQ subscriber doesn't receive message first time occasionally. HOT 2
- What is the procedure to finalise a DRAFT API feature HOT 1
- BUG: IPC socket drops messages, ignores LINGER HOT 1
- How can I clear the internal buffer of ZMQ multicast without closing the socket?
- Add more guidance on error handling in the guide/docs
- Too many resets on loopback after socket.connect() call HOT 1
- cmake: ZMQ_WIN32_WINNT autodetection broken when cross-compiling with mingw-w64
- When the topic length of sub exceeds 15kb, xpub crashes when calling zmq_msg_recv.
- High latency when messages are not being send repeatedly HOT 4
- Latest libsodium has deprecated functions, causing it not to compile HOT 1
- tools: curve_keygen not built when building with cmake, ninja HOT 2
- No IPC Support for Windows, probably because of missing AF_UNIX support?
- Curve not working on ZeroMQ on IoS HOT 1
- test_busy_poll.cpp missing from zeromq 4.3.5 tarball
- Large stacks in ZeroMQ threads lead to inflated crashpad minidumps on Windows
- build fail on Ubuntu 24.04 LTS: autogen.sh stopped with error HOT 1
- Why am I no longer on the contributor list? HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from libzmq.