
conduit-connector-postgres's People

Contributors

ahmeroxa, dependabot[bot], dylanlott, hariso, lovromazgon, maha-hajja, raulb


conduit-connector-postgres's Issues

Add a trigger CDC mode

Feature description

Currently we have a CDC mode which uses PostgreSQL's WAL. We'd also like to have a CDC mode which uses PostgreSQL's triggers.

[Postgres Destination] Connector can't insert data when the "key" is lowercase

The Postgres Destination connector can't insert data when the "key" is configured in lowercase.
Example:
the "key" is set in lowercase in the source connector configuration, while in the payload the key field is transferred in uppercase.

Logs:
2022-05-12T07:52:41+00:00 ERR acker node stopped without processing all messages error="dropped 3 messages when stopping acker node" component=AckerNode node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker pipeline_id=224bf6bd-5cef-4957-a181-880910f41d55 stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}] 2022-05-12T07:52:41+00:00 ERR node stopped error="node 7de377e6-efe3-45de-acc7-f39695fc5e76-acker stopped with error: error receiving ack: insert exec failed: ERROR: column "id" specified more than once (SQLSTATE 42701)" component=pipeline.Service node_id=7de377e6-efe3-45de-acc7-f39695fc5e76-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]

Postgres table in which data exists can't be upserted unless "key" is specified in the config when creating

A Postgres table that already contains data can't be upserted into unless "key" is specified in the config when creating the Postgres Destination connector (request: /v1/connectors), even if the "key" field is present in the metadata.
Steps to reproduce

  1. There is a table with data, with a primary "key", in Postgres
  2. Create a new pipeline
  3. Create a Postgres destination connector with this config:
    "settings": {
    "url": "postgresql://postgres:postgres@localhost:5432/postgres",
    "table": "",
    "key": ""
    }
  4. Create a source connector that transfers the "key" in metadata (for example, a Postgres or Snowflake source)
  5. Run the pipeline -> the system tries to transfer data whose "key" already exists
    Actual result: data isn't transferred to the Postgres table
    Expected result: the data should be updated (upserted) by the "key" from the metadata.

2022-05-12T13:56:39+00:00 ERR node stopped error="node 9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stopped with error: error receiving ack: ERROR: duplicate key value violates unique constraint "table2_pkey" (SQLSTATE 23505)" component=pipeline.Service node_id=9873f4b8-61d7-483e-8b24-a52f12395ad0-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
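What this issue asks for is a fallback: when the destination config leaves "key" empty, use the key carried in the record metadata so the write becomes an upsert instead of violating the primary-key constraint. A hedged sketch of that resolution order (helper name and metadata field are assumptions):

```go
package main

import (
	"errors"
	"fmt"
)

// resolveKey picks the upsert key: an explicitly configured key wins,
// otherwise fall back to the key in the record metadata, otherwise
// fail. Hypothetical helper, not the connector's actual logic.
func resolveKey(configKey string, metadata map[string]string) (string, error) {
	if configKey != "" {
		return configKey, nil
	}
	if k, ok := metadata["key"]; ok && k != "" {
		return k, nil
	}
	return "", errors.New("no upsert key in config or metadata")
}

func main() {
	k, _ := resolveKey("", map[string]string{"key": "id"})
	fmt.Println(k) // id
}
```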

[Postgres source] When we create a source via the UI and don't specify "key", an error occurs when starting the pipeline.

[Postgres source] When we create a source via the UI and don't specify "key", an error occurs when starting the pipeline.
Reason: an error occurs if the Postgres table doesn't have a primary key, and we can't specify a custom "key" in the UI when creating a connector.
Suggested improvement: add "key" to the UI as an optional config field.

Logs:
2022-05-09T10:44:26+00:00 ERR node stopped error="node 01f1505f-a15c-40c5-a9b8-ab50c056019a stopped with error: could not open source connector: failed to create logical replication iterator: failed to setup subscription: failed to find key for table users (try specifying it manually): getKeyColumn query failed: no rows in result set" component=pipeline.Service node_id=01f1505f-a15c-40c5-a9b8-ab50c056019a stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode.Run","line":54}]

Bug: Last N messages always redelivered on reconnect

Bug description

A Source will redeliver a previously ACKed message on reconnect.

The connector uses the LSN as the Position and passes it back when ACKing a message. This seems to differ from the example provided in jackc/pglogrepl, where the ACKed LSN is WALStart plus the length of the message data, i.e. the end of the message:
https://github.com/jackc/pglogrepl/blob/master/example/pglogrepl_demo/main.go#L112
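The linked demo advances its client position past the whole message before reporting it, which is why ACKing bare WALStart causes redelivery. A dependency-free sketch of that arithmetic (the local `LSN` type stands in for `pglogrepl.LSN`, which is a uint64 WAL byte offset):

```go
package main

import "fmt"

// LSN stands in for pglogrepl.LSN (a uint64 byte offset into the WAL),
// so this sketch needs no external dependency.
type LSN uint64

// nextAckPos mirrors the pglogrepl demo: the position to report back
// after an XLogData message is WALStart plus the length of its WAL
// data, i.e. the end of the message. ACKing only WALStart leaves the
// last message unconfirmed, so the server redelivers it on reconnect.
func nextAckPos(walStart LSN, walData []byte) LSN {
	return walStart + LSN(len(walData))
}

func main() {
	fmt.Println(nextAckPos(1000, make([]byte, 24))) // 1024
}
```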

Steps to reproduce

  1. Create a publication
  2. Create a non-temporary slot
  3. Insert a value into the tracked table
  4. Consume slot using Source
  5. Kill the app after enough time has passed for the message to be ACKed.
  6. Restart the app to consume slot using Source

Sample app to consume Source:

package main

import (
	"context"
	"encoding/json"
	"os"

	"github.com/conduitio/conduit-connector-postgres/source"
)

func main() {
	ctx := context.Background()

	sourceConfig := map[string]string{
		"url":                     os.Getenv("DB_URL"),
		"table":                   os.Getenv("DB_TABLE"),
		"key":                     os.Getenv("DB_TABLE_KEY"),
		"columns":                 "",
		"snapshot_mode":           "never",
		"cdc_mode":                "logrepl",
		"logrepl.publicationName": os.Getenv("DB_PUBLICATION"),
		"logrepl.slotName":        os.Getenv("DB_SLOT_NAME"),
	}
	s := source.NewSource()
	if err := s.Configure(ctx, sourceConfig); err != nil {
		panic(err)
	}
	if err := s.Open(ctx, nil); err != nil {
		panic(err)
	}
	defer s.Teardown(ctx)

	for {
		record, err := s.Read(ctx)
		if err != nil {
			panic(err)
		}
		msg, err := json.Marshal(record.Payload)
		if err != nil {
			panic(err)
		}
		if err := s.Ack(ctx, record.Position); err != nil {
			panic(err)
		}

		println(string(msg))
	}
}

Version

53d5c54

Bug: TestRelationSetAllTypes is flaky

Bug description

Occasionally running the tests for the Postgres connector results in an interface conversion panic. This is likely due to pglogrepl messages being received either out of order or at unexpected times.

=== RUN   TestRelationSetAllTypes
--- FAIL: TestRelationSetAllTypes (0.14s)
panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage [recovered]
        panic: interface conversion: pglogrepl.Message is *pglogrepl.CommitMessage, not *pglogrepl.RelationMessage

goroutine 11 [running]:
testing.tRunner.func1.2({0x1a41de0, 0xc00033c570})
        /usr/local/go/src/testing/testing.go:1389 +0x366
testing.tRunner.func1()
        /usr/local/go/src/testing/testing.go:1392 +0x5d2
panic({0x1a41de0, 0xc00033c570})
        /usr/local/go/src/runtime/panic.go:844 +0x258
github.com/conduitio/conduit-connector-postgres/source/logrepl/internal.TestRelationSetAllTypes(0xc0000036c0)
        /Users/dylanlott/dev/meroxa/conduit-connector-postgres/source/logrepl/internal/relationset_test.go:58 +0x61b
testing.tRunner(0xc0000036c0, 0x1b48c08)
        /usr/local/go/src/testing/testing.go:1439 +0x214
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1486 +0x725
FAIL    github.com/conduitio/conduit-connector-postgres/source/logrepl/internal 1.480s

Steps to reproduce

  1. Run make run
  2. The TestRelationSetAllTypes test occasionally fails with a failed interface type conversion.
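The panic comes from a direct assertion like `m.(*pglogrepl.RelationMessage)` on a stream that can also contain Begin/Commit messages. A type switch that skips unexpected message types would make the test order-independent; the sketch below uses minimal stand-in types so it compiles without the real pglogrepl dependency.

```go
package main

import "fmt"

// Minimal stand-ins for pglogrepl message types, so this sketch is
// self-contained; they are not the real library types.
type Message interface{ msg() }
type RelationMessage struct{ RelationName string }
type CommitMessage struct{}

func (RelationMessage) msg() {}
func (CommitMessage) msg()   {}

// firstRelation shows the defensive pattern: a type switch that skips
// commits (and anything else) instead of asserting *RelationMessage
// directly, which panics when messages arrive in an unexpected order.
func firstRelation(msgs []Message) (*RelationMessage, bool) {
	for _, m := range msgs {
		switch v := m.(type) {
		case *RelationMessage:
			return v, true
		default:
			// ignore Begin/Commit and any other message types
		}
	}
	return nil, false
}

func main() {
	rel, ok := firstRelation([]Message{&CommitMessage{}, &RelationMessage{RelationName: "users"}})
	fmt.Println(ok, rel.RelationName) // true users
}
```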

Version

v0.1.0

[Postgres source] When we create a source via the UI, there is no way to specify the Postgres table which is used as the destination.

[Postgres source] When we create a source via the UI, there is no way to specify the Postgres table which is used as the destination. An error occurs after starting the pipeline (see logs).
Suggested improvement: add "table" to the UI as a required config field.

Logs:
2022-05-09T11:08:01+00:00 ERR node stopped error="node 220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stopped with error: error receiving ack: no table provided for default writes" component=pipeline.Service node_id=220d9aa5-7456-49a4-b8a6-1bb34a5cd13d-acker stack=[{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/home/alyakimenko/go/src/github.com/conduitio/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]

Feature: Support multiple tables

Feature description

It would be great if the connector supported more than one table at a time. At the moment, the number of tables dictates the number of replication slots, which is not ideal.

Data isn't transferred from the S3 source to the Postgres Destination; an error occurs (see logs)

Logs:
2022-05-25T08:00:16+00:00 ERR acker node stopped without processing all messages error="dropped 1 messages when stopping acker node" component=AckerNode node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker pipeline_id=0cacf098-fe7c-43c1-89d1-63365631108a stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/stream/acker.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*AckerNode.teardown","line":133}]
2022-05-25T08:00:16+00:00 ERR node stopped error="node 763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stopped with error: error receiving ack: failed to get key: invalid character 'p' looking for beginning of value" component=pipeline.Service node_id=763bb6b1-a77e-45a5-ba69-e30a90d04e43-acker stack=[{"file":"/Users/serhii/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service.runPipeline.func1","line":387},{"file":"/Users/serhii/conduit/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*destination.Ack","line":194},{"file":"/Users/serhii/conduit/pkg/plugin/builtin/v1/destination.go","func":"github.com/conduitio/conduit/pkg/plugin/builtin/v1.(*destinationPluginAdapter.Ack","line":137}]
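The `failed to get key: invalid character 'p' looking for beginning of value` error is what `json.Unmarshal` reports when fed a raw, non-JSON key, which is plausible here since an S3 source may set the record key to a plain file path. A tolerant parser would try JSON first and fall back to treating the bytes as an opaque key; a sketch of that idea (hypothetical helper, not the connector's code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseKey tries to decode the record key as structured JSON; if that
// fails (e.g. the key is a raw S3 object path), it falls back to using
// the raw bytes as a single opaque key value instead of erroring out.
func parseKey(raw []byte) map[string]interface{} {
	var structured map[string]interface{}
	if err := json.Unmarshal(raw, &structured); err == nil {
		return structured
	}
	// not JSON: treat the whole payload as an opaque key value
	return map[string]interface{}{"key": string(raw)}
}

func main() {
	fmt.Println(parseKey([]byte("path/to/file.csv"))) // map[key:path/to/file.csv]
	fmt.Println(parseKey([]byte(`{"id":1}`)))         // map[id:1]
}
```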
