conduitio / conduit-connector-postgres
Conduit connector for PostgreSQL
License: Apache License 2.0
Identifiers and literals are not fully sanitized or escaped. This can lead to a connector error.
Use PostgreSQL's format or quote_ident functions to sanitize these; format works on a full query, quote_ident on an individual string.
https://www.postgresql.org/docs/current/functions-string.html#FUNCTIONS-STRING-FORMAT
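For illustration, a minimal sketch of both approaches, assuming the connector builds its queries with jackc/pgx (pgx.Identifier is pgx's client-side identifier quoting; format/quote_ident are the server-side functions from the link above):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	// Client-side: pgx.Identifier doubles embedded double quotes, so a
	// hostile table name cannot break out of the identifier position.
	table := pgx.Identifier{"public", `weird"table`}.Sanitize()
	fmt.Printf("SELECT * FROM %s WHERE id = $1\n", table)
	// Output: SELECT * FROM "public"."weird""table" WHERE id = $1

	// Server-side alternative, per the linked docs:
	//   SELECT format('SELECT * FROM %I WHERE name = %L', tbl, val);
	// %I applies quote_ident, %L applies quote_literal.
}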
A Postgres table in which data already exists can't be upserted into unless "key" is specified in the config when creating the Postgres destination connector (request: /v1/connectors), even if the "key" field is present in the metadata.
Steps to reproduce
2022-05-12T13:56:39+00:00 ERR node stopped error="node 9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stopped with error: error receiving ack: ERROR: duplicate key value violates unique constraint "table2_pkey" (SQLSTATE 23505)" component=pipeline.Service node_id=9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
The source file doesn't have tests.
Tests currently run against Postgres v14; we should configure the CI to run against older versions as well (and decide which ones we want to support).
As the title implies.
It would be great if the connector supported more than one table at a time. At the moment, the number of tables dictates the number of replication slots, which is not ideal.
Replication slots can be reused or preconfigured. An issue can occur where the replication slot was replaced at runtime, or where a preconfigured replication slot was modified during connector downtime. In this situation a position can contain an LSN which points to WAL that has already been cleaned up.
The solution is to ensure that the position LSN is within the replication slot range during open, when the replication slot exists.
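A minimal sketch of that check, assuming pgx and pglogrepl are available and that the position stores the LSN as a string (validatePosition and its signature are hypothetical):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5"
)

// validatePosition is a hypothetical helper: on Open, verify that the LSN
// stored in the position is still covered by the replication slot, so the
// connector fails fast instead of resuming from WAL that was recycled.
func validatePosition(ctx context.Context, conn *pgx.Conn, slot, posLSN string) error {
	pos, err := pglogrepl.ParseLSN(posLSN)
	if err != nil {
		return fmt.Errorf("invalid position LSN: %w", err)
	}

	var restart string
	err = conn.QueryRow(ctx,
		"SELECT restart_lsn::text FROM pg_replication_slots WHERE slot_name = $1",
		slot,
	).Scan(&restart)
	if err != nil {
		return fmt.Errorf("slot %q not found: %w", slot, err)
	}

	oldest, err := pglogrepl.ParseLSN(restart)
	if err != nil {
		return err
	}
	// restart_lsn is the oldest WAL the slot still retains; a position
	// behind it points at WAL that may already be gone.
	if pos < oldest {
		return fmt.Errorf("position %s is behind slot %q restart_lsn %s", pos, slot, oldest)
	}
	return nil
}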
Occasionally running the tests for the Postgres connector results in an interface conversion panic. This is likely due to pglogrepl messages being received either out of order or at unexpected times.
=== RUN TestRelationSetAllTypes
--- FAIL: TestRelationSetAllTypes (0.14s)
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage [recovered]
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage
goroutine 11 [running]:
testing.tRunner.func1.2({0x1a41de0, 0xc00033c570})
/usr/local/go/src/testing/testing.go:1389 +0x366
testing.tRunner.func1()
/usr/local/go/src/testing/testing.go:1392 +0x5d2
panic({0x1a41de0, 0xc00033c570})
/usr/local/go/src/runtime/panic.go:844 +0x258
github.com/conduitio/conduit-connector-postgres/source/logrepl/internal.TestRelationSetAllTypes(0xc0000036c0)
/Users/dylanlott/dev/meroxa/conduit-connector-postgres/source/logrepl/internal/relationset_test.go:58 +0x61b
testing.tRunner(0xc0000036c0, 0x1b48c08)
/usr/local/go/src/testing/testing.go:1439 +0x214
created by testing.(*T).Run
/usr/local/go/src/testing/testing.go:1486 +0x725
FAIL github.com/conduitio/conduit-connector-postgres/source/logrepl/internal 1.480s
To reproduce: run the TestRelationSetAllTypes test (e.g. via make); it occasionally fails with a failed interface type conversion.
Postgres destination connector can't insert data when the "key" is in lowercase.
Example: the "key" is configured in lowercase on the source connector, while in the payload the "key" is transferred in uppercase; the destination then specifies the same column twice (see logs).
Logs:
2022-05-12T07:52:41+00:00 ERR acker node stopped without processing all messages error="dropped 3 messages when stopping acker node" component=AckerNode node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker pipeline_id=224bf6bd-5cef-4957-a181-880910f41d55 stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}] 2022-05-12T07:52:41+00:00 ERR node stopped error="node 7de377e6-efe3-45de-acc7-f39695fc5e76-acker stopped with error: error receiving ack: insert exec failed: ERROR: column "id" specified more than once (SQLSTATE 42701)" component=pipeline.Service node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
In some cases, the connector configuration can be changed to stream additional tables.
To support this, the publication will need to be updated to add the new tables, as sketched below.
Additionally, the new table should be snapshotted before logical replication is resumed.
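A minimal sketch of the publication update, assuming pgx; addTableToPublication is a hypothetical helper, not the connector's API:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// addTableToPublication sketches the first half of the idea: extend the
// connector's existing publication with a newly configured table.
// Snapshotting the new table before resuming logical replication would
// still have to happen separately.
func addTableToPublication(ctx context.Context, conn *pgx.Conn, publication, table string) error {
	sql := fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
		pgx.Identifier{publication}.Sanitize(),
		pgx.Identifier{table}.Sanitize())
	if _, err := conn.Exec(ctx, sql); err != nil {
		return fmt.Errorf("adding %s to publication %s: %w", table, publication, err)
	}
	return nil
}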
The connector always picks version 1 of the protocol; however, the protocol has evolved over the years. Per the protocol versions defined in the PostgreSQL documentation:
Protocol version. Currently versions 1, 2, 3, and 4 are supported. A valid version is required.
Version 2 is supported only for server version 14 and above, and it allows streaming of large in-progress transactions.
Version 3 is supported only for server version 15 and above, and it allows streaming of two-phase commits.
Version 4 is supported only for server version 16 and above, and it allows streams of large in-progress transactions to be applied in parallel.
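A sketch of how the version could be negotiated, assuming pglogrepl is used for streaming (pickProtoVersion and the slot/publication names are hypothetical):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// pickProtoVersion maps the server's major version to the highest
// pgoutput protocol version it supports, per the table above.
func pickProtoVersion(serverMajor int) int {
	switch {
	case serverMajor >= 16:
		return 4
	case serverMajor >= 15:
		return 3
	case serverMajor >= 14:
		return 2
	default:
		return 1
	}
}

// startReplication passes the chosen version through pglogrepl's plugin
// arguments; the slot and publication names here are assumptions.
func startReplication(ctx context.Context, conn *pgconn.PgConn, serverMajor int, startLSN pglogrepl.LSN) error {
	args := []string{
		fmt.Sprintf("proto_version '%d'", pickProtoVersion(serverMajor)),
		"publication_names 'conduitpub'",
	}
	return pglogrepl.StartReplication(ctx, conn, "conduitslot", startLSN,
		pglogrepl.StartReplicationOptions{PluginArgs: args})
}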
When the connector is started with a pre-created publication, the published tables are not matched against the configured tables. If a replication slot is created on such a publication, it can produce records for unexpected tables, or for none of the expected tables.
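A sketch of the missing validation, assuming pgx; the helper name is hypothetical and the comparison against the configured tables is left to the caller:

package main

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// publishedTables lists the tables a pre-created publication actually
// publishes, so they can be compared with the configured tables before a
// replication slot is attached to the publication.
func publishedTables(ctx context.Context, conn *pgx.Conn, publication string) (map[string]bool, error) {
	rows, err := conn.Query(ctx,
		"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = $1",
		publication)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	tables := make(map[string]bool)
	for rows.Next() {
		var schema, table string
		if err := rows.Scan(&schema, &table); err != nil {
			return nil, err
		}
		tables[schema+"."+table] = true
	}
	return tables, rows.Err()
}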
Add acceptance tests
Currently we have a CDC mode which uses PostgreSQL's WAL. We'd also like to have a CDC mode which uses PostgreSQL's triggers.
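For illustration only, a rough sketch of what the trigger-based mode could look like (all table, function, and trigger names are hypothetical, not a design):

package main

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// setupTriggerCDC sketches one possible shape of trigger-based CDC: a
// row-level trigger writes every change into an audit table that the
// connector would then poll.
func setupTriggerCDC(ctx context.Context, conn *pgx.Conn) error {
	ddl := []string{
		`CREATE TABLE IF NOT EXISTS conduit_audit (
			id       bigserial   PRIMARY KEY,
			tbl      text        NOT NULL,
			action   text        NOT NULL,
			row_data jsonb,
			at       timestamptz NOT NULL DEFAULT now()
		)`,
		`CREATE OR REPLACE FUNCTION conduit_audit_fn() RETURNS trigger AS $$
		BEGIN
			IF TG_OP = 'DELETE' THEN
				INSERT INTO conduit_audit (tbl, action, row_data)
				VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(OLD));
			ELSE
				INSERT INTO conduit_audit (tbl, action, row_data)
				VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(NEW));
			END IF;
			RETURN NULL;
		END $$ LANGUAGE plpgsql`,
		// EXECUTE FUNCTION requires Postgres 11+; "users" stands in for
		// the configured table.
		`CREATE TRIGGER conduit_audit_trg
			AFTER INSERT OR UPDATE OR DELETE ON users
			FOR EACH ROW EXECUTE FUNCTION conduit_audit_fn()`,
	}
	for _, stmt := range ddl {
		if _, err := conn.Exec(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}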
[Postgres source] When we create a source via the UI and don't specify "key", an error occurs when starting the pipeline.
Reason: an error occurs if the Postgres table doesn't have a primary key, and we can't specify a custom "key" in the UI when creating a connector.
Suggested improvement: add "key" to the UI as an optional config field.
Logs:
2022-05-09T10:44:26+00:00 ERR node stopped error="node 01f1505f-a15c-40c5-a9b8-ab50c056019a stopped with error: could not open source connector: failed to create logical replication iterator: failed to setup subscription: failed to find key for table users (try specifying it manually): getKeyColumn query failed: no rows in result set" component=pipeline.Service node_id=01f1505f-a15c-40c5-a9b8-ab50c056019a stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode.Run","line":54}]
Currently, it's not possible to use long polling in CDC. It was left as a placeholder in the configuration and the docs, but never implemented. We should remove it, because it's confusing to users.
The implementation of long polling CDC will be tracked here: #150.
When using logical replication mode it should be possible to create a snapshot first.
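A sketch of the standard technique, assuming pglogrepl and pgx: create the slot with EXPORT_SNAPSHOT, read the table in a transaction pinned to the exported snapshot, then stream from the slot's consistent point (function and slot names are assumptions):

package main

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

// snapshotThenStream sketches snapshot-then-CDC with no gap and no overlap.
func snapshotThenStream(ctx context.Context, replConn *pgconn.PgConn, conn *pgx.Conn) error {
	res, err := pglogrepl.CreateReplicationSlot(ctx, replConn, "conduitslot", "pgoutput",
		pglogrepl.CreateReplicationSlotOptions{SnapshotAction: "EXPORT_SNAPSHOT"})
	if err != nil {
		return err
	}

	// The transaction sees exactly the database state at the slot's
	// consistent point. The snapshot name is server-generated.
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)
	if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT '"+res.SnapshotName+"'"); err != nil {
		return err
	}
	// ... SELECT the configured table here and emit snapshot records ...
	if err := tx.Commit(ctx); err != nil {
		return err
	}

	// Changes after the consistent point then arrive via logical replication.
	startLSN, err := pglogrepl.ParseLSN(res.ConsistentPoint)
	if err != nil {
		return err
	}
	_ = startLSN // pass to pglogrepl.StartReplication(...)
	return nil
}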
Logs:
2022-05-25T08:00:16+00:00 ERR acker node stopped without processing all messages error="dropped 1 messages when stopping acker node" component=AckerNode node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker pipeline_id=0cacf098-fe7c-43c1-89d1-63365631108a stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}]
2022-05-25T08:00:16+00:00 ERR node stopped error="node 763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stopped with error: error receiving ack: failed to get key: invalid character 'p' looking for beginning of value" component=pipeline.Service node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
The README should be updated to match the new connector behavior.
[Postgres source] When we create a source via the UI, we don't have the ability to specify the Postgres table that is used as the destination. An error occurs after starting the pipeline (see logs).
Suggested improvement: add "table" to the UI as a required config field.
Logs:
2022-05-09T11:08:01+00:00 ERR node stopped error="node 220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stopped with error: error receiving ack: no table provided for default writes" component=pipeline.Service node_id=220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
Attempted to replicate data records from a table named orders to another table in the same database and schema. I assumed that I could accomplish this by setting the destination configuration table value to orders_moved (which did not yet exist).
I expected the destination connector to check for orders_moved and, if it did not exist, to create it and begin replicating data records into the newly created orders_moved. Instead, I saw no results. The destination connector flipped to degraded. I looked up the logs on our platform to find the following error:
error while nacking message: failed to write message to DLQ: DLQ nack threshold exceeded (0/1), original error: failed to execute query for record 0: ERROR: relation "orders_moved" does not exist (SQLSTATE 42P01)
I had to create the table orders_moved in the database in order for records to replicate successfully.
This was just a simple example, which begs the question: how would this work with multiple tables? Could there be a _prefix tag in the destination config? I would expect some way to differentiate the table in the event the table might already exist in the destination.
To reproduce: set the destination table value to a table of choice that does not exist.
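For illustration, a hedged sketch of the requested check-and-create behavior, assuming pgx (ensureTable and the column layout are hypothetical; a real implementation would derive columns from the record schema):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// ensureTable checks for the destination table via to_regclass and
// creates it when missing. Deriving real columns from the record schema
// is the hard part and is elided here.
func ensureTable(ctx context.Context, conn *pgx.Conn, table string) error {
	var reg *string
	if err := conn.QueryRow(ctx, "SELECT to_regclass($1)::text", table).Scan(&reg); err != nil {
		return err
	}
	if reg != nil {
		return nil // table already exists
	}
	ddl := fmt.Sprintf("CREATE TABLE %s (key text PRIMARY KEY, payload jsonb)",
		pgx.Identifier{table}.Sanitize())
	_, err := conn.Exec(ctx, ddl)
	return err
}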
When a table has a numeric type, the connector will fail to protobuf-encode the value, since it is not a plugin-supported type. The connector will need to coerce the value to its proper kind.
2024-05-29T13:27:35+00:00 ERR pipeline stopped error="node pg-to-file:pg.in stopped with error: source stream was stopped unexpectedly: error reading from source: read stream error: error converting payload: error converting after: proto: invalid type: pgtype.Numeric
To reproduce: create a table with a numeric(12,2) column and load some data.
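A sketch of one possible coercion, assuming pgx v5's pgtype (whether to target float64, which is lossy, or a decimal string is an open design choice):

package main

import (
	"fmt"
	"math/big"

	"github.com/jackc/pgx/v5/pgtype"
)

// numericToFloat turns pgtype.Numeric into a plain float64 before the
// record reaches the plugin encoder.
func numericToFloat(n pgtype.Numeric) (float64, error) {
	f, err := n.Float64Value()
	if err != nil {
		return 0, err
	}
	if !f.Valid {
		return 0, fmt.Errorf("numeric is null")
	}
	return f.Float64, nil
}

func main() {
	// 123.45 as numeric(12,2) is stored as 12345 * 10^-2.
	n := pgtype.Numeric{Int: big.NewInt(12345), Exp: -2, Valid: true}
	f, err := numericToFloat(n)
	if err != nil {
		panic(err)
	}
	fmt.Println(f) // 123.45
}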
The specifications currently don't match the actual expected config one-to-one; we need to adjust this.
Use permanent replication slots when logical replication is used. This will ensure WAL is retained when the pipeline is not running for whatever reason (paused, stopped, etc.).
Additionally, add an option to auto-clean any leftover replication slots and publications.
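A sketch of the cleanup half, assuming pgx (cleanupReplication is hypothetical; note that pglogrepl already creates permanent slots unless Temporary is set):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// cleanupReplication drops the slot and publication the connector created
// once they are no longer needed. The WHERE clause makes the slot drop a
// no-op if the slot is already gone.
func cleanupReplication(ctx context.Context, conn *pgx.Conn, slot, publication string) error {
	if _, err := conn.Exec(ctx,
		"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1",
		slot); err != nil {
		return fmt.Errorf("dropping slot %s: %w", slot, err)
	}
	_, err := conn.Exec(ctx,
		"DROP PUBLICATION IF EXISTS "+pgx.Identifier{publication}.Sanitize())
	return err
}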
A Source will redeliver a previously ACKed message on reconnect.
The connector uses the LSN at the Position and passes that when ACKing a message. This seems to differ from the example provided in jackc/pglogrepl, where the ACKed LSN is the offset from the WALStart to the end of the message:
https://github.com/jackc/pglogrepl/blob/master/example/pglogrepl_demo/main.go#L112
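For comparison, a sketch of the demo's approach using pglogrepl (ackWALPosition is a hypothetical helper):

package main

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// ackWALPosition mirrors the linked pglogrepl demo: acknowledge the end of
// the message (WALStart plus the length of the data), not WALStart itself,
// so the server does not resend the last message after a reconnect.
func ackWALPosition(ctx context.Context, conn *pgconn.PgConn, xld pglogrepl.XLogData) error {
	clientXLogPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData))
	return pglogrepl.SendStandbyStatusUpdate(ctx, conn,
		pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
}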
Sample app to consume the Source:
package main

import (
	"context"
	"encoding/json"
	"os"

	"github.com/conduitio/conduit-connector-postgres/source"
)

func main() {
	ctx := context.Background()

	sourceConfig := map[string]string{
		"url":                     os.Getenv("DB_URL"),
		"table":                   os.Getenv("DB_TABLE"),
		"key":                     os.Getenv("DB_TABLE_KEY"),
		"columns":                 "", // empty means all columns
		"snapshot_mode":           "never",
		"cdc_mode":                "logrepl",
		"logrepl.publicationName": os.Getenv("DB_PUBLICATION"),
		"logrepl.slotName":        os.Getenv("DB_SLOT_NAME"),
	}

	s := source.NewSource()
	if err := s.Configure(ctx, sourceConfig); err != nil {
		panic(err)
	}
	// Open with a nil position, i.e. start from scratch.
	if err := s.Open(ctx, nil); err != nil {
		panic(err)
	}
	defer s.Teardown(ctx)

	// Read, ack, and print records until the program is interrupted.
	for {
		record, err := s.Read(ctx)
		if err != nil {
			panic(err)
		}
		msg, err := json.Marshal(record.Payload)
		if err != nil {
			panic(err)
		}
		if err := s.Ack(ctx, record.Position); err != nil {
			panic(err)
		}
		println(string(msg))
	}
}