Giter Site home page Giter Site logo

connectors's Introduction

CI Slack | Docs home | Free account | Data platform comparison reference | Email list

Build millisecond-latency, scalable, future-proof data pipelines in minutes.

Estuary Flow is a DataOps platform that integrates all of the systems you use to produce, process, and consume data.

Flow unifies today's batch and streaming paradigms so that your systems โ€“ current and future โ€“ are synchronized around the same datasets, updating in milliseconds.

With a Flow pipeline, you:

  • ๐Ÿ“ท Capture data from your systems, services, and SaaS into collections: millisecond-latency datasets that are stored as regular files of JSON data, right in your cloud storage bucket.

  • ๐ŸŽฏ Materialize a collection as a view within another system, such as a database, key/value store, Webhook API, or pub/sub service.

  • ๐ŸŒŠ Derive new collections by transforming from other collections, using the full gamut of stateful stream workflow, joins, and aggregations โ€” in real time.

Workflow Overview

Using Flow

Flow combines a low-code UI for essential workflows and a CLI for fine-grain control over your pipelines. Together, the two interfaces comprise Flow's unified platform. You can switch seamlessly between them as you build and refine your pipelines, and collaborate with a wider breadth of data stakeholders.

โžก๏ธ Sign up for a free Flow account here.

See the BSL license for information on using Flow outside the managed offering.

Resources

Support

The best (and fastest) way to get support from the Estuary team is to join the community on Slack.

You can also email us.

Connectors

Captures and materializations use connectors: plug-able components that integrate Flow with external data systems. Estuary's in-house connectors focus on high-scale technology systems and change data capture (think databases, pub-sub, and filestores).

Flow can run Airbyte community connectors using airbyte-to-flow, allowing us to support a greater variety of SaaS systems.

See our website for the full list of currently supported connectors.

If you don't see what you need, request it here.

How does it work?

Flow builds on a real-time streaming broker created by the same founding team called Gazette.

Because of this, Flow collections are both a batch dataset โ€“ they're stored as a structured "data lake" of general-purpose files in cloud storage โ€“ and a stream, able to commit new documents and forward them to readers within milliseconds. New use cases read directly from cloud storage for high-scale backfills of history, and seamlessly transition to low-latency streaming on reaching the present.

What makes Flow so fast?

Flow mixes a variety of architectural techniques to achieve great throughput without adding latency:

  • Optimistic pipelining, using the natural back-pressure of systems to which data is committed.
  • Leveraging reduce annotations to group collection documents by key wherever possible, in memory, before writing them out.
  • Co-locating derivation states (registers) with derivation compute: registers live in an embedded RocksDB that's replicated for durability and machine re-assignment. They update in memory and only write out at transaction boundaries.
  • Vectorizing the work done in external Remote Procedure Calls (RPCs) and even process-internal operations.
  • Marrying the development velocity of Go with the raw performance of Rust, using a zero-copy CGO service channel.

connectors's People

Contributors

dependabot[bot] avatar dyaffe avatar jgraettinger avatar jman0119 avatar mdibaiee avatar oliviamiannone avatar psfried avatar saterus avatar snowzach avatar travjenkins avatar willdonnelly avatar williamhbaker avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

connectors's Issues

Kafka Source Connector

We'd like to be able to consume messages from Kafka topics/partitions as an Airbyte-compatible connector.

DynamoDB Source

We'd like to have a source connector for dynamodb. We've done a bit of research and thinking on it already, which I'll write up here.

DynamoDB has a feature called streams that allows reading CDC events. There is a separate feature that can automatically forward these CDC events to a Kinesis stream. Our connector should use dynamodb streams, not the kinesis feature. The kinesis feature has a seemingly higher cost, and CDC events may be duplicated in the kinesis stream. Additionally, the kinesis feature does not scale automatically to adjust for the throughput on the dynamodb table. Dynamodb streams has a lower cost and scales automatically as the dynamodb table itself scales. The dynamodb streams API is very similar to the Kinesis API, though, so the two implementations can likely share a lot of code.

When processing CDC data, it's important to process events in the correct order. If you have an insert followed by a delete on the same key, you will end up with incorrect results if you processed the delete before the insert. This is complicated by design of Shards in dynamodb streams. Shards each have a parent shard, and each shard may have 0 or more child shards. You must read a parent shard entirely before you start reading a child shard if you want to process documents in the correct order. This is problematic because of how we distribute reads of the dynamodb (or kinesis) shards among the set of Flow consumer shards. Consumer shard A may decide to process dynamo shard X before Consumer shard B has finished reading the parent of X. Solving this seems like it would require some shared state.

Another possible approach for dealing with ordering is to use the ApproximateCreationDateTime of each dynamodb record with a maximize reduction strategy on the root document so that the most recent CDC document for each key is the one that will take precedence.

source-mysql: Verify binlog replication cursor behavior

This is analagous to the whole "make sure the replication slot LSN advances properly" issue with PostgreSQL, just with different terminology.

The initial implementation of source-mysql ignores the issue entirely, which actually isn't totally awful, because the "cursor" values we're using consist of a (BinlogFile, Offset) tuple, and I'm pretty sure that (unlike PostgreSQL logical replication) the MySQL binlog replication actually just starts reading and reporting events from the specified offset in the named file.

This means that (assuming the above description is correct) the only real concerns here are:

  1. Do we have to do anything special to indicate that we're done with a particular binlog file before the upstream DB can delete it?
  2. If we don't have to do anything special, what guarantees can we provide and/or what is our recovery strategy if a MySQL capture is offline for so long that there's a gap in the logs?

Materialization to Firebolt

Firebolt is essentially a Snowflake competitor that's up and coming but with good traction. Because they only accept delta updates, they haven't been able to create integrations with the standard ELT providers (like Fivetran) or with Kafka. That makes them a big near-term opportunity due to the fact that their customers want to take data from their Kafka clusters and other endpoints.

