conduitio / conduit-connector-postgres
Conduit connector for PostgreSQL
License: Apache License 2.0
When using logical replication mode it should be possible to create a snapshot first.
The README should be updated to match the new connector behavior.
Tests currently run against Postgres v14; we should configure the CI to run against older versions as well (and decide which versions we want to support).
Currently we have a CDC mode which uses PostgreSQL's WAL. We'd also like to have a CDC mode which uses PostgreSQL's triggers.
Postgres Destination connector can't insert data when the "key" is in lowercase
Example:
the "key" is configured in lowercase in the source connector, but the payload carries the same key in uppercase.
Logs:
2022-05-12T07:52:41+00:00 ERR acker node stopped without processing all messages error="dropped 3 messages when stopping acker node" component=AckerNode node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker pipeline_id=224bf6bd-5cef-4957-a181-880910f41d55 stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}]
2022-05-12T07:52:41+00:00 ERR node stopped error="node 7de377e6-efe3-45de-acc7-f39695fc5e76-acker stopped with error: error receiving ack: insert exec failed: ERROR: column "id" specified more than once (SQLSTATE 42701)" component=pipeline.Service node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
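One plausible cause, not confirmed in the issue: PostgreSQL folds unquoted identifiers to lower case, so if the INSERT lists the configured key "id" and the payload field "ID" as separate unquoted columns, both resolve to id and the statement fails with SQLSTATE 42701. The hypothetical helpers below show how such a collision could be detected before the statement is built; strings.ToLower is only an approximation of PostgreSQL's folding.

```go
package main

import (
	"fmt"
	"strings"
)

// foldIdent approximates how PostgreSQL treats an unquoted identifier:
// it is folded to lower case, so ID, Id and id all name the same column.
func foldIdent(name string) string {
	return strings.ToLower(name)
}

// duplicateColumns returns each folded column name that appears more than
// once after case folding, e.g. a key "id" from the config plus "ID" from
// the payload.
func duplicateColumns(columns []string) []string {
	seen := make(map[string]int)
	var dups []string
	for _, c := range columns {
		f := foldIdent(c)
		seen[f]++
		if seen[f] == 2 {
			dups = append(dups, f)
		}
	}
	return dups
}

func main() {
	// Hypothetical column list: key from the connector config plus payload fields.
	cols := []string{"id", "ID", "name"}
	fmt.Println(duplicateColumns(cols)) // [id]
}
```

Detecting the collision doesn't decide which value wins; a fix would still need a rule, for example treating the configured key as authoritative.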
Rows that already exist in a Postgres table can't be upserted unless "key" is specified in the config when creating the Postgres Destination connector (request: /v1/connectors), even if the "key" field is present in the metadata.
Steps to reproduce
2022-05-12T13:56:39+00:00 ERR node stopped error="node 9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stopped with error: error receiving ack: ERROR: duplicate key value violates unique constraint "table2_pkey" (SQLSTATE 23505)" component=pipeline.Service node_id=9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
The specifications currently don't match the actual expected config one-to-one; we need to adjust this.
[Postgres source] When we create a source via the UI without specifying "key", an error occurs when the pipeline starts.
Reason: an error occurs if the Postgres table has no primary key, but the UI doesn't let us specify a custom "key" when creating a connector.
Suggested improvement: add "key" to the UI as an optional config field.
Logs:
2022-05-09T10:44:26+00:00 ERR node stopped error="node 01f1505f-a15c-40c5-a9b8-ab50c056019a stopped with error: could not open source connector: failed to create logical replication iterator: failed to setup subscription: failed to find key for table users (try specifying it manually): getKeyColumn query failed: no rows in result set" component=pipeline.Service node_id=01f1505f-a15c-40c5-a9b8-ab50c056019a stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode.Run","line":54}]
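The log shows the primary-key lookup returning no rows. Below is a sketch of the fallback the issue asks for, assuming a configured key should always win and the lookup only runs when none is set. resolveKey, keyLookup and errNoRows are hypothetical; primaryKeyQuery is a common catalog query for primary-key columns and may differ from the connector's getKeyColumn query. The sketch also assumes a single-column key.

```go
package main

import (
	"errors"
	"fmt"
)

// primaryKeyQuery is a common catalog query for a table's primary-key columns.
// The connector's actual getKeyColumn query may differ.
const primaryKeyQuery = `SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass AND i.indisprimary`

// errNoRows stands in for the driver's "no rows in result set" error.
var errNoRows = errors.New("no rows in result set")

// keyLookup abstracts running primaryKeyQuery so the sketch needs no database.
type keyLookup func(table string) (string, error)

// resolveKey prefers an explicitly configured key and only falls back to the
// table's primary key when none is given, returning an actionable error when
// the table has no primary key.
func resolveKey(table, configuredKey string, lookup keyLookup) (string, error) {
	if configuredKey != "" {
		return configuredKey, nil
	}
	key, err := lookup(table)
	if errors.Is(err, errNoRows) {
		return "", fmt.Errorf("table %q has no primary key; set the \"key\" config option", table)
	}
	if err != nil {
		return "", fmt.Errorf("primary key lookup for %q failed: %w", table, err)
	}
	return key, nil
}

func main() {
	noPK := func(string) (string, error) { return "", errNoRows }
	if _, err := resolveKey("users", "", noPK); err != nil {
		fmt.Println(err)
	}
	key, _ := resolveKey("users", "id", noPK)
	fmt.Println(key)
}
```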
Add acceptance tests
The source file doesn't have tests.
A Source will redeliver a previously ACKed message on reconnect.
The connector uses the LSN at the Position and passes it when ACKing a message. This seems to differ from the example provided in jackc/pglogrepl, where the ACKed LSN is the offset from the WALStart to the end of the message:
https://github.com/jackc/pglogrepl/blob/master/example/pglogrepl_demo/main.go#L112
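The pglogrepl demo advances its client position to WALStart plus the length of the XLogData payload, so the LSN it reports points at the first byte after the message. Acknowledging the LSN at which a message starts instead may be related to the redelivery described above. A minimal sketch of the demo's arithmetic; the LSN type here is a local stand-in for pglogrepl.LSN so the example runs without the library, and nextAckLSN is a hypothetical name.

```go
package main

import "fmt"

// LSN stands in for pglogrepl.LSN: a 64-bit position in the write-ahead log.
type LSN uint64

// String formats the LSN the way PostgreSQL does, e.g. "16/B374D848".
func (l LSN) String() string {
	return fmt.Sprintf("%X/%X", uint32(l>>32), uint32(l))
}

// nextAckLSN follows the pglogrepl demo: the position to report back to the
// server is the start of the XLogData message plus the length of its payload,
// i.e. the first byte after the message rather than the message start.
func nextAckLSN(walStart LSN, walData []byte) LSN {
	return walStart + LSN(len(walData))
}

func main() {
	start := LSN(0x16B374D800)
	fmt.Println(start, nextAckLSN(start, make([]byte, 72))) // 16/B374D800 16/B374D848
}
```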
Sample app to consume Source:
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/conduitio/conduit-connector-postgres/source"
)

func main() {
	ctx := context.Background()

	// Configure the source for logical replication only (no initial snapshot).
	sourceConfig := map[string]string{
		"url":                     os.Getenv("DB_URL"),
		"table":                   os.Getenv("DB_TABLE"),
		"key":                     os.Getenv("DB_TABLE_KEY"),
		"columns":                 "",
		"snapshot_mode":           "never",
		"cdc_mode":                "logrepl",
		"logrepl.publicationName": os.Getenv("DB_PUBLICATION"),
		"logrepl.slotName":        os.Getenv("DB_SLOT_NAME"),
	}

	s := source.NewSource()
	if err := s.Configure(ctx, sourceConfig); err != nil {
		panic(err)
	}
	// No previous position is passed when opening the source.
	if err := s.Open(ctx, nil); err != nil {
		panic(err)
	}
	defer func() {
		if err := s.Teardown(ctx); err != nil {
			fmt.Fprintln(os.Stderr, "teardown:", err)
		}
	}()

	for {
		record, err := s.Read(ctx)
		if err != nil {
			panic(err)
		}
		msg, err := json.Marshal(record.Payload)
		if err != nil {
			panic(err)
		}
		// ACK the record; after a reconnect it should not be delivered again.
		if err := s.Ack(ctx, record.Position); err != nil {
			panic(err)
		}
		fmt.Println(string(msg))
	}
}
Running the tests for the Postgres connector occasionally results in an interface conversion panic. This is likely because pglogrepl messages are received out of order or at unexpected times.
=== RUN TestRelationSetAllTypes
--- FAIL: TestRelationSetAllTypes (0.14s)
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage [recovered]
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage
goroutine 11 [running]:
testing.tRunner.func1.2({0x1a41de0, 0xc00033c570})
/usr/local/go/src/testing/testing.go:1389 +0x366
testing.tRunner.func1()
/usr/local/go/src/testing/testing.go:1392 +0x5d2
panic({0x1a41de0, 0xc00033c570})
/usr/local/go/src/runtime/panic.go:844 +0x258
github.com/conduitio/conduit-connector-postgres/source/logrepl/internal.TestRelationSetAllTypes(0xc0000036c0)
/Users/dylanlott/dev/meroxa/conduit-connector-postgres/source/logrepl/internal/relationset_test.go:58 +0x61b
testing.tRunner(0xc0000036c0, 0x1b48c08)
/usr/local/go/src/testing/testing.go:1439 +0x214
created by testing.(*T).Run
/usr/local/go/src/testing/testing.go:1486 +0x725
FAIL github.com/conduitio/conduit-connector-postgres/source/logrepl/internal 1.480s
make run
The TestRelationSetAllTypes test occasionally fails with a failed interface type conversion.
v0.1.0
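A panic of the form "interface conversion: X is A, not B" comes from a single-value type assertion such as msg.(*pglogrepl.RelationMessage). A sketch of a more defensive pattern: use the comma-ok form and skip messages (such as commits) until the expected type arrives. The Message, RelationMessage and CommitMessage types below are minimal stand-ins for the pglogrepl types, and asRelation and nextRelation are hypothetical helpers; whether skipping is appropriate depends on what the test asserts.

```go
package main

import "fmt"

// Message, RelationMessage and CommitMessage are minimal stand-ins for the
// pglogrepl types of the same names, so the sketch runs without the library.
type Message interface{ isMessage() }

type RelationMessage struct{ RelationName string }

type CommitMessage struct{}

func (*RelationMessage) isMessage() {}
func (*CommitMessage) isMessage()   {}

// asRelation uses the comma-ok form of a type assertion, so an unexpected
// message type becomes an error instead of a panic.
func asRelation(msg Message) (*RelationMessage, error) {
	rel, ok := msg.(*RelationMessage)
	if !ok {
		return nil, fmt.Errorf("expected *RelationMessage, got %T", msg)
	}
	return rel, nil
}

// nextRelation skips other messages (e.g. commits) until a relation arrives,
// which is one way a test could tolerate ordering it does not control.
func nextRelation(msgs <-chan Message) (*RelationMessage, error) {
	for msg := range msgs {
		if rel, err := asRelation(msg); err == nil {
			return rel, nil
		}
	}
	return nil, fmt.Errorf("channel closed before a RelationMessage arrived")
}

func main() {
	msgs := make(chan Message, 2)
	msgs <- &CommitMessage{}
	msgs <- &RelationMessage{RelationName: "users"}
	close(msgs)
	rel, err := nextRelation(msgs)
	fmt.Println(rel.RelationName, err) // users <nil>
}
```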
In some cases, the connector configuration can be changed to stream additional tables.
To support this, the publication will need to be updated to add the new tables.
Additionally, the new tables should be snapshotted before logical replication is resumed.
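A sketch of the publication update, assuming the connector knows which tables the publication already covers (for example from the pg_publication_tables view). newTables, alterPublication and the publication name are hypothetical; schema-qualified names would need each part quoted separately.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes a PostgreSQL identifier, doubling any embedded quotes.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// newTables returns the configured tables that the publication does not yet
// include; these are the ones to snapshot before resuming logical replication.
func newTables(configured, published []string) []string {
	have := make(map[string]bool, len(published))
	for _, t := range published {
		have[t] = true
	}
	var added []string
	for _, t := range configured {
		if !have[t] {
			added = append(added, t)
		}
	}
	return added
}

// alterPublication builds the ALTER PUBLICATION ... ADD TABLE statement for
// the given tables, or returns "" when there is nothing to add.
func alterPublication(publication string, tables []string) string {
	if len(tables) == 0 {
		return ""
	}
	quoted := make([]string, len(tables))
	for i, t := range tables {
		quoted[i] = quoteIdent(t)
	}
	return fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
		quoteIdent(publication), strings.Join(quoted, ", "))
}

func main() {
	added := newTables([]string{"users", "orders"}, []string{"users"})
	fmt.Println(alterPublication("conduitpub", added)) // ALTER PUBLICATION "conduitpub" ADD TABLE "orders"
}
```

One possible ordering is to alter the publication first, then snapshot the added tables, then resume streaming, so that changes made while the snapshot runs should still be picked up from the slot (possibly duplicating rows the snapshot already read).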
[Postgres source] When we create a source via the UI, we can't specify the Postgres table used as the destination. An error occurs after starting the pipeline (see logs).
Suggested improvement: add "table" to the UI as a required config field.
Logs:
2022-05-09T11:08:01+00:00 ERR node stopped error="node 220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stopped with error: error receiving ack: no table provided for default writes" component=pipeline.Service node_id=220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
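Validating required settings when the connector is configured would surface this error when the pipeline is created rather than on the first ack. A minimal sketch; validateConfig is hypothetical, and treating "url" and "table" as the required keys is an assumption based on the sample config elsewhere on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// validateConfig reports every required key that is missing or blank, so a
// missing "table" fails at configuration time instead of at ack time.
// The list of required keys is an assumption for this sketch.
func validateConfig(cfg map[string]string) error {
	var missing []string
	for _, k := range []string{"url", "table"} {
		if strings.TrimSpace(cfg[k]) == "" {
			missing = append(missing, k)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("missing required config: %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	err := validateConfig(map[string]string{"url": "postgres://localhost:5432/db"})
	fmt.Println(err) // missing required config: table
}
```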
It would be great if the connector supported more than one table at a time. Currently, each table requires its own replication slot, which is not ideal.
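With logical replication, a single slot streamed with one publication receives changes for every table in that publication, so supporting several tables need not mean several slots. A sketch assuming a hypothetical comma-separated "table" setting; parseTables, createPublication and the publication name are illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes a PostgreSQL identifier, doubling any embedded quotes.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// parseTables splits a comma-separated "table" value into trimmed,
// non-empty table names.
func parseTables(raw string) []string {
	var tables []string
	for _, t := range strings.Split(raw, ",") {
		if t = strings.TrimSpace(t); t != "" {
			tables = append(tables, t)
		}
	}
	return tables
}

// createPublication builds one publication covering all tables; a single
// replication slot streamed with this publication then carries changes for
// every listed table.
func createPublication(name string, tables []string) string {
	quoted := make([]string, len(tables))
	for i, t := range tables {
		quoted[i] = quoteIdent(t)
	}
	return fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s",
		quoteIdent(name), strings.Join(quoted, ", "))
}

func main() {
	tables := parseTables("users, orders ,payments")
	// Prints: CREATE PUBLICATION "conduitpub" FOR TABLE "users", "orders", "payments"
	fmt.Println(createPublication("conduitpub", tables))
}
```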
Logs:
2022-05-25T08:00:16+00:00 ERR acker node stopped without processing all messages error="dropped 1 messages when stopping acker node" component=AckerNode node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker pipeline_id=0cacf098-fe7c-43c1-89d1-63365631108a stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}]
2022-05-25T08:00:16+00:00 ERR node stopped error="node 763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stopped with error: error receiving ack: failed to get key: invalid character 'p' looking for beginning of value" component=pipeline.Service node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]