mkabilov / pg2ch
Data streaming from postgresql to clickhouse via logical replication mechanism
License: MIT License
Skip columns which are added on postgresql side after initial sync.
(that will involve tracking the RELATION message on logical replication protocol)
in case one replicates a subset of columns
Is there a way to safely stop pg2ch replication?
I need to run pg2ch for a few minutes each hour and then stop it, to avoid keeping connections open on PostgreSQL. Is there a way to do that other than the kill command?
When I try
go get -u github.com/mkabilov/pg2ch
I get:
# github.com/mkabilov/pg2ch/pkg/utils/kvstorage
root/go/pkg/mod/github.com/mkabilov/[email protected]/pkg/utils/kvstorage/diskv.go:52:41: s.storage.ReadString undefined (type *diskv.Diskv has no field or method ReadString)
root/go/pkg/mod/github.com/mkabilov/[email protected]/pkg/utils/kvstorage/diskv.go:60:18: s.storage.WriteString undefined (type *diskv.Diskv has no field or method WriteString)
After following the readme example, I end up with:
In Clickhouse:
SELECT SUM(abalance * sign), SUM(sign), count(0) FROM pgbench_accounts;
SELECT
SUM(abalance * sign),
SUM(sign),
count(0)
FROM pgbench_accounts
┌─SUM(multiply(abalance, sign))─┬─SUM(sign)─┬─count(0)─┐
│ -931490 │ 0 │ 49744 │
└───────────────────────────────┴───────────┴──────────┘
And in Postgres:
pg2ch_test=# SELECT SUM(abalance), COUNT(*) FROM pgbench_accounts;
sum | count
---------+--------
-931490 | 100000
(1 row)
I see that the sums are the same but the counts are different. Is this because only changes are replicated and, thus, only rows that underwent a change are represented in Clickhouse?
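One reading consistent with these numbers, offered as an assumption and not as a confirmed diagnosis: if the ClickHouse table holds only the row pairs written for updates (a `-1` row with the old value and a `+1` row with the new one) and no rows from an initial copy, then `SUM(sign)` is 0, `count` is twice the number of updates, and `SUM(abalance * sign)` equals the total change, which matches the PostgreSQL sum when every balance started at 0 (as pgbench initializes them). The toy model below uses hypothetical names and made-up values purely to show the arithmetic.

```go
package main

import "fmt"

// row is a simplified CollapsingMergeTree row: a value plus a sign column.
type row struct {
	abalance int64
	sign     int8
}

// update returns the pair of rows an UPDATE becomes in this model:
// a cancelling row (-1) with the old value and a state row (+1) with the new one.
func update(oldVal, newVal int64) []row {
	return []row{{oldVal, -1}, {newVal, +1}}
}

// totals computes SUM(abalance*sign), SUM(sign) and count(*) over rows.
func totals(rows []row) (sum, signs int64, count int) {
	for _, r := range rows {
		sum += r.abalance * int64(r.sign)
		signs += int64(r.sign)
	}
	return sum, signs, len(rows)
}

func main() {
	// Two accounts updated from an initial balance of 0; no initial-sync rows.
	var rows []row
	rows = append(rows, update(0, -50)...)
	rows = append(rows, update(0, 20)...)

	sum, signs, count := totals(rows)
	fmt.Println(sum, signs, count) // prints: -30 0 4
}
```

If the initial copy had also been loaded, each account would contribute one extra `+1` row, and `SUM(sign)` would equal the PostgreSQL row count.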
@mkabilov Hi Murat, thanks for creating a useful tool! It runs successfully (replicates the rows from PG to CH) but then times out after 10s instead of continuing to listen for new inserts. See the log below. Any ideas why this is happening or how it can be fixed?
I'm using PG v14.2 and CH v22.2.3.5.
pg2ch $ go run main.go --config config.yaml
2022/02/28 17:57:12 consuming changes for table testtable starting from 0/1C7B250 lsn position
2022/02/28 17:57:12 generation_id: 5
2022/02/28 17:57:12 Starting from 0/1C7B250 lsn
2022/02/28 17:57:12 Primary Keepalive Message => ServerWALEnd: 0/1C7A520 ServerTime: 2022-02-28 17:57:12.383815 -0500 EST ReplyRequested: false
2022/02/28 17:57:12 XLogData => WALStart 0/1C7A670 ServerWALEnd 0/1C7A670 ServerTime: 2022-02-28 17:57:12.414696 -0500 EST WALData size 21
2022/02/28 17:57:12 XLogData => WALStart 0/0 ServerWALEnd 0/0 ServerTime: 2022-02-28 17:57:12.415047 -0500 EST WALData size 326
2022/02/28 17:57:12 XLogData => WALStart 0/1C7A670 ServerWALEnd 0/1C7A670 ServerTime: 2022-02-28 17:57:12.415365 -0500 EST WALData size 325
2022/02/28 17:57:12 XLogData => WALStart 0/1C7B280 ServerWALEnd 0/1C7B280 ServerTime: 2022-02-28 17:57:12.4154 -0500 EST WALData size 26
2022/02/28 17:57:12 Primary Keepalive Message => ServerWALEnd: 0/1C7B2B8 ServerTime: 2022-02-28 17:57:12.419801 -0500 EST ReplyRequested: false
2022/02/28 17:57:22 replication failed: receive message failed: read tcp [::1]:64963->[::1]:5432: i/o timeout
It looks like the error is generated at
pg2ch/pkg/consumer/consumer.go
Line 147 in fb8b6ff
When setting up replication for child tables of a partitioned table, it would be useful to specify a pattern for the tables, i.e. instead of:
partitions.report_2017_03_01:
    main_table: report
    engine: CollapsingMergeTree
    init_sync_skip_truncate: true
partitions.report_2017_04_01:
    main_table: report
    engine: CollapsingMergeTree
    init_sync_skip_truncate: true
partitions.report_2017_05_01:
    main_table: report
    engine: CollapsingMergeTree
    init_sync_skip_truncate: true
partitions.report_2017_01_01:
    main_table: report
    engine: CollapsingMergeTree
    init_sync_skip_truncate: true
write something like this:
partitions.report_*:
    main_table: report
    engine: CollapsingMergeTree
    init_sync_skip_truncate: true
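One way such a pattern could be expanded, sketched with the standard library's `path.Match` for shell-style globbing. The `tableConfig` type and the `expand` function are hypothetical and only mirror the three settings shown above; this is not how pg2ch reads its configuration.

```go
package main

import (
	"fmt"
	"path"
)

// tableConfig mirrors the three settings repeated in the example above.
type tableConfig struct {
	MainTable            string
	Engine               string
	InitSyncSkipTruncate bool
}

// expand returns one config entry per table whose name matches a pattern.
// If several patterns match the same table, which one wins is unspecified
// in this sketch.
func expand(patterns map[string]tableConfig, tables []string) (map[string]tableConfig, error) {
	out := make(map[string]tableConfig)
	for _, t := range tables {
		for p, cfg := range patterns {
			ok, err := path.Match(p, t)
			if err != nil {
				return nil, fmt.Errorf("bad pattern %q: %w", p, err)
			}
			if ok {
				out[t] = cfg
			}
		}
	}
	return out, nil
}

func main() {
	patterns := map[string]tableConfig{
		"partitions.report_*": {MainTable: "report", Engine: "CollapsingMergeTree", InitSyncSkipTruncate: true},
	}
	tables := []string{
		"partitions.report_2017_03_01",
		"partitions.report_2017_04_01",
		"public.users",
	}
	cfg, err := expand(patterns, tables)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(cfg), "tables matched")
}
```

The table list would have to come from PostgreSQL at startup, so partitions created later would still need a restart or a re-scan to be picked up.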
TBD
If multiple tables are updated in a single transaction, the merge won't be synchronized among them,
so the state on the ClickHouse side will be inconsistent.
That is needed to avoid a full copy of the tables on each start.
By "state", the LSN position is meant.
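Persisting an LSN usually means converting between its textual form, such as `0/1C7B250` (two hexadecimal halves, high and low 32 bits), and a 64-bit integer that is easy to compare and store. The helpers below are a sketch of that conversion only; the function names are hypothetical, and where the value is stored is left open.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts the textual form "hi/lo" (both hexadecimal) to a uint64.
func parseLSN(s string) (uint64, error) {
	parts := strings.SplitN(s, "/", 2)
	if len(parts) != 2 {
		return 0, fmt.Errorf("invalid LSN %q: missing '/'", s)
	}
	hi, err := strconv.ParseUint(parts[0], 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	lo, err := strconv.ParseUint(parts[1], 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return hi<<32 | lo, nil
}

// formatLSN converts a uint64 back to the textual "hi/lo" form.
func formatLSN(lsn uint64) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

func main() {
	lsn, err := parseLSN("0/1C7B250")
	if err != nil {
		panic(err)
	}
	// A saved position can be compared numerically before resuming.
	fmt.Println(formatLSN(lsn), lsn < lsn+1)
}
```

Keeping one such value per table, instead of a single global one, would let each table resume from its own position.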
I am getting this error when I run the pg2ch command.
The script started working fine, the synchronization started, and then I got this error:
could not start: could not sync tables: could not sync $TABLE_NAME: could not commit transaction: driver: bad connection
Project needs to be covered with tests, both integration and unit
If an insert into ClickHouse fails, retry it.
Otherwise one can theoretically get the "ERROR: snapshot too old (SQLSTATE 72000)" error message.
Hello.
We're using a 2-node ClickHouse cluster with CollapsingMergeTree engine tables.
We're just testing out the pg2ch plugin and are running into problems with updates for this engine. CH sometimes generates warnings about a wrong number of rows with signs 1 and -1. As far as we can tell, this happens quite often when pg2ch receives a whole batch of DML operations for the same unique primary id.
The error looks like:
2019.09.24 13:42:14.484495 [ 17 ] {} <Warning> CollapsingSortedBlockInputStream: Incorrect data: number of rows with sign = 1 (4) differs with number of rows with sign = -1 (1) by more than one (for key: 1219488).
The row from the log above had 12 rows corresponding to it right after the warning, and the strange thing is that it actually collapsed to 2 rows after a while.
Can you help with this issue?
How do we sync composite fields on PG to nested fields on CH?
Since this project uses logical replication and we have a limit on the number of slots in our PostgreSQL instances, what should we do about databases with lots of tables?
I think using physical replication could handle this problem by requiring just a single slot.
Also, physical slots are more reliable.
There is a small mistake in the readme guide. The description of the configuration file uses the 'pg' key for the postgres connection settings, which leads to this error:
root@9529452bd859:/home/go/pg2ch# go run main.go --config config.yaml
could not load config: publication name is not specified
exit status 1
If you look in the source file (pkg\config\config.go), the key is actually 'postgres'.
After replacing 'pg' with 'postgres', it works correctly.
Hello Murat,
According to the Readme file in the project description, it is not ready for production.
What tasks are yet to be done?
Or what possible caveats should be kept in mind while working with it?
Thank you!
Hello! I have a problem while sending data from a PG partitioned table to a CH MergeTree table through a buffer table with the Memory engine.
Here is the full text of the error:
could not start: could not sync tables: could not create temporary replication slot: could not scan: ERROR: syntax error (SQLSTATE 42601)
The PG logs only show ERROR: syntax error
The approximate number of rows can be taken from PostgreSQL's statistics tables.
Initial sync for a very large and wide (500 columns) table will fail:
DB::Exception: Memory limit (for query) exceeded: would use 9.31 GiB (attempt to allocate chunk of 8388608 bytes)
After investigating, it turns out to be caused by the ClickHouse driver's default block_size=1000000.
When the table is too wide, 1 million rows will exceed the 10G memory limit.
Adding a setting for 'block_size' would solve this.
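If the driver in use accepts `block_size` as a connection-string parameter (the clickhouse-go v1 driver documents one with a default of 1000000), a smaller value could be passed through the DSN. The sketch below only builds such a DSN; `buildDSN`, the host, the port, and the chosen value are illustrative assumptions, and no such option exists in the pg2ch config as far as this report shows.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildDSN returns a tcp:// DSN carrying a block_size query parameter.
// The parameter name is assumed to be understood by the ClickHouse driver.
func buildDSN(host string, port, blockSize int) string {
	q := url.Values{}
	q.Set("block_size", strconv.Itoa(blockSize))
	u := url.URL{
		Scheme:   "tcp",
		Host:     net.JoinHostPort(host, strconv.Itoa(port)),
		RawQuery: q.Encode(),
	}
	return u.String()
}

func main() {
	// For a 500-column table, a block of 100000 rows is ten times smaller
	// than the default and correspondingly cheaper to hold in memory.
	fmt.Println(buildDSN("127.0.0.1", 9000, 100000))
}
```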
Right now the global LSN position is stored.