We should create a materialization connector for Firebolt that works with delta updates. Here are their docs. I can also provide credentials to a test account which they've provisioned to us.

snowflake: internal stage failure of the connector

Possibly due to token expiration?

{"err":"runTransactions: app.ConsumeMessage: polling pending: reading Loaded: EOF","level":"error","msg":"servePrimary failed","shard":"/gazette/consumers/flow/reactor/items/materialize/****/00000000-00000000","time":"2021-10-06T18:45:14Z"}
{"err":"store.Commit: merging Store documents: 090247 (22000): Internal error accessing stage area:Anonymous caller does not have storage.objects.list access to the Google Cloud Storage bucket. (Status Code: 401).","fence":253,"keyBegin":0,"keyEnd":4294967295,"level":"error","materialization":"****","msg":"RunTransactions failed","time":"2021-10-06T18:45:14Z"}

parser: match projections under collation space

We've seen cases of real world data which freely mix the capitalization of a given field in different CSV files (for example, citi bike).
Meanwhile, Flow requires that manual projections are unique under a caseless, unicode-normalized collation.

Currently the connector parser exactly matches CSV columns to projection fields. This means that variations in capitalization are distinct projections from it's perspective, and also can't be provided as distinct projections in Flow (because they're equal under collation).

The parser should also match CSV columns to projections using the same collation as Flow (caseless, and unicode normalized).

Bonus: two other cases of automatic spaced matches which would be nice to have:

  • It's common to have root-document properties like "/bike_id", that should match "Bike ID"
  • Properties like "/foo/bar/baz" should match projections like "Foo Bar Baz".

Materialization Documentation

Need to create an engineering focused document about how materialization connectors work and are created.

  • Protocol Flow
  • Sample Connector Instructions

PostgreSQL Source Connector

We need a connector which can continuously source changes from PostgreSQL via Logical Replication.

There's an Airbyte PostgreSQL connector, and it even supports replication, but it operates in a polling mode where the connector gets re-run periodically to collect new changes and then shuts down, and making it run continuously appears nontrivial.

But since that connector exists, that means that this one can be focused solely on the low-latency replication side of things, which simplifies the problem a bit.

source-postgres: Backfill latency improvements

As currently written, the source-postgres connector must perform a table backfill to completion before it can resume replication. When the tables are reasonably small this is fine, but if they grow large enough that this backfill takes hours or even days this has two major implications:

  1. Adding a new table will essentially pause the capture of all other tables until the backfill finishes.
  2. The new table will also not be replicated until the backfill finishes.

The first problem could likely be solved by some refactoring to allow the table-scanning process and replication event processing to be interleaved. There already has to be logic around filtering change events from not-yet-scanned portions of the table, so the only major change is that there needs to be some way to very briefly pause replication processing while each individual chunk is read out. But I suspect we'll need this sooner or later.

The second problem almost feels like a "well what did you expect?" sort of situation, but it's not exactly. The problem is that there may be absurdly gigantic tables in which the majority of the data is old and will never be modified. It would be nice if we could immediately jump to the new data, scan and then start replication on that, and then have a separate batch process which will slowly grind through the older data. This could be provided by some sort of "threshold" feature so we could express things like "only capture rows whose created_at column is more recent than <date>".

Flaky test in source-postgres

This test run had a test failure in TestSlotLSNAdvances.

Here's the last bit of the test output:

2022-02-23T21:38:41.1380829Z #18 51.10 time="2022-02-23T21:38:40Z" level=debug msg="checking slot LSN" after=0/17D25D0 before=0/17D25D0 iter=48
2022-02-23T21:38:41.2776158Z #18 51.30 time="2022-02-23T21:38:41Z" level=info msg="initializing connector" database=flow host=localhost port=5432 slot=flow_test_slot user=flow
2022-02-23T21:38:41.2778834Z #18 51.31 time="2022-02-23T21:38:41Z" level=debug msg="starting replication" publication=flow_publication slot=flow_test_slot startLSN=0/186AFE8
2022-02-23T21:38:41.2781002Z #18 51.31 time="2022-02-23T21:38:41Z" level=debug msg="sending Standby Status Update" ackLSN=0/186AFE8
2022-02-23T21:38:41.4282782Z #18 51.40 time="2022-02-23T21:38:41Z" level=debug msg="queried primary key" key="[id]" table=public.test_slotlsnadvances_one
2022-02-23T21:38:41.4284781Z #18 51.40 time="2022-02-23T21:38:41Z" level=debug msg="queried primary key" key="[slot]" table=public.flow_watermarks
2022-02-23T21:38:41.4285503Z #18 51.40 time="2022-02-23T21:38:41Z" level=debug msg="queried primary key" key="[id]" table=public.test_slotlsnadvances_one
2022-02-23T21:38:41.4286899Z #18 51.40 time="2022-02-23T21:38:41Z" level=debug msg="writing watermark" watermark=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4287619Z #18 51.41 time="2022-02-23T21:38:41Z" level=info msg="streaming until watermark" tail=false watermark=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4289678Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="streaming to watermark" watermark=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4290918Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=203a069e-8925-4f81-a2f1-f1b927a9a232 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4291677Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4293107Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=2b6d6c6e-813a-482b-9c24-1607b38a61b9 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4294137Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4294794Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=83fd4c45-86b9-409e-bc4a-86ef76cc31ff expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4295379Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4296017Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=a94472c5-b003-472a-9f07-6d4e6ca03486 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4296593Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4297936Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=cb2063df-46ce-449e-8d17-a1b757a1ef32 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4298753Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4299581Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=2c58773e-2fc3-471d-a16c-cce928ea61b6 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4300653Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4301705Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=0991e4ef-45a9-44df-b6f3-b2275634bbb8 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4302526Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4303687Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=e0da5859-e9c8-467d-9e1a-3d80f814b67d expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4304871Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4306052Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=ddd5c064-fe01-44b6-96cc-2b25c9b55223 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4306658Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4307322Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=2682d52a-3f39-4962-a0a1-4b16fa1ddfd9 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4308073Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4308706Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=e83c9874-d4ae-407e-90c4-05aed4e991dd expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4309289Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4310337Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=ef7c5929-4393-4f9b-a6a0-02a6c1e60fe2 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4311178Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4311984Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=30bef966-e3b4-42d1-be0d-c4bdf32bd787 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4312619Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4313286Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=4b28bc93-338c-4226-8a50-93a12651d316 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4314269Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4314932Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=ba13f8f6-771e-42e2-9ce9-7752b266c026 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4315679Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4316332Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=0a0e93df-9f4a-433c-aea4-78314f03cf8b expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4316905Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4317635Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=e03b103f-d79d-48ec-bda5-58c454225779 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4318205Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4318847Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=afacfbb0-99ab-4f93-b18e-e81aefdbeed4 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4320703Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4321408Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=939bf687-d1c3-49b4-ad23-b28d35378cc0 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4322058Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4322870Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=6091f5e8-fcc9-42e2-9fc6-bac7df6c3440 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4323467Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4324414Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=83361753-6d5b-4f32-a036-5c568cc67436 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4325138Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4325747Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=b1c9c4a0-1a90-41e5-ab56-0a3d07ed808b expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4326296Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4326915Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=ef02f5d9-49f1-476b-a410-a28e1ef07ffc expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4327475Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4328088Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=7d45c953-ab51-461e-8a40-e5694f271668 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4328656Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4329267Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=e615f456-0a8a-4776-91d1-934c9885f2d9 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4330094Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4330976Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=b5d5e62e-c2e0-4e8e-8995-a5525a1cfa48 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4331567Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4332285Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=67b0262a-4184-4c91-afe8-21e59af42723 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4332875Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4333691Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=3e9f0e78-03cb-4d95-9652-b7fa29e4b465 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4334425Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4335036Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=5b3dc201-36b9-434e-bfe3-9117d957e29f expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4335616Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4336228Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=a257707f-dc45-4b62-93ab-63807194561f expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4336780Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4337395Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=7f562fb1-afd5-460b-be61-5c16ed21e022 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4337948Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4338589Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=cca1ffe0-98a6-46a3-ad43-adf3c968a11b expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4339252Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4340131Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=c2b0f87c-d815-4149-903a-d4cbb2ed3bd6 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4340892Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4341542Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=e85dda7e-db98-48d0-a65f-c38d12945cf7 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4342147Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4342820Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=4ed3e5a3-356c-4abd-ab76-cf7a22c546bc expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4343426Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4344382Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=fcb53209-b8f5-4cd6-8e4e-92db3c3806e3 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4345079Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4345698Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=fdc901d6-9eaf-4562-ae03-9c5da4e45dca expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4346257Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4346865Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=b3c1301e-b1ea-49a4-94c7-fc659a336a26 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4347617Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4348654Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=c4e7671d-d97c-4e34-95e4-9b5de04d8837 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4349379Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4350291Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=b9612c59-402b-4aab-8f58-fbe14ce59f4c expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4351164Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4352048Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=3f1a90dd-5506-4a2e-95b7-197c4d5d5c1a expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4353156Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4354615Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=0e1cd159-3ffe-4b58-a382-b1144ef8646c expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4355337Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4356132Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=73d49ea0-dcd5-4f22-bf4d-0e67d8bb4495 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4356856Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4358194Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=4ab27f7e-b526-4bb2-b143-ee66f2aef516 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4358741Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4359549Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=2b268e4e-972d-4ae4-85c8-29d3a3a1ff3f expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4360287Z #18 51.41 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4361120Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=2e29d379-fbd3-4a6d-a236-97da5d9fbc43 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4361714Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4362361Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=ee4e115f-fb01-4517-ace9-1de1399373b1 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4362941Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4363726Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=743c97a5-242f-4471-a27b-466597078feb expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4364452Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4365073Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=d74b72ec-46cd-49b5-8ed1-fbc2a207a600 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4365632Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4366267Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="watermark change" actual=4c6606bf-6be0-4242-99ed-56c4ed81de21 expected=4c6606bf-6be0-4242-99ed-56c4ed81de21
2022-02-23T21:38:41.4366825Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="ignoring stream" op=u stream=public.flow_watermarks
2022-02-23T21:38:41.4367302Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="replication stream close requested"
2022-02-23T21:38:41.4367810Z #18 51.42 time="2022-02-23T21:38:41Z" level=debug msg="checking slot LSN" after=0/17D25D0 before=0/17D25D0 iter=49
2022-02-23T21:38:41.5787509Z #18 51.62     capture_test.go:175: slot "flow_test_slot" restart LSN failed to advance after 50 retries

source-mysql: Extend datatype support

The initial implementation of source-mysql has just enough datatype translation logic (int, varchar, text, and double) to make the unit tests pass.

Ideally this should be extended to all column types that MySQL supports, both for discovery (translating MySQL column type names to JSON schema) and for capture (converting the captured values so that their JSON serialization will match the JSON schema).

This may require a bit of additional plumbing in the generic sqlcapture package, because currently the discovered type information is not available during the translation of captured values, and the MySQL client library basically only returns int/float64/[]byte/nil types, so any nontrivial output translation will probably require that metadata to know when it should be applied.

materialize-bigquery: Contention between shards updating `flow_checkpoints_v1`

When running a BigQuery materialization with >1 shard, the transactor Commit operations start interfering with each other, causing one or another shard to repeatedly fail with logs looking like:

{
  "_meta": {
    "uuid": "2d55e682-7e77-11ec-8400-0f6f4ecbb06e"
  },
  "shard": {
    "name": "acmeCo/dataset/to-bigquery",
    "kind": "materialization",
    "keyBegin": "00000000",
    "rClockBegin": "00000000"
  },
  "ts": "2022-01-26T07:11:26.845462691Z",
  "level": "error",
  "message": "shard failed",
  "fields": {
    "error": "exit status 1 with stderr:\n\ntime=\"2022-01-26T06:45:01Z\" level=info msg=\"opening bigquery\" bucket=wgd-test-bucket bucket_path=materialize-bigquery/ dataset=dataset project_id=acmeCo region=EU\ntime=\"2022-01-26T07:11:26Z\" level=error msg=\"RunTransactions failed\" error=\"prior transaction: transactor.Commit: merge error: Query error: Transaction is aborted due to concurrent update against table acmeCo:dataset.flow_checkpoints_v1. Transaction ID: 61e30eea-0000-2dab-8e60-94eb2c1b1a92 at [24:9]\" fence=40 keyBegin=0 keyEnd=2147483647 materialization=acmeCo/dataset/to-bigquery\nprior transaction: transactor.Commit: merge error: Query error: Transaction is aborted due to concurrent update against table acmeCo:dataset.flow_checkpoints_v1. Transaction ID: 61e30eea-0000-2dab-8e60-94eb2c1b1a92 at [24:9]\n"
  }
}

Where the actual error message, lightly formatted for readability, is:

exit status 1 with stderr:

time="2022-01-26T06:45:01Z" level=info msg="opening bigquery"
  bucket=wgd-test-bucket bucket_path=materialize-bigquery/
  dataset=dataset project_id=acmeCo region=EU
time="2022-01-26T07:11:26Z" level=error msg="RunTransactions failed"
  error="prior transaction: transactor.Commit: merge error: Query error: Transaction is aborted due to concurrent update against table acmeCo:dataset.flow_checkpoints_v1. Transaction ID: 61e30eea-0000-2dab-8e60-94eb2c1b1a92 at [24:9]"
  fence=40 keyBegin=0 keyEnd=2147483647 materialization=acmeCo/dataset/to-bigquery
prior transaction: transactor.Commit: merge error: Query error: Transaction is aborted due to
  concurrent update against table acmeCo:dataset.flow_checkpoints_v1. Transaction ID:
  61e30eea-0000-2dab-8e60-94eb2c1b1a92 at [24:9]

In my testing it appeared that a 2-shard materialization would generally settle into a steady state within ~10 minutes at most after which failures stopped occurring, and with 4 shards a steady state was reached after ~30 minutes. I suspect this "steady state" is mostly a coincidence, and just means that the transaction commits have managed to stagger themselves enough to execute without conflict the majority of the time.

Relatedly, it seems like materializing from a US datacenter into an EU BigQuery dataset may not ever reach such a nice steady state, perhaps because the BigQuery concurrency logic is somehow taking into account the client<->datacenter latency and so the window of possible conflict becomes much larger when the datacenter is further away?

But anyway, the important thing is that the query which reads from and updates flow_checkpoints_v1 shouldn't have any conflicts in the first place because the SELECT and UPDATE statements are restricted with WHERE key_begin=x AND key_end=y which should mean that each shard only ever touches its own row. So for these concurrent transactions to conflict at all means that something is wrong, either with the transactions we're executing or with the way that BigQuery's optimistic concurrency control is detecting conflicts.

SSH Tunneling for CDC Connectors

We should implement the ability to connect to a database using change data capture using SSH port forwarding. Initial conversations with companies that want to do CDC on their databases have led to an immediate question around whether we can support SSH port forwarding or tunneling.

Whatever we do here should be well documented so that we can point to a simple process when a person sets up CDC on their database using Flow connectors.

snowflake: streaming PUTs to internal stages

Not long ago, I attempted to update the connector to use the gosnowflake driver's recently added support for PUTs to Snowflake stages. I ran into a bug that has since been fixed, and we should try again.

While we're at it, we should switch to streaming PUTs to the internal stage as we consume from the Store iterator (instead of staging to a local temporary file, and then starting to upload only after the Store iterator is consumed). In my own profiling, it seems like this would materially reduce data stalls as we execute transactions, as these PUTs typically take seconds to complete for larger files.

As further context, our philosophy on connector errors and retries has shifted, and we're planning to implement a watch-dog in the control plane which looks for failed shards and restarts them with a backoff policy. That means the connector doesn't need to worry about spurious errors and retries while executing the PUT to Snowflake -- it can implement the more efficient, direct strategy of simply streaming to the stage and hoping for the best.

Support Avro files in parser

We'd like to support reading avro files in the parser.
These should be associated with the .avro extension.
Support for compressed data blocks is likely required, as the use of compression seems widespread.
This xandr page happens to have some pretty good examples that we could potentially use for testing.

Handle Airbyte spec auth_type

This currently prevents us from running any airbyte connector that supports OAuth...which is most of them. Ex. airbyte/source-google-sheets fails in this way. Github fails in a slightly different way.

We should do two things here:

  1. Research and write up a design for how we could implement OAuth with our current system
  2. auth_type has two sub categories -- firstly OAuth and secondly something related to API key sign in. Our discover workflow should ignore OAuth and present the options under the other item.

materialize-snowflake: snowsql logging errors

The materialize-snowflake connector is continuously printing these log messages:

Failed to initialize log. No logging is enabled: [Errno 13] Permission denied: '/snowsql_rt.log_bootstrap'
Failed to initialize log. No logging is enabled: [Errno 13] Permission denied: '/snowsql_rt.log'
Failed to initialize log. No logging is enabled: [Errno 13] Permission denied: '/snowsql_rt.log_bootstrap'
Failed to initialize log. No logging is enabled: [Errno 13] Permission denied: '/snowsql_rt.log'

This question seems relevant, but there may be other, better options out there.

source-postgres: Support columns with array types

Fairly self-explanatory, PostgreSQL allows columns to be multidimensional arrays of primitive types, and so the PostgreSQL source connector ought to support that properly.

Currently there's some hacked-in and incomplete support: any array will get serialized as a string using PostgreSQL's own text format, and during discovery there's some hacked-in logic to emit the corresponding schema {"type":"string"} for a small subset of possible array types (just integers, floats, and text).

Ideally a proper solution will work generically for the dozens of different PostgreSQL data types, and convert each one to an actual JSON array of the equivalent primitive value. How exactly to handle multidimensionality is still TBD, but it might suffice to translate arrays into something like

{
  "type": "object",
  "properties": {
    "dims": <something>,
    "data": {
      "type": "array",
      "items": {"type": <elementType>}
    }
  }
}

Or possibly even throw away the dimension information entirely and just return the underlying linear value array.

materialize-bigquery: Failure to clean up tables during local development

When deploying with --wait-and-cleanup to a local temp-data-plane the BigQuery materialization should delete the tables it created on exit. But, at least in my specific situation, it isn't doing this and the tables just linger. This in turn means that subsequent attempts to run the same catalog will silently fail to materialize any new data.

The deploy exits with the following error message (edited lightly for readability):

applying schema updates: query: wait: googleapi: Error 400: Query error: Transaction is aborted due to
concurrent update against table helpful-kingdom-273219:fenestra_materialization_test.flow_checkpoints_v1.
Transaction ID: 61c2c7ad-0000-2ef4-aadd-14c14eec2a00 at [1:185], invalidQuery

This error message appears to be coming from the code responsible for deleting the materialized tables on exit, which would explain why those tables aren't deleted as expected. However according to Johnny: "this smells like a bigger problem [...] because raced transactions against the table are expected to happen".

I have not dug any further into the issue for now, just filing a tracking bug so it doesn't get forgotten.

Enable conversion from OPENSSH key to RSA key

thrussh does not take OPENSSH private key, which is a common format from ssh-keygen.

Although the OPENSSH key could be converted to RSA key via the command ssh-keygen -p -N "" -m pem -f id_rsa manually, it is better to integrate this step in the connector and make it transparent to the user.

materialize-bigquery: excessive full table scans

We've observed that BigQuery does a poor job of avoiding full table scans during the Load phase of materialization transactions, even where the collection key is explicitly selected to allow for much more efficient scans (for example, by using an ascending date as a leading component).

We'd expect that BigQuery would take advantage of min/max ranges over keyed fields to cull the set of micro partitions it must actually process, as Snowflake does, but that's not what we're observing in practice.

source-postgres: Ensure correctness of captured data

We have reason to believe that the transition between table-scanning backfill and replication event processing is subtly incorrect and could miss or duplicate change events in edge cases, especially if there are concurrent "random-access-ish" modifications during the table scan.

The core of the problem is that, in the absence of writes, there's no way to precisely line up the transaction which reads a bunch of data via SELECT queries with the sequence of change events we receive via replication. Currently we sample the value of pg_current_wal_lsn() at the start of the read transaction and treat that as "the point at which the table was scanned", but this function isn't itself transactional, so not only is there a bit of a race condition there but there's also no guarantee that this LSN corresponds at all with the version of the table we're seeing.

We have anecdotal evidence that Debezium-based CDC solutions can exhibit subtle errors in the captured data, and this is a strong candidate for why that might be (since Debezium uses this same approach).

There are two avenues we might try for a more robust solution:

  • PostgreSQL replication must solve this problem somehow, or it wouldn't be possible to add replicas to a live database. We should investigate how exactly this works (probably snapshots?) and whether we could do the same thing.
  • Note how this whole problem was qualified with "in the absence of writes" up above. If the transaction which reads data out of the table also emitted any writes, then we could observe where those writes show up in the replication stream and thereby get an actual point of correspondence between the two processes. This is essentially what Netflix's DBLog system does, and has the advantage of being reasonably database-agnostic. But it requires that the Flow system have write access to the database, at least for one table.

source-mysql: Stress test and verify correctness

The initial implementation of source-mysql appears to work in unit tests, but the current unit tests don't cover all aspects of correctness.

I previously wrote a "correctness stress-test" which I used to prove that the old non-watermark-based capture implementation for PostgreSQL could produce incorrect output in certain edge cases. I need to dust that off and either:

  1. Translate the whole thing into a generic test which could be run as part of each SQL capture connector's test suite.
  2. Leave it as an external binary which launches the Dockerized capture connector, but make it generic and put the code somewhere in the repository so we can keep using it in the future.

Azure Postgres Connector Failing due to cron auth

Our standard steps for setting up a Postgres database don't exactly work. The first issue happens when one attempts to run the following:

postgres=> GRANT SELECT ON ALL TABLES IN SCHEMA information_schema, pg_catalog TO flow_capture;
WARNING: no privileges were granted for "sql_implementation_info"
ERROR: permission denied for table sql_parts

It next appears when trying to run a catalog set up to use this same database:

flowctl discover --image [ghcr.io/estuary/source-postgres:dev](http://ghcr.io/estuary/source-postgres:dev)
ERRO[0001] running connector failed                      error="exit status 1" logSource="[ghcr.io/estuary/source-postgres:dev](http://ghcr.io/estuary/source-postgres:dev)" operation=discover
FATA[0001] fatal error                                   err="fetching connector bindings: exit status 1 with stderr:\n\n{\"timestamp\":\"2022-03-08T22:47:29.876372355Z\",\"level\":\"INFO\",\"message\":\"received server public key: Ed25519(PublicKey { key: [52, 32, 85, 188, 70, 183, 144, 148, 34, 136, 135, 150, 154, 183, 205, 138, 126, 217, 47, 174, 194, 98, 72, 101, 233, 59, 251, 190, 154, 23, 231, 69] })\"}\ntime=\"2022-03-08T22:47:29Z\" level=info msg=\"initializing connector\" database=postgres host=localhost port=6000 slot=flow_slot user=flow_capture\n{\"timestamp\":\"2022-03-08T22:47:30.40290684Z\",\"level\":\"INFO\",\"message\":\"Unhandled global request: Ok(\\\"[[email protected]](mailto:[email protected])\\\")\",\"log.target\":\"thrussh::client::encrypted\",\"log.module_path\":\"thrussh::client::encrypted\",\"log.file\":\"/home/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/thrussh-0.33.5/src/client/encrypted.rs\",\"log.line\":481}\nunable to list database columns: ERROR: permission denied for schema cron (SQLSTATE 42501)\nError:  unable to list database columns: ERROR: permission denied for schema cron (SQLSTATE 42501)\n"

The unable to list all database columns error appears to happen due to our flow_capture user not being able to grant usage on schema cron to flow_capture:

postgres=> GRANT USAGE ON SCHEMA cron TO flow_capture;
WARNING: no privileges were granted for "cron"
GRANT

Weirdly, the user Azure creates for us doesn't have this issue and can be granted those rights. To reproduce this, I'm using this VPN server to ssh and this postgres db. Please message me for a Flow catalog which can be used for the repro.

Materialization Testing Framework

We would like to build a materialization connector testing framework that can drive connectors directly without the Flow Runtime. It should be able to provide details/reporting around:

Correctness

  • Storing correct values
  • Subsequent upserts ending with the correct values (transactions in correct order etc)

Performance

  • Records per second
  • Bytes/Second
  • Transactions per time frame
  • Optimal transaction size determination

Support Parquet Materialization Connector

To implement a connector that materialize a collection of data into parquet files.

Some thoughts on the requirements and designs:

  • The collection data are materialized into cloud storage (s3 / gcs).
  • The materialized data will be stored in a sequence of chunked files, each file containing data collected in a specified time period (e.g. 5 mins),
  • The connector keeps storing the data into a local staging file first, and uploads the file to the cloud after a specified time period.
  • A driver checkpoint is made, and persisted to the store of Flow, only after the data is successfully uploaded to the cloud.
  • There could be cases in which no transaction data is received, and no action of uploading to cloud is triggered for the remaining data in the local staging file. To handle that, there will be some additional logic to watch the local file, and upload it to the cloud if it gets too old. The uploaded file could be overridden by the transactor if new transaction data is received that triggers the normal actions.

Partial Updates for Rockset Materializations

Currently the Rockset connector functions with in delta update mode. We've recently uncovered an issue where TOAST'd columns require either:

  1. REPLICA IDENTITY FULL with delta updates on the materialization side
  2. Merge updates if that postgres setting isn't selected

It's impractical to require REPLICA IDENTITY FULL because Amazon seems to suggest that it can have performance implications. As such, we should modify our Rockset connector to also allow for merge updates. Also, no other CDC provider recommends that as a recommended path presumably due to performance impact.

Self-contained Docker Builds

We currently build Docker containers locally on a host machine, then copy the built artifact into a container built from the same base image. This can encounter problems when the host machine does not match the container's base system. This can be avoided by ensuring all artifacts are statically linked, but this can cause its own problems. Since the connectors will always be run from Docker, this is not exactly necessary.

By moving the build of the artifacts into the container itself, we can ensure that each connector has only its necessary dependencies available at build and run time. This should ensure simple, reliable, consistent builds as we continue to create more connectors.

materialize-rockset: fix required fields

api-key should be the only required field in the endpoint config, but the generated spec from schemalate indicates that http_logging and max_concurrent_requests are, as well. Unsure where the inconsistency originates, but it's a bit of a confusing UX.

Various materialize connectors missing config descriptions

A critical part of the connector docs are the tables generated from the configs. They look like this, and exist for both the endpoint config and resource/bindings:

Property Title Description Type Required/Default
/database Database Name of the logical database to materialize to. string
/host Host Host name of the database to connect to. string Required
/password Password User password configured within the database. string Required
/port Port Port on which to connect to the database. integer
/user User Database user to use. string Required

Right now, this is done manually using flowctl api spec and schemalate, which gives me a chance to edit them, but in the future it will likely be automated. The Title and Description also have UI implications (I would imagine).

So, all fields for both the endpoint and resource should have a descriptive title and a description that will output automatically. Currently, most of the materialization connectors are missing at least some components.

Here's a list of what's missing for each. I've also included connectors that are missing READMEs, because that seems important too, though it doesn't have immediate doc/UI implications:

materialize-bigquery

  • Missing Title and Description for resource config
  • Unsure about endpoint config - md generation fails due to estuary/flow#406
  • Possibly remove credentials-file, per estuary/flow#367 (comment) - from both config and README.

materialize-elasticsearch

  • Missing Title from resource config
  • In resource config, in the description for the field /field_overrides/-/pointer`, "delimited" is spelled incorrectly
  • Missing Title and Description from endpoint config
  • Missing README

materialize-postgres

  • Missing Title and Description from resource and endpoint configs
  • Missing README

materialize-rockset

  • Missing Title and Description from resource and endpoint configs

materialize-s3-parquet

  • Missing Title and Description from resource and endpoint configs
  • Missing README

materialize-snowflake

  • Missing Title and Description from resource and endpoint configs
  • Missing README

CSV parser should allow missing columns when headers are explicitly configured

Given an example CSV file that doesn't have a header row:

1
2,3
4,5,6

And the following parser configuration:

parser:
  format: csv
  csv:
    headers: [a, b, c]

The desired behavior is that the CSV file should parse without error, as long as the fields b and c are nullable. The current behavior is for the parser to return an error after the first row, because it's expecting each row to only have one column (based on the first row that it read).

We should update the parser to successfully parse the file in this scenario. Note that we should also ensure that a CSV file with more columns than there are headers should still fail. For example, the following CSV file should result in an error at line 2, given the same configuration as above with only 3 headers:

1
2,3,4,5

Materialization to Apache Beam

Apache Beam aka Google Cloud Dataflow is a transformation framework that Google is pushing heavily. They don't currently have a way to get data into it from SaaS or database sources and would like to have one that works in a streaming fashion. This means there's a relatively rare opportunity to get onto their partner list and have them sell our product if we can provide that in the short term.

Google has a UI that would show our solution if we built a connector for Apache Beam. Google users would then be able to select Estuary as a source and dive into any source we support to load Beam. Beam supports being loaded by Kafka, and as such this could look like the same work that it would take to support a materialization connector to Materialize. Here are their docs.

Note that we probably need them to fork their Kafka connector and have them call it "Estuary" for this because the UI will only show net new endpoints. If we just used their Kafka connector as is, they would see Kafka and not Estuary in the UI.

source-postgres: debezium-compatible output

We'd like this and future connectors to be maximally compatible with Debezium's output, documented here:
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-create-events

There are some notable additions:

  • before and after states which are populated directly from the replication log, if available. The WAL may have before states available, and without this you must use a derivation register in order to re-construct it.
  • Additional, and database-agnostic ordering info through the /source/sequence field, and the /source/txId field for understanding transaction boundaries.

We've encountered other systems like materialize we'd like to integrate, which make active use of this ordering metadata, so it's desirable to have it be available directly from CDC connectors in a consistent way. While we're at it, I see no downside in using remaining bits of the Debezium change event shape wherever practical.

source-postgres: proper support for TOAST semantics

We want to gracefully handle various settings of REPLICA IDENTITY within captured tables.
These are a bit tricky because they omit unchanged TOAST values and thus break the current expectation of seeing full restatements of all row fields on changes.

The plan is to combine:
a) Omitting TOAST fields which are unavailable under the current REPLICA IDENTITY, while also
b) Adding reduce annotations which provide the correct merging behavior during materializations, etc.

Support Parsing Avro Documents

It's rather common in the Kafka ecosystem to encode Message payloads with Apache Avro. However, Avro documents are not self-describing, they require a external schema definition to be deserialized.

TODO:

  • Support passing a schema definition to the parser
  • Parse Avro documents and reformat them as JSON
  • Add support for converting Avro schema definitions to JSON schema definitions

Out of Scope:

  • Fetching an Avro schema from the Confluent Schema Registry

Skip Malformed CSV Rows

We'd like to add the capability to be less strict when parsing data in capture connectors. This is not always desirable, but there are enough use cases that would benefit from this that we'd like to give users the option. It's not always possible/feasible to clean up the source data before we parse it. We can't defer these errors until derivation time, as the parser halts the capture when encountering malformed data.

TODO:

  • Add config for % of errors limit
    • default limit = 0
    • hidden configuration in the CaptureSpec
    • connectors: merge explicitly supplied parser options into config
  • Halt w/ error when error limit exceeds threshold
    • % of entire file
    • % of last 1000 rows
    • other?
  • Capture malformed/skipped rows and output as error logs

Investigate the viability of using Debezium for future CDC connectors

Converting this from a note to an issue so I have a good place to put progress updates. I'll write up something more detailed later, but the quick summary:

  • Debezium has an incremental backfills mode, which was one of the major issues with using it in the past.
  • This does not automatically make the Debezium-based Airbyte connectors usable for our purposes, but it potentially means that we could write our own connectors based around Debezium as the capture engine.
  • This might be desirable for a few reasons, most notably that once we have Debezium-based capture working at all we get a lot of additional databases for very little effort, and in some cases (such as MySQL) there has been a lot of work in the Debezium logic that we would prefer not to try and replicate (for instance, the whole "parsing DDL queries to keep track of table schema changes" thing).
  • The biggest issue is that Debezium only really addresses the "capture" part of a source connector, and so we would still need additional (database-specific) logic for table discovery.
  • This logic is relatively simple: Enumerate tables and columns in the DB, map DB types to JSON Schema according to a hard-coded lookup table, emit resulting catalog. The tricky bit is that the lookup table has to match what Debezium will actually do for every type.
  • In practice, this can be tricky if value representation differs between backfills and replicated change events. This is probably one place where Debezium does a better job than we'd want to do ourselves (this is where parsing and interpreting the DDL queries ought to shine, as an approach), however I still want to verify that.
  • So currently I'm working on turning my "MySQL value capture" test cases into a quick-and-dirty sanity check to make sure Debezium is actually doing a good job here.

materialization: materialize CDC format

Background

Materialize is an interesting materialization target we'd like to explore.

It has an idealized change-data-capture format that has it's baking in the way timely dataflow works. Here's a longer blog post with more background. tl;dr is that Materialize wants to consume updates like:

update (record0, time0, +2)
update (record1, time0, +1)
update (record2, time0, +1)
progress (time0, 3)
update (record1, time1, -1)
update (record2, time1, +1)
progress (time1, 2)
update (record0, time2, -1)
update (record2, time2, -1)
progress (time2, 2)
progress (time3, 0)

Materialize innately wants consistent differential delta updates of complete rows. record0, record1, etc are restatements of a record as it existed at a point in time, and the +1 / -1 are updates of that record in a multi-set -- which is differential dataflow's fundamental abstraction. Given a consistent and correct CDC stream of this kind, Materialize is able to avoid storing the source table in memory and operate only on the change stream, which I understand to be the desired ideal.

However CDC solutions don't provide that for various reasons. Either because of weak consistency semantics, or because it's not available in the logical log. For example:

  • The Materialize Postgres source must keep the entire source table indexed, presumably to handle retractions.
  • It's support for the Debezium envelope notes that duplicates can happen and support is best-effort. There are additional caveats around log compaction violating correctness.

A Flow + Materialize solution therefor looks interesting for a few reasons:

  • CDC captures that include only delta changes, like Postgres, can be combined with Registers to produce full and consistent accountings of changes in update (record, +1/-1) terms.
    • This avoids Materialize having to keep an in-memory copy of the source. Flow can maintain those states as a HA + scale-out concern.
  • The +1/-1 differential deltas could be managed as reduction annotations.
    • This allows Flow to substantially roll-up of changes before pushing them to Materialize, which is almost certainly important for new materializations of long-lived CDCs.

Assumptions

  1. We don't want to pass-through a first-class notion of Flow's internal CDC log time, because it's not helpful to tell Materialize about timepoints in the distant pass. In other words, time0, time1, etc should always be ~now. This is because Materialize (and timely dataflow as a whole) have a global understanding of the marching progress of time. You can't usefully tell it timepoints of a months-old capture that's just been stood up as a new source. This may be wrong - I recall that hyper-times are possible in Timely Dataflow but that may not be in Materialize just yet.

  2. Materialize sources are all pull-oriented in nature. A successful integration would require a push API where Flow tells Materialize of update and progress messages, and receives ACKs back in reply. An ACK means that Materialize has retained all CDC records through the ACK and is able to re-play them on its startup, as happens internally with current native sources. I assume this is feasible. The API could be either streaming -- which would require explicit ACKs -- or synchronous, such as an HTTP PUT with ACK being conveyed by OK response.

  3. Breaking Change / Contentious if Materialize sees a progress at time1, then updates at time2, and then another progress at time1 -- it must treat the second progress at time1 as a roll-back and discard the intermediary time2 updates.

    If 1) above is correct, then this is going to be required by any exactly-once integration with a streaming system, because that system a) must align its own transaction boundaries with Materialize "progress" updates for correctness, but also b) dynamically sizes its transaction boundaries based on stream behavior (like data stalls).

    If 1) is incorrect, than the stream itself could encode repeatable times sent as update / progress. However, this still precludes an important optimization: being able to roll-up / compact a larger set of differential updates into a smaller set that's actually sent to Materialize.

    To illustrate the specific concern, suppose that Materialize instead decided to discard all but the last-received update of a record at timeT. This is reasonable at first blush, because if transactions are consistent on their selection of timeT, than a failed transaction would presumably be followed by a re-statement of the record at timeT. However this fails to account for the second, post-recovery transaction being smaller than the transaction that failed. In other words, a transaction could read through offset N of a stream and then fail after sending updates at timeT. A next transaction could read through offset M (< N) and also send updates at timeT -- but would fail to account for records between M:N and these records would effectively be sent twice.

Desired Outcome

We can PoC a materialization connector and end-to-end example which drives a proposed / currently-non-existent Materialize push API. It would:

  • Perform CDC using the Postgres capture connector across two example tables A & B.
  • Apply derivations to map CDC records into differential deltas of A & B.
  • Materialize using the proposed connector, driving a stand-in / no-op API to which it POSTs.

The materialization connector would side-step table creation in Materialize for now -- we'd need to coordinate how that would work, and it's more important to show the live CDC log.

Connector implementation sketch

The connector supports only delta-updates, and assumes (& eventually verifies) that source collections are ~schematized as:

{"record":{"the":"record"},"delta":-2}
  • record also forms the collection key -- each distinct tuple of record fields is its own key.
  • delta has a sum reduction strategy.

State

The connector uses driver checkpoints managed in recovery logs, and doesn't rely on transactions within materialize (not supported).

The driver checkpoint is:

{
  progress: {timestamp, num-records},
  next: timestamp,
}

progress is an intention of a progress update which will be written once a current transaction has been committed to the materialization's recovery log. On startup, the first thing a connector does is to tell Materialize of "progress" in its recovered checkpoint. This ensures we cannot fail to deliver "progress" if a txn committed, and is the same technique used for ACK intents within Gazette's exactly-once semantics.

next is the timestamp to use in the next transaction. The rationale for committing it into the driver state, rather than a transaction independently picking the current time, is that "update" messages of a failed transaction are guaranteed to be repeated by a successor transaction. If assumption 3) doesn't pan out then Materialize would at least discard the earlier "updates" which are explicitly repeated (TODO this is unsatisfying and still needs design work).

Transaction lifecycle:

  • The Load phase is a no-op ("delta updates" mode).
  • The Store phase:
    • Sends the progress intent recorded in the last driver checkpoint.
      • At this point we know for sure that the prior txn has committed to the recovery log.
    • Sends updates to Materialize reshaped as {record, timestamp, delta}, using the next timestamp from the last checkpoint.
    • It tracks the total number of documents sent (it will be one per record tuple).
  • The Prepare phase emits a driver checkpoint update that:
    • Sets the progress to be written (former next timestamp and number of sent records).
    • Sets the next timestamp to be used in the next transaction using the current wall-time.
  • The (async) Commit phase blocks until the "progress" and "updates" written during the Store phase have been ACK'd.

Implement a MySQL source connector

I've been working on this for the past week, and while I'm not entirely done with the details the broad strokes are mostly there. Filing this bug so I can update the Big Board.

A large chunk of the logic previously written in source-postgres is actually the generic "SQL Database Capture" process, so the first step of this is/was to factor out all of that into a generic sqlcapture package, and have source-postgres implement the various database operations (discover tables, stream change events via replication, write watermarks, scan table chunks, etc) plus a thin main function that injects that concrete implementation into the generic machinery that coordinates everything.

A similar bit of refactoring allows the tests to be factored out into another package as well, because most of the existing source-postgres tests should be just as applicable to any other SQL database capture.

After all that, implementing the MySQL source "only" requires implementing all those concrete operations one-by-one. This is actually working at the moment, and all the "genericized" tests are passing. However I expect I'll be working on various details for at least the rest of this week.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.