
conduit-connector-postgres's Issues

Postgres table in which data exists can't be upserted unless "key" is specified in the config when creating

A Postgres table in which data already exists can't be upserted into unless "key" is specified in the config when creating the Postgres Destination connector (request: /v1/connectors), even if the "key" field is present in the metadata.
Steps to reproduce

  1. There is a table with data and a primary key in Postgres
  2. Create a new pipeline
  3. Create a Postgres Destination connector with the config:
    "settings": {
      "url": "postgresql://postgres:postgres@localhost:5432/postgres",
      "table": "",
      "key": ""
    }
  4. Create a Source connector that transfers the "key" in metadata (for example, a Postgres or Snowflake Source)
  5. Run the pipeline -> the system tries to transfer data whose "key" already exists
    Actual result: data isn't transferred to the Postgres table
    Expected result: the data should be updated by the "key" from the metadata (see the upsert sketch after the log below).

2022-05-12T13:56:39+00:00 ERR node stopped error="node 9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stopped with error: error receiving ack: ERROR: duplicate key value violates unique constraint "table2_pkey" (SQLSTATE 23505)" component=pipeline.Service node_id=9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
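
The expected behavior corresponds to an upsert. A minimal sketch of what the destination could issue instead of a plain INSERT, using pgx (the table and column names here are illustrative, not the connector's actual query):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgresql://postgres:postgres@localhost:5432/postgres")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Update the row when the key already exists instead of failing with a
	// duplicate-key error (SQLSTATE 23505, as in the log above).
	_, err = conn.Exec(ctx,
		`INSERT INTO table2 (id, name) VALUES ($1, $2)
		 ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name`,
		1, "updated")
	if err != nil {
		panic(err)
	}
	fmt.Println("upserted")
}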

Feature: Support multiple tables

Feature description

It would be great if the connector supported more than one table at a time. At the moment, the number of tables dictates the number of replication slots, which is not ideal.
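
A hypothetical configuration illustrating the request, in the style of the source config shown later in these issues (the "tables" key does not exist in the current connector, which only accepts a single "table"):

package main

import "fmt"

func main() {
	// Hypothetical: a comma-separated "tables" key instead of a single
	// "table", so one replication slot can serve several tables.
	cfg := map[string]string{
		"url":      "postgresql://postgres:postgres@localhost:5432/postgres",
		"tables":   "t1,t2,t3",
		"cdc_mode": "logrepl",
	}
	fmt.Println(cfg)
}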

Bug: position can be out of range of replication slot

Bug description

Replication slots can be reused or preconfigured. An issue can occur when the replication slot was replaced at runtime, or was preconfigured and then modified during connector downtime. In this situation the position can contain an LSN that points to WAL that has already been cleaned up.

The solution is to ensure, during Open and when the replication slot exists, that the position LSN is within the replication slot's range.
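
A sketch of that check, assuming pgx and pglogrepl (the function and slot names are hypothetical, not the connector's actual code):

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5"
)

// checkPositionInSlotRange rejects a stored position whose LSN is older than
// the slot's restart_lsn, since the WAL it points to may already be gone.
func checkPositionInSlotRange(ctx context.Context, conn *pgx.Conn, slotName string, pos pglogrepl.LSN) error {
	var restart string
	err := conn.QueryRow(ctx,
		`SELECT restart_lsn::text FROM pg_replication_slots WHERE slot_name = $1`,
		slotName).Scan(&restart)
	if err != nil {
		return fmt.Errorf("slot %q not found: %w", slotName, err)
	}
	slotLSN, err := pglogrepl.ParseLSN(restart)
	if err != nil {
		return err
	}
	if pos < slotLSN {
		return fmt.Errorf("position %s is behind slot restart_lsn %s", pos, slotLSN)
	}
	return nil
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DB_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	if err := checkPositionInSlotRange(ctx, conn, "slot_t1", 0); err != nil {
		panic(err)
	}
}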

Steps to reproduce

  1. Create table t1
  2. Start connector in CDC mode for table t1 and replication slot name slot_t1
  3. Insert changes into t1
  4. Stop connector
  5. Delete the replication slot and re-create it.
  6. Insert a few more changes into t1
  7. Resume connector

Version

latest

Bug: TestRelationSetAllTypes is flaky

Bug description

Occasionally running the tests for the Postgres connector results in an interface conversion panic. This is likely due to pglogrepl messages being received either out of order or at unexpected times.
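
A sketch of more tolerant handling (not the test's actual code): skip unexpected message kinds instead of asserting the concrete type, which is what panics above.

package main

import (
	"log"

	"github.com/jackc/pglogrepl"
)

// relationOrNil returns the RelationMessage if that's what arrived, and nil
// for anything else (e.g. a Begin/Commit around it), instead of panicking on
// a failed type assertion.
func relationOrNil(msg pglogrepl.Message) *pglogrepl.RelationMessage {
	switch m := msg.(type) {
	case *pglogrepl.RelationMessage:
		return m
	default:
		log.Printf("skipping unexpected message %T", m)
		return nil
	}
}

func main() {
	var msg pglogrepl.Message = &pglogrepl.CommitMessage{}
	if relationOrNil(msg) == nil {
		log.Println("no relation message yet; would keep reading")
	}
}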

=== RUN   TestRelationSetAllTypes
--- FAIL: TestRelationSetAllTypes (0.14s)
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage [recovered]
        panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage

goroutine 11 [running]:
testing.tRunner.func1.2({0x1a41de0, 0xc00033c570})
        /usr/local/go/src/testing/testing.go:1389 +0x366
testing.tRunner.func1()
        /usr/local/go/src/testing/testing.go:1392 +0x5d2
panic({0x1a41de0, 0xc00033c570})
        /usr/local/go/src/runtime/panic.go:844 +0x258
github.com/conduitio/conduit-connector-postgres/source/logrepl/internal.TestRelationSetAllTypes(0xc0000036c0)
        /Users/dylanlott/dev/meroxa/conduit-connector-postgres/source/logrepl/internal/relationset_test.go:58 +0x61b
testing.tRunner(0xc0000036c0, 0x1b48c08)
        /usr/local/go/src/testing/testing.go:1439 +0x214
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1486 +0x725
FAIL    github.com/conduitio/conduit-connector-postgres/source/logrepl/internal 1.480s

Steps to reproduce

  1. Run make run
  2. The TestRelationSetAllTypes test occasionally fails with a failed interface type conversion.

Version

v0.1.0

[Postgres Destination] connector can't insert data with the "key" in lowercase

The Postgres Destination connector can't insert data when the "key" is in lowercase.
Example:
the "key" is configured in lowercase in the source connector, while in the payload the "key" is transferred in uppercase.

Logs:
2022-05-12T07:52:41+00:00 ERR acker node stopped without processing all messages error="dropped 3 messages when stopping acker node" component=AckerNode node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker pipeline_id=224bf6bd-5cef-4957-a181-880910f41d55 stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}] 2022-05-12T07:52:41+00:00 ERR node stopped error="node 7de377e6-efe3-45de-acc7-f39695fc5e76-acker stopped with error: error receiving ack: insert exec failed: ERROR: column "id" specified more than once (SQLSTATE 42701)" component=pipeline.Service node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
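
The "column "id" specified more than once" error suggests the configured key and the payload field differ only in case and both end up in the generated column list. A sketch of one possible fix (a hypothetical helper, not the connector's actual code): fold names to lower case, as PostgreSQL does for unquoted identifiers, and de-duplicate.

package main

import (
	"fmt"
	"strings"
)

// dedupeColumns treats column names case-insensitively so "ID" from the
// payload and "id" from the key config resolve to the same column.
func dedupeColumns(cols []string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, c := range cols {
		k := strings.ToLower(c)
		if !seen[k] {
			seen[k] = true
			out = append(out, k)
		}
	}
	return out
}

func main() {
	fmt.Println(dedupeColumns([]string{"id", "ID", "Name"})) // [id name]
}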

Feature: Pick the latest logical replication proto version based on server version

Feature description

The connector always picks version 1 of the protocol; however, the protocol has evolved over the years.

As per the protocol versions defined here:

Protocol version. Currently versions 1, 2, 3, and 4 are supported. A valid version is required.

Version 2 is supported only for server version 14 and above, and it allows streaming of large in-progress transactions.
Version 3 is supported only for server version 15 and above, and it allows streaming of two-phase commits.
Version 4 is supported only for server version 16 and above, and it allows streams of large in-progress transactions to be applied in parallel.
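
A sketch of the version pick, following the rules quoted above (the helper name is hypothetical):

package main

import "fmt"

// pickProtoVersion returns the newest pgoutput protocol version the given
// server major version supports.
func pickProtoVersion(serverMajor int) int {
	switch {
	case serverMajor >= 16:
		return 4 // parallel apply of large in-progress transactions
	case serverMajor >= 15:
		return 3 // streaming of two-phase commits
	case serverMajor >= 14:
		return 2 // streaming of large in-progress transactions
	default:
		return 1
	}
}

func main() {
	fmt.Println(pickProtoVersion(15)) // 3
}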

Bug: ensure existing publications publish for the configured tables

Bug description

When the connector is started with a pre-created publication, the published tables are not matched against the configured tables. If a replication slot is created on such a publication, it can produce records for unexpected tables, or for none of the expected tables.
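
A sketch of the proposed check, querying pg_publication_tables before the slot is created (the function name is hypothetical):

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

// publicationHasTable reports whether the pre-created publication actually
// publishes the configured table.
func publicationHasTable(ctx context.Context, conn *pgx.Conn, pub, table string) (bool, error) {
	var ok bool
	err := conn.QueryRow(ctx,
		`SELECT EXISTS (
		   SELECT 1 FROM pg_publication_tables
		   WHERE pubname = $1 AND tablename = $2)`,
		pub, table).Scan(&ok)
	return ok, err
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DB_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	ok, err := publicationHasTable(ctx, conn, "pub_t1_t2", "t3")
	if err != nil {
		panic(err)
	}
	fmt.Println("publication covers table:", ok) // false in the repro below
}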

Steps to reproduce

  1. Create tables t1, t2, t3
  2. Create pub_t1_t2 publication for t1, t2
  3. Start connector configured to use pub_t1_t2 but table t3
  4. Observe record changes from unexpected tables (or none from t3) being published.

Version

latest

Add a trigger CDC mode

Feature description

Currently we have a CDC mode which uses PostgreSQL's WAL. We'd also like to have a CDC mode which uses PostgreSQL's triggers.
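
Trigger-based CDC typically installs an audit table plus a row trigger, and the connector then polls the audit table. A minimal sketch of the SQL such a mode could set up (all object names here are hypothetical, not a committed design):

package main

import (
	"context"
	"os"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DB_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Audit table, capture function, and a trigger on the tracked table t1.
	stmts := []string{
		`CREATE TABLE IF NOT EXISTS conduit_changes (
			id         bigserial PRIMARY KEY,
			tbl        text NOT NULL,
			op         text NOT NULL,
			row_data   jsonb,
			changed_at timestamptz NOT NULL DEFAULT now()
		)`,
		`CREATE OR REPLACE FUNCTION conduit_capture() RETURNS trigger AS $$
		BEGIN
			INSERT INTO conduit_changes (tbl, op, row_data)
			VALUES (TG_TABLE_NAME, TG_OP,
			        CASE WHEN TG_OP = 'DELETE' THEN to_jsonb(OLD) ELSE to_jsonb(NEW) END);
			RETURN NULL; -- return value is ignored for AFTER row triggers
		END;
		$$ LANGUAGE plpgsql`,
		`CREATE TRIGGER conduit_capture_t1
			AFTER INSERT OR UPDATE OR DELETE ON t1
			FOR EACH ROW EXECUTE FUNCTION conduit_capture()`,
	}
	for _, s := range stmts {
		if _, err := conn.Exec(ctx, s); err != nil {
			panic(err)
		}
	}
}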

[Postgres source] When we create a source via the UI and don't specify "key", an error occurs when starting the pipeline.

[Postgres source] When we create a source via the UI and don't specify "key", an error occurs when starting the pipeline.
Reason: an error occurs if the Postgres table doesn't have a primary key, and we can't specify a custom "key" in the UI when creating a connector.
Suggested improvement: add "key" to the UI as an optional config field.

Logs:
2022-05-09T10:44:26+00:00 ERR node stopped error="node 01f1505f-a15c-40c5-a9b8-ab50c056019a stopped with error: could not open source connector: failed to create logical replication iterator: failed to setup subscription: failed to find key for table users (try specifying it manually): getKeyColumn query failed: no rows in result set" component=pipeline.Service node_id=01f1505f-a15c-40c5-a9b8-ab50c056019a stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode.Run","line":54}]

Remove CDC mode long_polling from configs and docs

Feature description

Currently, it's not possible to use long polling in CDC. It was left as a placeholder in the configuration and the docs but was never implemented. We should remove it because it's confusing to users.

The implementation of long polling CDC will be tracked here: #150.

Data isn't transferred from the S3 Source to the Postgres Destination. An error occurs (see logs)

Logs:
2022-05-25T08:00:16+00:00 ERR acker node stopped without processing all messages error="dropped 1 messages when stopping acker node" component=AckerNode node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker pipeline_id=0cacf098-fe7c-43c1-89d1-63365631108a stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}]
2022-05-25T08:00:16+00:00 ERR node stopped error="node 763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stopped with error: error receiving ack: failed to get key: invalid character 'p' looking for beginning of value" component=pipeline.Service node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]

[Postgres source] When we create a source via the UI, we don't have the ability to specify the Postgres table which is used as the destination.

[Postgres source] When we create a source via the UI, we don't have the ability to specify the Postgres table which is used as the destination. An error occurs after starting the pipeline (see logs).
Suggested improvement: add "table" to the UI as a required config field.

Logs:
2022-05-09T11:08:01+00:00 ERR node stopped error="node 220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stopped with error: error receiving ack: no table provided for default writes" component=pipeline.Service node_id=220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]

Bug/Feature: PostgreSQL table name differentiation

Bug description

I attempted to replicate data records from a table named orders to another table in the same database and schema. I assumed that I could accomplish this by setting the destination configuration's table value to orders_moved (which did not yet exist).

I expected the destination connector to check for orders_moved and, if it did not exist, to create it and begin replicating data records into the newly created orders_moved. Instead, I saw no results. The destination connector flipped to degraded. I looked up the logs on our platform and found the following error:

error while nacking message: failed to write message to DLQ: DLQ nack threshold exceeded (0/1), original error: failed to execute query for record 0: ERROR: relation "orders_moved" does not exist (SQLSTATE 42P01)

I had to create the orders_moved table in the database in order for records to replicate successfully.

This was just a simple example.

This raises the question: how would this work with multiple tables? Could there be a _prefix tag in the destination config? I would expect some way to differentiate the table in the event that it already exists in the destination.

Steps to reproduce

  1. Create an application in the test tenant.
  2. Configure a PostgreSQL source connector with a table that contains data records.
  3. Configure a PostgreSQL destination connector, set table value to a table of choice that does not exist.
  4. Deploy the application.
  5. Check the Datadog logs for the test tenant.

Version

v0.7.3

Bug: CDC/Snapshot cannot handle numeric type

Bug description

When a table has a numeric column, the connector fails to protobuf-encode the value, since it is not a plugin-supported type. The connector needs to coerce the value to a proper kind.

2024-05-29T13:27:35+00:00 ERR pipeline stopped error="node pg-to-file:pg.in stopped with error: source stream was stopped unexpectedly: error reading from source: read stream error: error converting payload: error converting after: proto: invalid type: pgtype.Numeric
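
A sketch of one possible coercion, assuming pgx/v5's pgtype package (formatting to a string would be a lossless alternative; the helper name is hypothetical):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"
)

// coerceNumeric converts a pgtype.Numeric to float64 before encoding.
func coerceNumeric(n pgtype.Numeric) (float64, error) {
	f, err := n.Float64Value()
	if err != nil {
		return 0, err
	}
	if !f.Valid {
		return 0, fmt.Errorf("numeric is NULL")
	}
	return f.Float64, nil
}

func main() {
	var n pgtype.Numeric
	if err := n.Scan("1234.56"); err != nil {
		panic(err)
	}
	v, err := coerceNumeric(n)
	if err != nil {
		panic(err)
	}
	fmt.Println(v) // 1234.56
}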

Steps to reproduce

  1. Create a table with a numeric(12,2) column and load some data.
  2. Start a pipeline with a pg connector for the specific table.
  3. Observe the error shown in the description.

Version

latest

Feature: Use permanent replication slot for logrepl

Feature description

Use permanent replication slots when logical replication is used. This ensures WAL is retained while the pipeline is not running for whatever reason (paused, stopped, etc.).

Additionally, add an option to auto-clean any leftover replication slots and publications.
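
A sketch of what this implies at the pglogrepl level (the slot name is hypothetical): create the slot with Temporary: false so it survives disconnects and retains WAL.

package main

import (
	"context"
	"os"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

func main() {
	ctx := context.Background()
	// Replication connections need replication=database in the DSN
	// (assuming DB_URL has no query parameters of its own).
	conn, err := pgconn.Connect(ctx, os.Getenv("DB_URL")+"?replication=database")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Temporary: false makes the slot permanent, so WAL is retained while
	// the pipeline is paused or stopped.
	_, err = pglogrepl.CreateReplicationSlot(ctx, conn, "conduit_slot", "pgoutput",
		pglogrepl.CreateReplicationSlotOptions{Temporary: false})
	if err != nil {
		panic(err)
	}
}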

Bug: Last N messages always redelivered on reconnect

Bug description

A Source will redeliver a previously ACKed message on reconnect.

The connector uses the LSN at the Position and passes that when ACKing a message. This seems to differ from the example provided in jackc/pglogrepl where the ACKed LSN is the offset from the WALStart to the end of the message:
https://github.com/jackc/pglogrepl/blob/master/example/pglogrepl_demo/main.go#L112
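
Condensed from the linked demo, the relevant part is that the acked position is WALStart advanced past the message body (the function name is hypothetical):

package main

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// ackThroughMessage reports WALStart plus the message length as the acked
// LSN, rather than WALStart itself, so the last message is not redelivered
// on reconnect.
func ackThroughMessage(ctx context.Context, conn *pgconn.PgConn, xld pglogrepl.XLogData) error {
	clientXLogPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData))
	return pglogrepl.SendStandbyStatusUpdate(ctx, conn,
		pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
}

func main() {
	// Wiring of the replication receive loop is omitted; see the linked demo.
}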

Steps to reproduce

  1. Create a publication
  2. Create a non-temporary slot
  3. Insert a value into the tracked table
  4. Consume slot using Source
  5. Kill the app after enough time has passed for the message to be ACKed.
  6. Restart the app to consume slot using Source

Sample app to consume Source:

package main

import (
	"context"
	"encoding/json"
	"os"

	"github.com/conduitio/conduit-connector-postgres/source"
)

func main() {
	ctx := context.Background()

	// Connector configuration, read from the environment.
	sourceConfig := map[string]string{
		"url":                     os.Getenv("DB_URL"),
		"table":                   os.Getenv("DB_TABLE"),
		"key":                     os.Getenv("DB_TABLE_KEY"),
		"columns":                 "",
		"snapshot_mode":           "never",
		"cdc_mode":                "logrepl",
		"logrepl.publicationName": os.Getenv("DB_PUBLICATION"),
		"logrepl.slotName":        os.Getenv("DB_SLOT_NAME"),
	}
	s := source.NewSource()
	if err := s.Configure(ctx, sourceConfig); err != nil {
		panic(err)
	}
	if err := s.Open(ctx, nil); err != nil {
		panic(err)
	}
	defer s.Teardown(ctx)

	// Read records forever: print each payload and ACK its position.
	for {
		record, err := s.Read(ctx)
		if err != nil {
			panic(err)
		}
		msg, err := json.Marshal(record.Payload)
		if err != nil {
			panic(err)
		}
		if err := s.Ack(ctx, record.Position); err != nil {
			panic(err)
		}

		println(string(msg))
	}
}

Version

53d5c54
