sugarcrm / debezium

This project forked from debezium/debezium


Working design document: https://gist.github.com/morozov/1157990ccd7ca00ca89637da914a4c08.

Home Page: https://debezium.io

License: Apache License 2.0

Java 93.13% Groovy 1.83% Shell 0.55% ANTLR 3.84% TSQL 0.04% Dockerfile 0.04% Python 0.14% JavaScript 0.01% PLSQL 0.41% Batchfile 0.01%

debezium's People

Contributors

ani-sha, btiernay, debezium-builder, fbolton, ggaborg, grzegorz8, gunnarmorling, harveyyue, jcechace, jchipmunk, jpechane, keweishang, kgalieva, krnaveen14, kucharo2, martinmedek, morozov, naros, novotnyjiri, plugarut, prburgu, renatomefi, rhauch, rk3rn3r, roldanbob, sgc109, spicy-sauce, tovacohen, vjuranek, zalmane


debezium's Issues

Rework SqlServerTopicSelector to include database name into the topic name

Currently, the selector builds the topic name as <server>.<schema>.<table>. Because the database name is not part of the topic name, it is challenging to route messages from each database to its own topic. We need to add the database name to the default selector so that the topic name looks like <server>.<database>.<schema>.<table>.

Making this change will require changes in existing tests.
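The intended scheme can be sketched as follows. SqlServerTopicNamer and its method are hypothetical illustrations, not the actual SqlServerTopicSelector API:

```java
// A minimal sketch (hypothetical class, not the actual SqlServerTopicSelector):
// the database name is inserted between the server name and the schema name.
public class SqlServerTopicNamer {

    private final String serverName;

    public SqlServerTopicNamer(String serverName) {
        this.serverName = serverName;
    }

    // Builds <server>.<database>.<schema>.<table> instead of the current
    // <server>.<schema>.<table>.
    public String topicFor(String database, String schema, String table) {
        return String.join(".", serverName, database, schema, table);
    }
}
```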

Extract offset context from object states to method signatures

Implement the changes using the approach used in #7.

Components that have OffsetContext as their state or API that need to be aware of multiple partitions:

  1. BaseSourceTask#lastOffset
  2. ChangeEventSourceCoordinator#previousOffset
  3. StreamingChangeEventSource implementations (e.g. SqlServerStreamingChangeEventSource#offsetContext)
  4. SnapshotChangeEventSource implementations (e.g. RelationalSnapshotChangeEventSource#previousOffset)
  5. ChangeEventSourceFactory (instantiates the previous two)
  6. AbstractChangeRecordEmitter#offsetContext and its implementations.

Integration testing

Once the code changes are implemented, add an integration test to capture changes from two databases.

Monitor the list of CDC-enabled databases on an instance

After #10 is implemented, each connector task will be able to work with its individual database taken from a pre-configured list of databases. In order to handle the provisioning of new customer instances, the connector should be able to automatically detect and track the list of CDC-enabled databases.

Acceptance criteria
  1. Introduce a configuration parameter that will tell the connector to auto-discover CDC-enabled databases instead of using a hard-coded list (e.g. database.names.source = cdc).
  2. In Connector#taskConfig(), if the configuration above is enabled, list the database names using SELECT name FROM sys.databases WHERE is_cdc_enabled = 1.
  3. Start a background thread that will periodically poll the list of CDC-enabled databases. The poll interval will be defined by the database.names.poll.interval.ms configuration parameter.
  4. If the fetched list of database names differs from the one currently in use, the connector should request task reconfiguration via context.requestTaskReconfiguration().
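The detection logic from steps 3 and 4 can be sketched as follows. CdcDatabaseMonitor is a hypothetical name; in the connector, fetchNames would execute the SELECT above and reconfigure would call context.requestTaskReconfiguration():

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the polling logic. In the real connector, fetchNames
// would run "SELECT name FROM sys.databases WHERE is_cdc_enabled = 1" and
// reconfigure would call context.requestTaskReconfiguration().
public class CdcDatabaseMonitor {

    private final Supplier<List<String>> fetchNames;
    private final Runnable reconfigure;
    private Set<String> current;

    public CdcDatabaseMonitor(Supplier<List<String>> fetchNames, Runnable reconfigure,
                              List<String> initialNames) {
        this.fetchNames = fetchNames;
        this.reconfigure = reconfigure;
        this.current = new HashSet<>(initialNames);
    }

    // Invoked periodically, e.g. from a scheduled executor whose period is
    // database.names.poll.interval.ms.
    public void poll() {
        Set<String> fetched = new HashSet<>(fetchNames.get());
        if (!fetched.equals(current)) {
            current = fetched;
            reconfigure.run();
        }
    }
}
```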

Caveats:

  1. If the connector restarts during a snapshot, we'll have duplicates. It's currently unclear how to avoid that.
  2. If multiple databases are CDC-enabled within a short period of time, it may cause overly frequent connector restarts and snapshot duplicates (see above).

Given the above caveats, we may want to initially implement only the initialization part and restart the connector manually when needed, without having to change its configuration.

Intermittent test failures

There is a certain set of tests that intermittently pass or fail, seemingly regardless of the PRs in which they run:

2021-04-19T16:13:35.8053958Z [ERROR] excludeColumnWhenCaptureInstanceExcludesColumns(io.debezium.connector.sqlserver.SqlServerConnectorIT)  Time elapsed: 46.689 s  <<< FAILURE!
2021-04-19T16:13:35.8056587Z java.lang.AssertionError: expecting actual value not to be null
2021-04-19T16:13:35.8058956Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.lambda$excludeColumnWhenCaptureInstanceExcludesColumns$60(SqlServerConnectorIT.java:1680)
2021-04-19T16:13:35.8063813Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.excludeColumnWhenCaptureInstanceExcludesColumns(SqlServerConnectorIT.java:1667)
2021-04-19T16:13:35.8066894Z 
2021-04-19T16:13:35.8068391Z [ERROR] testMaxLsnSelectStatementWithFalse(io.debezium.connector.sqlserver.SqlServerConnectorIT)  Time elapsed: 48.841 s  <<< FAILURE!
2021-04-19T16:13:35.8070212Z java.lang.AssertionError: expecting actual value not to be null
2021-04-19T16:13:35.8072544Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.lambda$testMaxLsnSelectStatementWithFalse$103(SqlServerConnectorIT.java:2563)
2021-04-19T16:13:35.8076452Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.testMaxLsnSelectStatementWithFalse(SqlServerConnectorIT.java:2550)
2021-04-19T16:13:35.8078775Z 
2021-04-19T16:13:35.8080273Z [ERROR] shouldPropagateSourceTypeByDatatype(io.debezium.connector.sqlserver.SqlServerConnectorIT)  Time elapsed: 46.589 s  <<< ERROR!
2021-04-19T16:13:35.8082254Z java.lang.RuntimeException: java.lang.NullPointerException
2021-04-19T16:13:35.8085159Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.shouldPropagateSourceTypeByDatatype(SqlServerConnectorIT.java:2442)
2021-04-19T16:13:35.8088145Z Caused by: java.lang.NullPointerException
2021-04-19T16:13:35.8090255Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.lambda$shouldPropagateSourceTypeByDatatype$100(SqlServerConnectorIT.java:2445)
2021-04-19T16:13:35.8094190Z 	at io.debezium.connector.sqlserver.SqlServerConnectorIT.shouldPropagateSourceTypeByDatatype(SqlServerConnectorIT.java:2442)

One of the reasons for the failures above has been fixed in #44. The common reason for the rest of the failures is that consumeRecordsByTopic() returns fewer records than requested.

This issue is critical since it makes developers spend unplanned time on diagnosing the build issues.

Acceptance criteria:
  1. Run the tests above locally and reproduce the failures (at least one of the above).
  2. Find the root cause (probably some race condition).
  3. Eliminate the root cause.
Developer notes:
  1. #44 addresses one of the race conditions (the one about waiting for snapshot completion).

Prototype API changes at the BaseSourceTask level

The Debezium framework should drive iteration over multiple partitions per task. We need to implement the components that will describe multiple partitions at the task level before implementing the iteration.

See #15.

Recover database schema from multiple partitions in one pass

Currently, during connector start, the database schema is recovered from a single partition up to a given offset:

public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {

For a multi-partition connector task to function, it needs to know the schema of all partitions it will process.

Acceptance criteria:
Implement a method:

public final void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser);

Where offsets is a map of the sources above to their respective positions. During the comparison, the source info of each message should be used as a key in the offsets map to look up the corresponding offset. If the key doesn't exist, the message should be skipped. If the key exists, then the corresponding offset should be used for comparison in the same way as currently implemented.

In pseudo-code:
Before:

for (message : messages) {
  if (message.partition == partition && message.offset < offset) {
    apply(message);
  }
}

After:

for (message : messages) {
  if (offsets.contains(message.partition)
      && message.offset < offsets.get(message.partition)) {
    apply(message);
  }
}

Implement iteration over partitions in the partitioned coordinator

Once #16 and #17 are resolved, rework the temporary code block to actually iterate over all partitions:
https://github.com/sugarcrm/debezium/blob/efc7d20e4d973ee5e04863b5923f154ae41a7c38/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java#L128-L132

Acceptance criteria:

  1. Rework SqlServerTaskPartition.Provider#getPartitions() to retrieve real names of all databases instead of the currently used one. See #21.
  2. Implement the iteration logic for the snapshot and the streaming sources.

At this point, the connector should be able to sequentially snapshot the two databases and then alternate between streaming them.

Use fully-qualified names for all tables and procedures in Debezium connector for SQL Server

Acceptance criteria
  1. Each table name or stored procedure in each SQL query or statement executed by the connector is referenced by its fully qualified name including the database name.
  2. The ;databaseName=${" + JdbcConfiguration.DATABASE + "}" fragment of the JDBC configuration is removed from the SqlServerConnection URL pattern since it’s no longer needed.
  3. The project compiles, all tests pass.
Example workflow
  1. Identify all SQL fragments in the plugin codebase (e.g. the occurrences of "SELECT").
  2. Add a database placeholder to each table or procedure reference in each fragment, e.g. SELECT sys.fn_cdc_get_max_lsn() → SELECT [#].sys.fn_cdc_get_max_lsn().
  3. In the code that uses the fragment, implement the replacement of the placeholder with the database name. If the actual value is unavailable in the current scope, add a new method argument databaseName. Pass the database name from the connector configuration down the line where necessary.
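A minimal sketch of such a replacement, assuming a [#] placeholder convention (the class and constant names are hypothetical, not the connector's actual code):

```java
// Hypothetical helper illustrating the placeholder approach; [#] marks the
// spot where the database name is substituted.
public class SqlFragments {

    public static final String GET_MAX_LSN = "SELECT [#].sys.fn_cdc_get_max_lsn()";

    // Replaces the placeholder with the bracket-quoted database name.
    public static String forDatabase(String fragment, String databaseName) {
        return fragment.replace("[#]", "[" + databaseName + "]");
    }
}
```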

Accept multiple database names by the connector and divide them between multiple tasks

Acceptance criteria
  1. Replace the database.dbname configuration parameter with database.names which accepts a comma-separated list of database names.
  2. Rework SqlServerConnector#taskConfigs() to return up to N configurations instead of one, where N = min(number of database names, maxTasks).
  3. Each task configuration should contain a task.database.names parameter.
  4. The list of database names is divided between the tasks in a round-robin fashion. For instance, given the list of database names [a, b, c, d, e] and maxTasks = 2, the connector should produce the following list of configurations: [[a, c, e], [b, d]].
  5. When connecting to the database, instead of using the value of database.dbname, each task should use the first value from its database list. Iteration over the list of the databases is out of the scope of this case.
  6. Demonstrate the ability of the connector to process two databases in parallel by scheduling one task per database (database.names = foo,bar; max.tasks = 2).
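The round-robin division from item 4 can be sketched as follows (TaskAssigner is a hypothetical helper, not an existing connector class):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the round-robin division: databases [a, b, c, d, e]
// with maxTasks = 2 yield [[a, c, e], [b, d]].
public class TaskAssigner {

    public static List<List<String>> divide(List<String> databaseNames, int maxTasks) {
        int taskCount = Math.min(databaseNames.size(), maxTasks);
        List<List<String>> configs = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            configs.add(new ArrayList<>());
        }
        // Assign database i to task (i modulo the number of tasks).
        for (int i = 0; i < databaseNames.size(); i++) {
            configs.get(i % taskCount).add(databaseNames.get(i));
        }
        return configs;
    }
}
```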

Remove leftovers of the singular dbname from SQL Server connector

Some tests and code still rely on database.dbname being present in the connector configuration. To make sure there's no hidden shared state that the connector may depend on, we need to get rid of it.

Acceptance criteria:

  1. database.dbname is an optional configuration parameter that internally resolves into a list of a single database.

Rethink the API around streaming execution state

Apart from the fix implemented in #44 (which may be a band-aid), we may consider improving the following aspects of the code:

  1. Callers of offsetContext.getStreamingExecutionState() need to check for NULL before using the returned value:
    if (offsetContext.getStreamingExecutionState() != null && offsetContext.getStreamingExecutionState().getTablesSlot() != null) {

    It would be safer if the consumer of this API didn't have to know about the implementation details and just called some methods on some object (Law of Demeter).
  2. The current logic assumes that the events have been streamed unless the object is in one of the blacklisted states. If a new state is added, it will be automatically considered out of that list. It's safer to have the inverse logic (a whitelist):
    switch (streamingExecutionState.getStatus()) {
        case NO_CHANGES_IN_DATABASE:
        case NO_MAXIMUM_LSN_RECORDED:
            return false;
        default:
            return true;
    }
  3. Double-check if the streaming execution state has to be part of the offset context.

Acceptance criteria:

  1. Identify a safer API that still solves the original problem (#19).
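One possible shape for such an API, combining a null object with the whitelist from item 2 (a sketch under assumptions; the names and statuses are illustrative, not the actual Debezium design):

```java
// Hypothetical null-object sketch: callers never see null and never reach
// through nested state, addressing the Law of Demeter concern above.
public class StreamingExecutionState {

    public enum Status {
        NOT_STARTED,
        NO_CHANGES_IN_DATABASE,
        NO_MAXIMUM_LSN_RECORDED,
        EVENTS_STREAMED
    }

    // Null object used before streaming has started.
    public static final StreamingExecutionState NOT_STARTED =
            new StreamingExecutionState(Status.NOT_STARTED);

    private final Status status;

    public StreamingExecutionState(Status status) {
        this.status = status;
    }

    // Whitelist logic: only an explicitly "streamed" status counts, so a
    // newly added status is not silently treated as "events were streamed".
    public boolean hasStreamedEvents() {
        return status == Status.EVENTS_STREAMED;
    }
}
```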

Add partition awareness to base task components

In #15, there's the TaskOffsetContext implemented that contains all partitions and offsets to be processed by a task. Next, we need to implement the iteration over all task partitions in the event coordinator by introducing a parallel "partitioned" event source class hierarchy. There are two points to this:

  1. Not all connectors are partitioned by design (the MySQL one isn't).
  2. By introducing a new API, we'll be able to continue with the changes only for SQL Server and simplify the migration of other connectors to the new API later.

Acceptance criteria:

  1. Introduce the following classes and interfaces:
    • PartitionedChangeEventSourceCoordinator
    • PartitionedChangeEventSourceFactory
    • PartitionedSnapshotChangeEventSource
    • PartitionedStreamingChangeEventSource
      Apart from being parametrized with <O extends OffsetContext>, these APIs should be parametrized with <P extends TaskPartition>. This will allow the underlying components to use the database name of the currently processed partition via partition.getDatabaseName(). Copy/paste where extension is not possible (e.g. the coordinator).
  2. Alongside with the OffsetContext offsetContext arguments introduced in #13, add the TaskPartition partition argument where necessary.
  3. Move the loop introduced in #15 to the partitioned coordinator: https://github.com/sugarcrm/debezium/blob/efc7d20e4d973ee5e04863b5923f154ae41a7c38/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java#L128-L132 Leave the break temporarily, repeat for each source (snapshot, streaming).
  4. Update the SQL Server connector components to implement the partitioned API.

UPD: work on this issue showed that there are more classes apart from the Coordinator whose APIs need to change. Copy-pasting more code increases the likelihood of drifting out of sync with upstream when changes are made to the duplicated code. To expedite development and lower this risk, let's take a shortcut:

  1. Instead of introducing new class hierarchy, add partition awareness to the existing one.
  2. Temporarily delete all plugins except for the one for SQL Server that would otherwise require code/API changes.

Move realDatabaseName out of the SqlServerConnection state

Currently, the "real" name of the source database is detected right after connecting to the database at the connection level:

public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode,
                           SqlServerValueConverters valueConverters, Supplier<ClassLoader> classLoaderSupplier) {
    super(config, FACTORY, classLoaderSupplier);
    lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
    realDatabaseName = retrieveRealDatabaseName();

In order to make the connection reusable across multiple databases (#10), we need to:

  1. Detect the "real" name of all databases the task is working with.
  2. Make the real name available to all components that use it right now.

The most natural place for that seems to be the SqlServerTaskPartition and its Provider being introduced as part of #15.

Acceptance criteria:

  1. Add a dependency on SqlServerConnection to SqlServerPartitionTask.Provider.
  2. While initializing each task partition, execute a query like:
    SELECT name FROM sys.databases WHERE name = ?;
    It will return the database name in its actual case (e.g. mAsTeR → master).
  3. If the query above doesn't return one and only one row, JDBC will throw an exception which the code should catch and log as a warning. No reason to fail the task.
  4. Initialize each SqlServerTaskPartition with a real database name instead of the configured one.
  5. Remove the current logic from the connection class.
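The resolution logic from steps 2 to 4 can be sketched as follows, with the JDBC call factored out. RealDatabaseNameResolver is a hypothetical name, and rows stands for the result of the SELECT above:

```java
import java.util.List;

// Hypothetical sketch: rows is the result of
// "SELECT name FROM sys.databases WHERE name = ?".
public class RealDatabaseNameResolver {

    public static String resolve(String configuredName, List<String> rows) {
        if (rows.size() == 1) {
            // The name in its actual case, e.g. mAsTeR resolves to master.
            return rows.get(0);
        }
        // In the connector this case would be logged as a warning rather
        // than failing the task; fall back to the configured name.
        return configuredName;
    }
}
```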

Move the poll interval logic from the streaming source to the coordinator

Instead of pausing after not having seen any updates in a given database (source-level logic), the partitioned connector should pause after not having seen changes in any of its partitions. So this logic should be moved from SqlServerStreamingChangeEventSource to PartitionedChangeEventSourceCoordinator (#16).

Acceptance criteria:
When streaming changes, the connector should pause between iterations.

Previous logic:

while (true) {
    if (!hasChanges(databaseName)) {
        sleep();
    }
}

Required logic:

while (true) {
    hasChanges = false;

    for (databaseName : databaseNames) {
        if (hasChanges(databaseName)) {
            hasChanges = true;
        }
    }

    if (!hasChanges) {
        sleep();
    }
}

Introduce the configuration for SQL Server task databases

Once #9 and #12 are resolved, replace the TODO in the code with the actual logic:
https://github.com/sugarcrm/debezium/blob/efc7d20e4d973ee5e04863b5923f154ae41a7c38/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskPartition.java#L55-L56

Acceptance criteria:

  1. Introduce the task.database.names configuration parameter and document it following all the standards for describing configuration (e.g. see debezium#2044 (comment)).
  2. Replace the temporary single database name with the list of database names.
  3. Add validation that the list is not empty (better if implemented at the configuration level via the framework, otherwise, the logic at the task level will suffice).
