subzerocloud / pg-amqp-bridge
Send messages to RabbitMQ from PostgreSQL
License: MIT License
I have installed PostgreSQL server on Windows.
How do I install pg-amqp-bridge and use it?
Thank you.
How to publish JSON?
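One way to approach this is to serialize the JSON on the sending side and pass the resulting text as the NOTIFY payload. The sketch below is only an illustration: it assumes the bridge treats the payload as `routing_key|message`, split on the first `|`, and it assumes a database driver that accepts `%s` placeholders. The function names are hypothetical and not part of pg-amqp-bridge.

```python
import json


def build_notify_payload(routing_key, body):
    """Return 'routing_key|<json>' text for use as a NOTIFY payload.

    Assumption: the consumer splits on the first '|' only, so the
    routing key itself must not contain that character.
    """
    if "|" in routing_key:
        raise ValueError("routing key must not contain '|'")
    # Compact separators keep the payload small; sort_keys makes it stable.
    return routing_key + "|" + json.dumps(body, separators=(",", ":"), sort_keys=True)


def build_notify_statement(channel, routing_key, body):
    """Return (sql, params) for a parameterized pg_notify call."""
    payload = build_notify_payload(routing_key, body)
    return "SELECT pg_notify(%s, %s)", (channel, payload)


def split_payload(payload):
    """Split a payload back into (routing_key, decoded JSON body)."""
    routing_key, _, message = payload.partition("|")
    return routing_key, json.loads(message)
```

Passing the payload as a query parameter rather than concatenating it into the SQL text avoids quoting problems when the JSON contains single quotes.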
When I try to run it as a Docker image on Ubuntu 18, with Postgres running locally but RabbitMQ on a remote server:
docker run --rm -it -net=host \
  -e POSTGRESQL_URI="postgresql://user:[email protected]:5432/basename" \
  -e POSTGRESQL_CHANNEL="test" \
  -e AMQP_URI="amqp://user:[email protected]:5672//" \
  -e AMQP_QUEUE_NAME="test" \
  fgribreau/postgresql-to-amqp
I get this:
Finished release [optimized] target(s) in 0.37 secs
Running target/release/postgresql-to-amqp
Could not connect to AMQP: invalid port number
thread 'main' panicked at 'Could not run reactor: Error { repr: Custom(Custom { kind: Other, error: StringError("invalid port number") }) }', /checkout/src/libcore/result.rs:860:4
stack backtrace:
0: std::sys::imp::backtrace::tracing::imp::unwind_backtrace
at /checkout/src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::_print
at /checkout/src/libstd/sys_common/backtrace.rs:71
2: std::panicking::default_hook::{{closure}}
at /checkout/src/libstd/sys_common/backtrace.rs:60
at /checkout/src/libstd/panicking.rs:380
3: std::panicking::default_hook
at /checkout/src/libstd/panicking.rs:396
4: std::panicking::rust_panic_with_hook
at /checkout/src/libstd/panicking.rs:611
5: std::panicking::begin_panic_new
at /checkout/src/libstd/panicking.rs:553
6: std::panicking::begin_panic_fmt
at /checkout/src/libstd/panicking.rs:521
7: rust_begin_unwind
at /checkout/src/libstd/panicking.rs:497
8: core::panicking::panic_fmt
at /checkout/src/libcore/panicking.rs:92
9: core::result::unwrap_failed
10: postgresql_to_amqp::main
11: __rust_maybe_catch_panic
at /checkout/src/libpanic_unwind/lib.rs:98
12: std::rt::lang_start
at /checkout/src/libstd/panicking.rs:458
at /checkout/src/libstd/panic.rs:361
at /checkout/src/libstd/rt.rs:59
13: __libc_start_main
14:
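The "invalid port number" text suggests the AMQP URI failed to parse, rather than a network failure. One possible cause (an assumption, since the credentials in the report are redacted) is a reserved character such as `/` or `#` in the user name or password, which ends the host part early. The Python sketch below only illustrates that effect with the standard library; `check_uri` and `amqp_uri` are hypothetical helpers, and the bridge's own parser may behave differently.

```python
from urllib.parse import quote, urlsplit


def check_uri(uri):
    """Parse a URI and report whether its port can be read."""
    parts = urlsplit(uri)
    try:
        port = parts.port  # raises ValueError when the port is not a valid number
    except ValueError as exc:
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "host": parts.hostname, "port": port}


def amqp_uri(user, password, host, port=5672):
    """Build an AMQP URI with percent-encoded credentials.

    The trailing '//' follows the form of the URIs used in the report.
    """
    return "amqp://{}:{}@{}:{}//".format(
        quote(user, safe=""), quote(password, safe=""), host, port
    )
```

For example, `check_uri("amqp://user:pa/ss@rabbit.example.com:5672//")` reports a port error, while the URI produced by `amqp_uri("user", "pa/ss", "rabbit.example.com")` parses cleanly.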
Hi,
I am able to successfully connect to both my Postgres and RabbitMQ servers in my production environment, but for some reason pg-amqp-bridge doesn't receive any notifications from Postgres.
On my staging and dev environments it works fine, and I can see from the pg-amqp-bridge logs that it's receiving and forwarding messages correctly. All instances are running on docker tag 0.0.6.
I created a simple Node.js script that listens to a specific channel on the production Postgres server, and it is able to output all notifications sent to it, so I definitely know Postgres is working. If I manually create and publish a message on RabbitMQ, my services that listen to RabbitMQ fetch those messages and react to them, so I know RabbitMQ also works.
It just seems like pg-amqp-bridge is not getting or listening to Postgres in production. I don't see any logs that indicate it is receiving anything from Postgres, like I do on my staging and dev environments.
I've set RUST_LOG=debug but can't see any errors being thrown.
My INFO log output looks like this:
Attempting to connect to AMQP server..
INFO:amqp::session: Session initialized
Connection to AMQP server successful
Attempting to connect to PostgreSQL..
Connection to PostgreSQL successful
Listening on n13s...
Listening on user_events...
Can someone point me in the right direction? I've been trying to troubleshoot for the past day and am getting nowhere.
Attempting to connect to AMQP server..
Connection to AMQP server successful
thread 'main' panicked at 'The amqp entity "task_queue" doesn't exist', src/lib.rs:67:8
note: Run with `RUST_BACKTRACE=1` for a backtrace.
I tried to connect to a Postgres database that requires SSL mode, but I got this error:
app[pg-amqp-bridge] info Error(Db(DbError { severity: "FATAL", parsed_severity: Some(Fatal), code: SqlState("28000"), message: "no pg_hba.conf entry for host "xxxx", user "xx", database "xxx", SSL off", detail: None, hint: None, position: None, where_: None, schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("auth.c"), line: Some(520), routine: Some("ClientAuthentication") }))
Stumbled upon this old issue with enhancement tag #12
Is this now supported in pg-amqp-bridge?
Would it be possible to add an option to change the delivery mode of the published messages to persistent? This would make it so that messages can be persisted to disk and won't be lost if rabbitmq is restarted (assuming the queues were defined as durable).
Thanks for the work on this project, it's great as a simple way to get postgresql events into rabbitmq.
OS: macOS High Sierra
The Postgres server and the RabbitMQ server run on localhost; pg-amqp-bridge runs in Docker.
git checkout 0.0.6
POSTGRESQL_URI="postgres://postgres:[email protected]:5432" \
AMQP_URI="amqp://admin:[email protected]//" \
BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
cargo run
works.
But this docker-compose service:
pg_amqp_bridge:
  image: subzerocloud/pg-amqp-bridge:0.0.6
  environment:
    - POSTGRESQL_URI=postgres://postgres:[email protected]:5432
    - AMQP_URI=amqp://admin:[email protected]//
    - BRIDGE_CHANNELS=pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange
  restart: always
fails on docker-compose up with errors:
Attempting to obtain connection on AMQP server for pgchannel2 channel..
IoError(ConnectionRefused)
Retrying the AMQP connection for pgchannel2 channel in 1 seconds..
Attempting to obtain connection on AMQP server for pgchannel3 channel..
IoError(ConnectionRefused)
Retrying the AMQP connection for pgchannel3 channel in 1 seconds..
IoError(ConnectionRefused)
Retrying the AMQP connection for pgchannel1 channel in 2 seconds..
IoError(ConnectionRefused)
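`ConnectionRefused` means nothing accepted the TCP connection at the address the container tried. If the redacted host in the URIs is a loopback address (an assumption based on the servers running on localhost), it refers to the container itself when used from Compose, not to the macOS host. A small check like the following, run from inside the container's network, can confirm which host and port are actually reachable. It is a generic sketch using only the Python standard library; `can_connect` is a hypothetical helper.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

For instance, comparing `can_connect("127.0.0.1", 5672)` with `can_connect("host.docker.internal", 5672)` shows whether the broker is reachable through the host name that Docker Desktop provides for the host machine.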
Default/nameless exchange: every queue is automatically bound to this exchange, which has the name "", with routing key = queue name.
The problem is that I can't use '' with pg_notify because it throws an error.
For example:
docker-compose.yml
# pg-amqp-bridge instance is responsible for forwarding NOTIFY events in PostgreSQL
# to RabbitMQ based on the BRIDGE_CHANNELS configuration
pg_amqp_bridge:
  image: subzerocloud/pg-amqp-bridge
  command: |
    sh -c "
    waitforit -host=$$POSTGRES_HOST -port=$$POSTGRES_PORT -timeout=10 &&
    export POSTGRESQL_URI=postgres://$$POSTGRES_USER:$$POSTGRES_PASSWORD@$$POSTGRES_HOST:$$POSTGRES_PORT/$$POSTGRES_DB &&
    export AMQP_URI=amqp://$$RABBITMQ_DEFAULT_USER:$$RABBITMQ_DEFAULT_PASS@rabbitmq// &&
    exec pg-amqp-bridge
    "
  depends_on:
    - rabbitmq
    - postgres
  environment:
    POSTGRES_USER: app_admin
    POSTGRES_PASSWORD: app_admin_pass
    POSTGRES_HOST: postgres
    POSTGRES_PORT: 5432
    POSTGRES_DB: gb_dev
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: adminpass
    RUST_LOG: info # output forwarded messages
  volumes:
    - ../scripts/waitforit:/bin/waitforit:ro
  restart: unless-stopped
migrate.sh
#!/bin/bash
#
# stolen from https://gist.github.com/fforbeck/868462c0f7664d92e19e
#
type rabbitmqctl > /dev/null 2>&1 || { echo >&2 "rabbitmqctl is required but it is not installed. Aborting."; exit 1; }
type rabbitmqadmin > /dev/null 2>&1 || { echo >&2 "rabbitmqadmin is required but it is not installed. Aborting."; exit 1; }
VHOST="/"
QUEUE="mails_queue"
set_queues() {
  rabbitmqadmin declare queue --vhost=$VHOST --user=$RABBITMQ_DEFAULT_USER --password=$RABBITMQ_DEFAULT_PASS name=$QUEUE durable=true
}
set_queues
echo ""
rabbitmqctl list_queues -p $VHOST
echo ""
echo "Rabbitmq configured with success."
sql
select rabbitmq.send_message('', 'mails_queue', 'Hi!');
throws error
postgres_1 | 2018-11-26 08:38:27.399 UTC [90] ERROR: channel name cannot be empty
postgres_1 | 2018-11-26 08:38:27.399 UTC [90] CONTEXT: SQL function "send_message" statement 1
postgres_1 | SQL statement "SELECT rabbitmq.send_message('', 'mails_queue', 'Hi!')"
This could be fixed by binding a channel with some name to the exchange named '', but that throws an error too.
For example:
BRIDGE_CHANNELS: amq_default:""
error
rabbitmq_1 | operation exchange.declare caused a channel exception not_found: no exchange '""' in vhost '/'
BRIDGE_CHANNELS: "amq_default:"
error
pg_amqp_bridge_1 | thread 'main' panicked at 'No bindings(e.g. pgchannel1:queue1) specified in "amq_default:"', src/lib.rs:162:4
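The two forms above fail for different reasons: the quoted `""` reaches RabbitMQ literally as an exchange name, and the empty form is rejected as having no bindings. A minimal sketch of a parser that would accept both is shown here in Python, purely for illustration. It is not the bridge's actual parsing code, and `parse_bridge_channels` is a hypothetical name.

```python
def parse_bridge_channels(value):
    """Parse 'pgchannel:entity,...' into a list of (channel, entity) pairs.

    Hypothetical behavior: an empty entity (as in 'amq_default:') or a
    literal pair of quotes is accepted and stands for the nameless
    exchange ''.
    """
    bindings = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        channel, sep, entity = item.partition(":")
        channel = channel.strip()
        if not sep or not channel:
            raise ValueError("expected 'pgchannel:entity', got {!r}".format(item))
        entity = entity.strip()
        if entity in ('""', "''"):
            entity = ""  # treat quoted-empty the same as empty
        bindings.append((channel, entity))
    if not bindings:
        raise ValueError("no bindings specified")
    return bindings
```

With this behavior, `amq_default:` yields the pair `("amq_default", "")`, so a message sent on that channel would be routed by queue name through the default exchange.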
Proposal
Instead of thread 'main' panicked at 'No bindings(e.g. pgchannel1:queue1)', just bind to the nameless exchange, so that a channel such as amq_default can be bound to the nameless exchange.
I got an issue when connecting the bridge to a RabbitMQ cluster behind a proxy.
ERROR:amqp::session: Error in reading loop: Protocol("failed to fill whole buffer")
ERROR:amqp::session: Error dispatching closing packet to a channel
This error seems to happen because the connection was closed. I would like to know whether the heartbeat system offered by RabbitMQ is implemented: https://www.rabbitmq.com/heartbeats.html
Can messages be lost when the connection between PostgreSQL and pg-amqp-bridge service is down?
This is something important to be discussed.
OpenSSL incorrect version.
./pg-amqp-bridge
./pg-amqp-bridge: error while loading shared libraries: libssl.so.1.0.0: cannot open shared object file: No such file or directory
[root@iz2ze9d7x8qidftw27oxqsz bin]# find / -name libssl.so.1.0.*
/usr/lib64/libssl.so.1.0.2k
[root@iz2ze9d7x8qidftw27oxqsz bin]# ln -s /usr/lib64/libssl.so.1.0.2k /usr/lib64/libssl.so.1.0.0
[root@iz2ze9d7x8qidftw27oxqsz bin]# ./pg-amqp-bridge
./pg-amqp-bridge: error while loading shared libraries: libcrypto.so.1.0.0: cannot open shared object file: No such file or directory
[root@iz2ze9d7x8qidftw27oxqsz bin]# find / -name libcrypto.so.1.0.*
/usr/lib64/libcrypto.so.1.0.2k
[root@iz2ze9d7x8qidftw27oxqsz bin]# ln -s /usr/lib64/libcrypto.so.1.0.2k /usr/lib64/libcrypto.so.1.0.0
[root@iz2ze9d7x8qidftw27oxqsz bin]# ./pg-amqp-bridge
./pg-amqp-bridge: /lib64/libcrypto.so.1.0.0: version `OPENSSL_1.0.0' not found (required by ./pg-amqp-bridge)
./pg-amqp-bridge: /lib64/libc.so.6: version `GLIBC_2.18' not found (required by ./pg-amqp-bridge)
./pg-amqp-bridge: /lib64/libssl.so.1.0.0: version `OPENSSL_1.0.0' not found (required by ./pg-amqp-bridge)
I don't know what to do now.
Hello!
Is it possible to send messages to RabbitMQ without NOTIFY?
PostgreSQL's NOTIFY has a 6000-character limit.
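A common workaround for payload size limits is to keep using NOTIFY but send only a reference when the full message does not fit, and let the consumer fetch the data itself. The sketch below assumes that convention, which is hypothetical and not something pg-amqp-bridge provides. The PostgreSQL documentation states that in the default configuration the payload must be shorter than 8000 bytes, so the limit is a parameter here and can be lowered.

```python
import json

# Default taken from the PostgreSQL documentation ("shorter than 8000 bytes"
# in the default configuration); pass a smaller limit if your setup needs one.
DEFAULT_LIMIT_BYTES = 8000


def fits_notify(payload, limit=DEFAULT_LIMIT_BYTES):
    """True if the UTF-8 encoded payload is shorter than the limit."""
    return len(payload.encode("utf-8")) < limit


def payload_or_reference(body, ref_id, limit=DEFAULT_LIMIT_BYTES):
    """Return the JSON body if it fits, else a small JSON pointer to it."""
    full = json.dumps(body, separators=(",", ":"))
    if fits_notify(full, limit):
        return full
    # Hypothetical convention: the consumer looks up the row by this id.
    ref = json.dumps({"ref": ref_id}, separators=(",", ":"))
    if not fits_notify(ref, limit):
        raise ValueError("limit too small even for a reference")
    return ref
```

The size is measured in bytes after UTF-8 encoding, not in characters, so text with non-ASCII characters reaches the limit sooner.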
I experienced no more responses from pg-amqp-bridge after 15 minutes (900 seconds) of NOTIFY inactivity. Having found moby/moby#31208, I set the following on my PostgreSQL service in the compose.yml:
postgres:
  ...
  image: postgres:12.5-alpine@sha256:eee6f89fab183ebae62ad976722e3c2c1d201e73916af664d6c8fbfe9fe071fd
  ...
  sysctls:
    - net.ipv4.tcp_keepalive_time=600 # Prevent Docker Swarm from killing pg-amqp-bridge connections (https://github.com/moby/moby/issues/31208)
  ...
Just want to leave this here in case it helps someone else :)
I have PostgreSQL installed on my local machine on port 5433, and it is running. I also run RabbitMQ in Docker and then run the pg-amqp-bridge Docker image:
docker run --rm -it --net=host -e POSTGRESQL_URI="postgres://postgres@localhost:5433" -e AMQP_URI="amqp://localhost//" -e BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" subzerocloud/pg-amqp-bridge
Attempting to connect to PostgreSQL..
Error { kind: Connect, cause: Some(Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }) }
Retrying the PostgreSQL connection in 1 seconds..
...
What is incorrect? How can I fix it?
ERROR:amqp::session: Error in reading loop: Protocol("Connection reset by peer (os error 104)")
ERROR:amqp::session: Error dispatching closing packet to a channel
ERROR:amqp::session: Error dispatching closing packet to a channel
... waits for next notify
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: IoError(BrokenPipe)', /checkout/src/libcore/result.rs:906:4
note: Run with `RUST_BACKTRACE=1` for a backtrace.
Do you have any recommendations on a strategy to most reliably guarantee that NOTIFY messages make it to RabbitMQ?
When I restart RabbitMQ, the bridge hangs until the next NOTIFY event, at which point it terminates. This means that if I restart RabbitMQ I must also restart the bridge; otherwise I am guaranteed to miss at least one event.
Would it make sense to terminate as soon as the "Connection reset by peer" error is thrown, or to initiate the retry procedure I saw running at startup?
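The second option, retrying instead of terminating, could look roughly like the sketch below. It is written in Python for illustration and is not the bridge's code: `publish` and `reconnect` are hypothetical callables supplied by the caller, and the doubling delay mirrors the "1 seconds", "2 seconds" pattern in the startup logs. The point is that the pending message is kept and re-sent after reconnecting, so a broker restart does not drop it.

```python
import time


def publish_with_retry(publish, reconnect, message, max_attempts=5,
                       base_delay=1.0, sleep=time.sleep):
    """Call publish(message); on a connection error, reconnect and retry.

    Delays double after each failure (1s, 2s, 4s, ...). Returns the number
    of attempts used; re-raises the last error when attempts run out.
    """
    delay = base_delay
    connected = True
    for attempt in range(1, max_attempts + 1):
        try:
            if not connected:
                reconnect()
                connected = True
            publish(message)
            return attempt
        except ConnectionError:
            # Broken pipe and connection reset are ConnectionError subclasses.
            connected = False
            if attempt == max_attempts:
                raise
            sleep(delay)
            delay *= 2
```

This only covers the bridge-to-RabbitMQ side. Notifications sent while the bridge is not connected to PostgreSQL would still need a separate mechanism.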
In production one does not really need this line
https://github.com/subzerocloud/pg-amqp-bridge/blob/master/src/lib.rs#L109
Add a --verbose flag and print that info only when it's enabled.
The format should also be a bit shorter, like:
CHANNEL -> EXCHANGE|ROUTING_KEY MESSAGE
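The requested behavior can be sketched as follows. This is a Python illustration of the idea only, not a patch for the bridge's Rust source; `format_forward`, `log_forward` and `parse_args` are hypothetical names.

```python
import argparse


def format_forward(channel, exchange, routing_key, message):
    """Format one forwarded message as 'CHANNEL -> EXCHANGE|ROUTING_KEY MESSAGE'."""
    return "{} -> {}|{} {}".format(channel, exchange, routing_key, message)


def log_forward(verbose, emit, channel, exchange, routing_key, message):
    """Emit the line only when verbose output was requested."""
    if verbose:
        emit(format_forward(channel, exchange, routing_key, message))


def parse_args(argv):
    """Parse command-line arguments; --verbose is off by default."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--verbose", action="store_true",
                        help="print each forwarded message")
    return parser.parse_args(argv)
```

Calling `log_forward(parse_args(argv).verbose, print, ...)` then prints forwarded messages only when the flag is given.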
Hi,
I can't connect to my PostgreSQL database (running pg-amqp-bridge with PostgREST).
This is the error I get:
thread '<unnamed>' panicked at 'Could not connect to PostgreSQL: Io(Error { repr: Custom(Custom { kind: InvalidInput, error: StringError("invalid message length") }) })', /checkout/src/libcore/result.rs:859
Here is the backtrace:
0: std::sys::imp::backtrace::tracing::imp::unwind_backtrace
at ./checkout/src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::_print
at ./checkout/src/libstd/sys_common/backtrace.rs:71
2: std::panicking::default_hook::{{closure}}
at ./checkout/src/libstd/sys_common/backtrace.rs:60
at ./checkout/src/libstd/panicking.rs:355
3: std::panicking::default_hook
at ./checkout/src/libstd/panicking.rs:371
4: std::panicking::rust_panic_with_hook
at ./checkout/src/libstd/panicking.rs:549
5: std::panicking::begin_panic
at ./checkout/src/libstd/panicking.rs:511
6: std::panicking::begin_panic_fmt
at ./checkout/src/libstd/panicking.rs:495
7: rust_begin_unwind
at ./checkout/src/libstd/panicking.rs:471
8: core::panicking::panic_fmt
at ./checkout/src/libcore/panicking.rs:69
9: core::result::unwrap_failed
10: pg_amqp_bridge::spawn_listener_publisher::{{closure}}
11: std::panicking::try::do_call
12: __rust_maybe_catch_panic
at ./checkout/src/libpanic_unwind/lib.rs:98
13: <F as alloc::boxed::FnBox<A>>::call_box
14: std::sys::imp::thread::Thread::new::thread_start
at ./checkout/src/liballoc/boxed.rs:650
at ./checkout/src/libstd/sys_common/thread.rs:21
at ./checkout/src/libstd/sys/unix/thread.rs:84
15: start_thread
16: clone
Could someone please point me in the right direction?
Hi,
I'm currently evaluating this project for one of ours, but I have a question.
How are you planning to handle cases where HA is needed?
We need only one bridge working on an instance at a time, and whenever the main bridge is out for some time, a second one should come up. Have you already worked on this kind of use case?
Thanks
Xavier
Is there a way to use this library on a Windows platform?