
tibigdata's People

Contributors

al-assad, ankita25, daemonxiao, dependabot[bot], feloxx, humengyu2012, itinycheng, kolbe, marsishandsome, mengxin9014, ngaut, purelind, pyscala, qidi1, shiyuhang0, sunnyholyd, sunxiaoguang, winkyao, xuanyu66, youngwookim, zhangyangyu


tibigdata's Issues

[BUG] NoClassDefFoundError when printing KeyError as a string

Bug Report

1. Describe the bug

When prewrite fails with a KeyError, the code invokes KeyError.toString, which causes a NoClassDefFoundError.

2. Minimal reproduce step (Required)

Since we don't know which KeyError occurred, it is hard to reproduce.

3. What did you see instead (Required)

Sometimes it is one error (screenshot omitted), and sometimes another (screenshot omitted).

4. What did you expect to see? (Required)

The KeyError message.

5. How to reproduce the bug

    KeyError.getDefaultInstance().toString();
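
A defensive sketch of how the conversion could be guarded (the helper below is hypothetical, not TiBigData code; it only assumes the failure surfaces as a NoClassDefFoundError thrown from toString):

// Hypothetical helper, not TiBigData code: guard the conversion so a missing
// class surfaces as a readable message instead of a NoClassDefFoundError.
public class KeyErrorFormatter {
  static String describe(Object keyError) {
    try {
      return keyError.toString();
    } catch (NoClassDefFoundError e) {
      return keyError.getClass().getName() + " (toString unavailable: " + e.getMessage() + ")";
    }
  }
}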

[BUG] Incorrect uniqueIndex key when the table is not intHandle

Describe the bug

When the value of a unique index is null, it is not distinct, so the handle should be appended to the key of the unique index.

cluster index PR #index.go

However, in TiBigData, the handle is appended to the key of the unique index only when the handle is a PkHandle:

https://github.com/tidb-incubator/TiBigData/blob/master/tidb/src/main/java/io/tidb/bigdata/tidb/key/IndexKey.java#L59-L94

What did you do

  private static final String TABLE =
      "CREATE TABLE IF NOT EXISTS `%s`.`%s`\n"
          + "("
          + "  `id` bigint(20),\n"
          + "  `name` varchar(255) primary Key CLUSTERED,\n"
          + "  `age` int(11) NULL DEFAULT NULL,\n"
          + "  unique key(`id`)"
          + ")";

  public void testUpsert() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    TableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
    String dstTable = RandomUtils.randomString();
    Map<String, String> properties = defaultProperties();
    properties.put(SINK_IMPL.key(), TIKV.name());
    properties.put(SINK_TRANSACTION.key(), SinkTransaction.GLOBAL.name());
    properties.put(WRITE_MODE.key(), "upsert");
    TiDBCatalog tiDBCatalog = initTiDBCatalog(dstTable, TABLE, tableEnvironment, properties);

    tableEnvironment.sqlUpdate(
        String.format(
            "INSERT INTO `tidb`.`%s`.`%s` " + "VALUES(cast(null as int), 'before', 1)",
            DATABASE_NAME, dstTable));
    tableEnvironment.execute("test");

    tableEnvironment.sqlUpdate(
        String.format(
            "INSERT INTO `tidb`.`%s`.`%s` " + "VALUES(cast(null as int), 'before2', 10)",
            DATABASE_NAME, dstTable));
    tableEnvironment.execute("test");
}

What do you expect

The two rows are inserted successfully.

What happens instead

(screenshot omitted)

Since null != null, oldValue should be null and isEmptyArray(oldValue) should be true.
(screenshot omitted)

By the way, the value is ZERO_BYTES only when needToAppendHandle is true:
https://github.com/tidb-incubator/TiBigData/blob/master/tidb/src/main/java/io/tidb/bigdata/tidb/codec/TiDBEncodeHelper.java#L184-L186
https://github.com/tidb-incubator/TiBigData/blob/master/tidb/src/main/java/io/tidb/bigdata/tidb/codec/TiDBEncodeHelper.java#L111-L113
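
Putting the analysis above together, a sketch of the expected encoding (the method below is hypothetical and not the actual IndexKey.java code): when any column of the unique index is null, the row handle should be appended regardless of whether the table uses an int handle or a common handle.

// Hypothetical sketch, not the actual IndexKey.java logic.
public class UniqueIndexKeySketch {
  static byte[] encode(byte[] encodedIndexValues, boolean containsNull, byte[] encodedHandle) {
    if (!containsNull) {
      return encodedIndexValues; // distinct key: the index values alone identify the row
    }
    // Null values are not distinct, so append the handle to keep keys unique per row.
    byte[] key = new byte[encodedIndexValues.length + encodedHandle.length];
    System.arraycopy(encodedIndexValues, 0, key, 0, encodedIndexValues.length);
    System.arraycopy(encodedHandle, 0, key, encodedIndexValues.length, encodedHandle.length);
    return key;
  }
}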

Flink: Use an existing property instead of hard-coded value

There is a hard-coded Scala version in this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${dep.flink.version}</version>
</dependency>

It would be good to replace it with the property ${dep.scala.binary.version}, for example:
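
Assuming the property ${dep.scala.binary.version} is already defined in the parent POM (as the report suggests), the dependency would then read:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${dep.scala.binary.version}</artifactId>
    <version>${dep.flink.version}</version>
</dependency>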

[Bug][Flink-table-sink] Why add unique keys to the PK set when generating the sink table schema?

The flink-tidb-connector doesn't work as expected when I use upsert mode to sink data to TiDB (Flink version: 1.13.x); the reason is as described in the title. Is there any reason for making the unique key part of the primary key?

Code block:

private String[] getKeyFields(Context context, ReadableConfig config, String databaseName,
      String tableName) {
    // check write mode
    TiDBWriteMode writeMode = TiDBWriteMode.fromString(config.get(WRITE_MODE));
    String[] keyFields = null;
    if (writeMode == TiDBWriteMode.UPSERT) {
      try (ClientSession clientSession = ClientSession.create(
          new ClientConfig(context.getCatalogTable().toProperties()))) {
        Set<String> set = ImmutableSet.<String>builder()
            .addAll(clientSession.getUniqueKeyColumns(databaseName, tableName)) // Why add all unique keys to pk set?
            .addAll(clientSession.getPrimaryKeyColumns(databaseName, tableName))
            .build();
        keyFields = set.size() == 0 ? null : set.toArray(new String[0]);
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
    return keyFields;
  }

The issue (see the sketch below):
  • flink-tidb-connector uses the official JDBC connector (flink-jdbc-connector) to write data to TiDB;
  • in upsert mode, the records in the flink-jdbc-connector buffer are deduplicated by the primary key, and the executeBatch flush is unordered because the buffer is a HashMap, see TableBufferReducedStatementExecutor.java;
  • this may leave multiple records with the same primary key in the same batch and write them to TiDB out of order.
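
A minimal, self-contained illustration of the reordering described above (the table layout and values are hypothetical, and this is not the connector's actual buffer class):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// When the reduce key includes the unique key, two updates of the same primary key
// remain as separate buffer entries, and HashMap iteration order is unspecified,
// so the older version may be flushed after the newer one.
public class ReduceBufferOrderDemo {
  public static void main(String[] args) {
    Map<List<Object>, List<Object>> buffer = new HashMap<>();
    buffer.put(List.of(1, "a"), List.of(1, "a", "v1")); // first update of pk=1
    buffer.put(List.of(1, "b"), List.of(1, "b", "v2")); // later update of pk=1 with a new unique-key value
    buffer.values().forEach(System.out::println);       // flush order is not the arrival order
  }
}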

ClassCastException when commonHandle contains `Timestamp` or `Date` columns

When a clustered index contains columns of type Timestamp or Date, a ClassCastException is thrown.

(screenshot omitted)

It is caused by the code below in CommonHandle:

if (dataTypes[i].getType().equals(MySQLType.TypeTimestamp)) {
  dataTypes[i].encode(cdo, DataType.EncodeType.KEY, ((long) data[i]) / 1000);
} else if (dataTypes[i].getType().equals(MySQLType.TypeDate)) {
  long days = (long) data[i];
  if (Converter.getLocalTimezone().getOffset(0) < 0) {
    days += 1;
  }
  dataTypes[i].encode(cdo, DataType.EncodeType.KEY, new Date((days) * 24 * 3600 * 1000));
}
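
A defensive conversion sketch (this assumes, but does not confirm, that the ClassCastException comes from data[i] arriving as java.sql.Timestamp or java.sql.Date rather than a boxed Long):

import java.sql.Date;
import java.sql.Timestamp;

// Hypothetical helper, not the committed fix: convert explicitly before encoding
// instead of casting the raw object to long.
public class HandleValueToMillis {
  static long toMillis(Object value) {
    if (value instanceof Timestamp) {
      return ((Timestamp) value).getTime();
    }
    if (value instanceof Date) {
      return ((Date) value).getTime();
    }
    return (long) value; // original behavior for values that really are boxed longs
  }
}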

Presto queries against TiDB return wrong dates

Hi, this problem occurs when using the TiDB connector, as shown in the screenshots below.
CREATE TABLE receiving (
receiving_id int(11) NOT NULL,
expected_date date NOT NULL DEFAULT '0000-00-00',
UNIQUE KEY udx_receiving_id (receiving_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Querying with the Presto TiDB connector vs. querying with the MySQL connector:

(screenshots omitted)

Any plans to publish TiDB Flink connector to Maven Central Repository?

Hi there,

Integrating TiDB with Flink is a great idea, as Flink is superb at real-time computing and TiDB is suitable for quick data access. However, the connector is only available in this GitHub repo and has not been uploaded to a Maven repository yet, which is inconvenient for project setup and future maintenance.

May I know if there are any plans to publish TiDB Flink connector to Maven repo?

Thanks : )

Presto query error: getConnectorPlanOptimizers()

Hi there,

I was testing TiBigData recently and compiled it successfully. But when I used Presto to query, the following error was reported:
io.tidb.bigdata.prestodb.tidb.optimization.TiDBPlanOptimizerProvider.getConnectorPlanOptimizers()Ljava/util/Set;

Does anyone know what happened and how to resolve it?

Thanks : )

Roadmap 2022

This is the TiBigData roadmap for 2022.

It is being iterated on, and we welcome everyone's suggestions.

Features

Flink

  • Fast Lookup with PK & Secondary Indices
  • Support Pre Split & Auto Random & Auto Increment when batch load
  • Support Upsert & Delete DML
  • Unified Batch & Streaming Processing
  • Support Catalog
  • Support External Connector Interface
  • Support FDW integration and Cloud EMR integration
  • Support Async Query
  • SSL Support
  • Telemetry

Spark (we'd like to merge TiSpark into TiBigData)

  • Support Pre Split & Auto Random & Auto Increment when batch load
  • Support Upsert & Delete DML
  • Unified Batch & Streaming Processing
  • Support Catalog
  • Support External Connector Interface
  • Support FDW integration and Cloud EMR integration
  • Support Async Query
  • SSL Support
  • Telemetry

Quality

  • Establish GA criteria

[Proposal] Flink Delete proposal

Problems

Currently, TiBigData#Flink doesn't support the DELETE RowKind in the TiKV sink; in other words, we can't consume a delete changelog to execute deletes.
As a real batch & streaming engine, it is better to support delete in Flink.

Goals

  • Support delete in the TiKV sink with upsert mode via a new configuration sink.tikv.delete (false by default)
  • Only supported in the MINIBATCH transaction; GLOBAL is not supported
  • Only supports delete on clustered index tables so far

Solution

TiBigData cannot support the DELETE statement before Flink SQL supports it. Thus, we only support delete in Unified Batch & Streaming Mode with the TiCDC -> Kafka source.

  1. Add a configuration to enable delete.
  2. Use a new class TiRow to distinguish delete RowKind from insert/update RowKind in MINIBATCH, and ignore delete RowKind in GLOBAL.
  3. Optimize the deduplication logic.
  4. Exclude delete RowKind rows from the upsert path when flushing the buffer.
  5. Use delete RowKind rows to delete when flushing the buffer (see the sketch after this list):
    • get the handle from the primary key
    • encode the key and value with the handle
    • 2PC to delete
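
A minimal sketch of steps 4 and 5 (all class and helper names below, such as TiRow and the two-phase-commit helpers, are illustrative stand-ins for what this proposal introduces, not the actual TiBigData API):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.RowKind;

// Illustrative sketch only; TiRow and the helper methods are hypothetical
// stand-ins for the classes this proposal introduces.
class DeleteAwareBuffer {
  // Minimal stand-in for the proposed TiRow: a row plus its changelog kind.
  record TiRow(RowKind kind, Object[] fields) {}

  void flush(List<TiRow> buffer) {
    List<TiRow> upserts = new ArrayList<>();
    List<TiRow> deletes = new ArrayList<>();
    for (TiRow row : buffer) {
      if (row.kind() == RowKind.DELETE) {
        deletes.add(row);            // step 4: keep deletes out of the upsert path
      } else {
        upserts.add(row);
      }
    }
    upsertWithTwoPhaseCommit(upserts);
    for (TiRow row : deletes) {      // step 5: delete path
      byte[] handle = getHandleFromPrimaryKey(row); // get the handle from the primary key
      byte[] key = encodeRecordKey(row, handle);    // encode the key with the handle
      deleteWithTwoPhaseCommit(key);                // 2PC to delete
    }
  }

  // Hypothetical helpers, intentionally left unimplemented in this sketch.
  void upsertWithTwoPhaseCommit(List<TiRow> rows) {}
  byte[] getHandleFromPrimaryKey(TiRow row) { return new byte[0]; }
  byte[] encodeRecordKey(TiRow row, byte[] handle) { return new byte[0]; }
  void deleteWithTwoPhaseCommit(byte[] key) {}
}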

Limitation

  • We cannot guarantee ordering once data is out of order in Kafka
  • We cannot guarantee ordering when parallelism is greater than 1 in MINIBATCH

Design

#201

Implement

#202

Test

#202

TiDB JDBC driver is not optimized for TP workload

The existing TiDB JDBC driver discovers the real TiDB servers every time a connection is established. This is simple but suboptimal for TP workloads. A more sophisticated implementation could make it work better in latency-sensitive scenarios.
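
One possible direction, sketched below under the assumption that discovery results can be cached and refreshed in the background (the class is illustrative, not the actual driver code):

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch, not the actual TiDB JDBC driver: cache the discovered
// TiDB server list so the hot connection path does not re-run discovery.
class CachedBackendDiscovery {
  private final AtomicReference<List<String>> backends = new AtomicReference<>(List.of());

  // Used on the hot path when establishing a connection.
  List<String> currentBackends() {
    return backends.get();
  }

  // Called periodically (e.g. from a background refresh thread) with freshly discovered servers.
  void refresh(List<String> discovered) {
    backends.set(discovered);
  }
}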

[BUG] Flink 1.14: No suitable driver found for jdbc url

Describe the bug

When I try to compile flink-1.14 into a jar and use it in the Flink SQL client, I run into an error.

What did you do

  1. compile
mvn clean package -DskipTests -am -pl flink/flink-1.14 -Ddep.flink.version=1.14.4 -Dmysql.driver.scope=compile -Dflink.jdbc.connector.scope=compile -Dflink.kafka.connector.scope=compile
  2. cp to Flink/lib
cp ${TIBIGDATA_HOME}/flink/flink-1.14/target/flink-tidb-connector-1.14-${TIBIGDATA_VERSION}.jar lib
  3. run
cd ~/Flink/
bin/sql-client.sh
Flink SQL> CREATE CATALOG `tidb2`
> WITH (
>     'type' = 'tidb',
>     'tidb.database.url' = 'jdbc:mysql://127.0.0.1:4000/tispark_test',
>     'tidb.username' = 'root',
>     'tidb.password' = '',
> );
[ERROR] Could not execute SQL statement. Reason:
java.sql.SQLException: No suitable driver found for jdbc:mysql://127.0.0.1:4000/tispark_test

What do you expect

Flink SQL> CREATE CATALOG `tidb2`
> WITH (
>     'type' = 'tidb',
>     'tidb.database.url' = 'jdbc:mysql://127.0.0.1:4000/tispark_test',
>     'tidb.username' = 'root',
>     'tidb.password' = '',
> );
[INFO] Execute statement succeed.

What happens instead

Here is error stack.

Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:625) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:447) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_332]
	at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client_2.12-1.14.4.jar:1.14.4]
Caused by: org.apache.flink.table.api.ValidationException: Could not execute CREATE CATALOG: (catalogName: [tidb2], properties: [{tidb.password=, tidb.username=root, tidb.cluster-jks-trust-path=/config/cert/jks/server-cert-store, tidb.cluster-jks-enable=true, type=tidb, tidb.cluster-jks-trust-password=12345678, tidb.cluster-tls-enable=true, tidb.cluster-jks-key-path=/config/cert/jks/client-keystore, tidb.database.url=jdbc:mysql://127.0.0.1:4000/tispark_test, tidb.cluster-jks-key-password=123456}])
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1301) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1122) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	... 11 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Can not init client session
	at io.tidb.bigdata.flink.connector.TiDBCatalog.initClientSession(TiDBCatalog.java:107) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.flink.connector.TiDBCatalog.open(TiDBCatalog.java:115) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1297) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1122) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	... 11 more
Caused by: java.lang.IllegalStateException: can not get pdAddresses
	at io.tidb.bigdata.tidb.ClientSession.loadPdAddresses(ClientSession.java:320) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.<init>(ClientSession.java:103) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.create(ClientSession.java:589) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.flink.connector.TiDBCatalog.initClientSession(TiDBCatalog.java:105) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.flink.connector.TiDBCatalog.open(TiDBCatalog.java:115) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1297) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1122) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	... 11 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://127.0.0.1:4000/tispark_test
	at java.sql.DriverManager.getConnection(DriverManager.java:689) ~[?:1.8.0_332]
	at java.sql.DriverManager.getConnection(DriverManager.java:247) ~[?:1.8.0_332]
	at io.tidb.bigdata.tidb.JdbcConnectionProviderFactory$BasicJdbcConnectionProvider.getConnection(JdbcConnectionProviderFactory.java:62) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.loadPdAddresses(ClientSession.java:311) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.<init>(ClientSession.java:103) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.create(ClientSession.java:589) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.flink.connector.TiDBCatalog.initClientSession(TiDBCatalog.java:105) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.flink.connector.TiDBCatalog.open(TiDBCatalog.java:115) ~[flink-tidb-connector-1.14-0.0.5-SNAPSHOT.jar:?]
	at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1297) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1122) ~[flink-table_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:209) ~[flink-sql-client_2.12-1.14.4.jar:1.14.4]
	... 11 more

Flink and TiBigData version info

  • Flink version: 1.14.4
  • TiBigData Flink version: 1.14

MRConnector doesn't refresh tikv hosts when a node is rotated in a cluster

It tries to connect to old TiKV nodes even after TiDB marks them as "tombstone". When running TiDBMRDemo, I see the following errors:

Feb 01, 2022 12:53:06 AM org.tikv.shade.io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext
WARNING: [Channel<21>: (XXX-YYY-ZZZ.com:20160)] Failed to resolve name. status=Status{code=UNAVAILABLE, description=Unable to resolve host XXX-YYY-ZZZ..com, cause=java.lang.RuntimeException: java.net.UnknownHostException: XXX-YYY-ZZZ..com: Name or service not known
at org.tikv.shade.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:223)
at org.tikv.shade.io.grpc.internal.DnsNameResolver.doResolve(DnsNameResolver.java:282)
at org.tikv.shade.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: XXX-YYY-ZZZ.com: Name or service not known
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at org.tikv.shade.io.grpc.internal.DnsNameResolver$JdkAddressResolver.resolveAddress(DnsNameResolver.java:631)
at org.tikv.shade.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:219)
... 5 more
}
Feb 01, 2022 12:53:06 AM org.tikv.shade.io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext
WARNING: [Channel<31>: (XXX-YYY-ZZZ.com:20160)] Failed to resolve name. status=Status{code=UNAVAILABLE, description=Unable to resolve host XXX-YYY-ZZZ.com, cause=java.lang.RuntimeException: java.net.UnknownHostException: XXX-YYY-ZZZ.com: Name or service not known
at org.tikv.shade.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:223)
at org.tikv.shade.io.grpc.internal.DnsNameResolver.doResolve(DnsNameResolver.java:282)
at org.tikv.shade.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: XXX-YYY-ZZZ.com: Name or service not known
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at org.tikv.shade.io.grpc.internal.DnsNameResolver$JdkAddressResolver.resolveAddress(DnsNameResolver.java:631)
at org.tikv.shade.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:219)
... 5 more
}

[Proposal] Flink connector writes bypassing TiDB

Problem

Currently, the Flink connector uses JdbcDynamicTableSink to write data through the TiDB server.

However, this approach leads to the following problems.

  • Lack of atomicity: if the Flink task fails and exits during the writing process, some data will be written successfully while the rest will not.
  • Lack of isolation: other transactions can see part of the written data during the writing process.
  • Lack of a failure recovery mechanism: tasks need to be idempotent; for example, after a failure you need to clean up the written data first and then re-run the Flink program.
  • Latency is higher than writing directly to TiKV.
  • Streaming or batch processing of large data volumes affects the performance of the TiDB server and thus affects online business queries.

Solution

Having the Flink connector write directly to TiKV can solve these problems.

  • TiKV provides transaction support for atomicity, isolation, and failure recovery.
  • Writing directly to TiKV has lower latency and does not affect the TiDB server.

Design

#164

Implement

#161

Test

#161

Doc

#161

Warning when creating TiDB catalog: deprecated driver class

Flink version: 1.12

  1. start a flink sql-client with flink-tidb-connector-1.12-0.0.2-SNAPSHOT.jar, mysql-connector-java-8.0.21.jar
./bin/sql-client.sh embedded
  2. create tidb catalog
create catalog tidb with (
   'type' = 'tidb',
   'tidb.database.url' = 'jdbc:tidb://172.16.5.81:23400/test',
   'tidb.username' = 'root', 'tidb.password' = ''
);
  3. get this warning
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Catalog has been created.

[BUG] TIKV sink's upsert bug

Version

  • TiBigData Flink 1.14
  • TiDB v6.0.0
  • TiCDC v6.0.0
  • Kafka 2.13-3.2.0

Bug

  1. create table
CREATE TABLE `test`.`flink_sink`  (
  `id` bigint(20) NOT NULL,
  `name` varchar(255) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
);

CREATE TABLE `test`.`flink`  (
  `id` bigint(20) NOT NULL,
  `name` varchar(255) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
);
  2. Use the Kafka source to get the changelog from TiCDC
  3. create catalog in sql-client and set
    • 'tidb.sink.impl' = 'TiKV'
    • 'tikv.sink.buffer-size' = '1'
    • 'tidb.write_mode'='upsert'
CREATE CATALOG `tidb` WITH (
  'type' = 'tidb',
  'tidb.database.url' = 'jdbc:mysql://127.0.0.1:4000?useSSL=false',
  'tidb.username' = 'root',
  'tidb.password' = '',
  'tidb.streaming.source' = 'kafka',
  'tidb.streaming.codec' = 'json',
  'tidb.streaming.kafka.bootstrap.servers' = 'localhost:9092',
  'tidb.streaming.kafka.topic' = 'cdc',
  'tidb.streaming.kafka.group.id' = 'test_cdc_group',
  'tidb.streaming.ignore-parse-errors' = 'true',
  'tidb.sink.impl' = 'TIKV',
  'tikv.sink.buffer-size' = '1',
  'tidb.write_mode' = 'upsert'
)
  4. execute SQL in flink sql-client
INSERT INTO `tidb`.`test`.`flink_sink` SELECT id,name FROM `tidb`.`test`.`flink`
  5. Use JDBC to insert data into flink_sink
INSERT into test.flink_sink VALUES (1,1)
  6. Use JDBC to insert data into flink
INSERT into test.flink VALUES (1,'1_update')
  7. check flink_sink: (1,1) is deleted rather than upserted

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]

public class TidbAlarmClearUp {

public static void main(String[] args) throws Exception {

    // run the job
    String jobName = TidbAlarmClearUp.class.getSimpleName();

    // set up the streaming execution environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    bsTableEnv.executeSql("CREATE TABLE flinkTest(\n"
            + " id     bigint,\n"
            + " ps_id  int,\n"
            + " ps_key string,\n"
            + " happen_time timestamp\n"
            + ") WITH (\n"
            + "  'connector' = 'tidb',\n"
            + "  'tidb.database.url' = 'jdbc:tidb://tc-fat-tidb.tidb-fat:4000/health',\n"
            + "  'tidb.driver' = 'com.mysql.jdbc.Driver',\n"
            + "  'tidb.database.name' = 'xxxx',\n"
            + "  'tidb.table.name' = 'flink_test',\n"
            + "  'tidb.maximum.pool.size' = '10',\n"
            + "  'tidb.minimum.idle.size' = '10',"
            + "  'tidb.username' = 'xxxx',\n"
            + "  'tidb.password' = 'xxxxx'\n"
            + ")"
    );

    // execute Table
    TableResult tableResult1 = bsTableEnv.executeSql(
            "INSERT INTO flinkTest(id,ps_id,ps_key,happen_time) " +
                     "SELECT id,ps_id,ps_key,happen_time FROM HealthOnlineAlarm ");
}

}

2020-11-20 09:02:47,044 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, HealthOnlineAlarm]], fields=[id, ps_id, ps_key, happen_time]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (e28717441b6b8eb1ba8013dc2e6a5527) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:204) ~[?:1.8.0_265]
at com.zhihu.tibigdata.flink.tidb.TypeUtils.getObjectWithDataType(TypeUtils.java:146) ~[flink-tidb-connector-0.0.1.jar:?]
at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:167) ~[flink-tidb-connector-0.0.1.jar:?]
at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:53) ~[flink-tidb-connector-0.0.1.jar:?]
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.12-1.11.2.jar:1.11.2]

Unable to start Trino after using PrestoSQL-TiDB-Connector

I followed this doc to build the connector and the build finished successfully.
https://github.com/tidb-incubator/TiBigData/tree/master/prestosql

My tidb plugin folder content is as below:

root@trino-worker01:/home/trino/trino-server-361/etc/catalog# ls -l /home/trino/trino-server-361/plugin/tidb/
total 38940
-rw-r--r-- 1 root root     4467 9月  28 15:03 aopalliance-1.0.jar
-rw-r--r-- 1 root root    44329 9月  28 15:03 bigdata-core-0.0.5-SNAPSHOT.jar
-rw-r--r-- 1 root root    26353 9月  28 15:03 bootstrap-202.jar
-rw-r--r-- 1 root root   805301 9月  28 15:03 bval-jsr-2.0.0.jar
-rw-r--r-- 1 root root   415348 9月  28 15:03 cglib-nodep-3.3.0.jar
-rw-r--r-- 1 root root   230905 9月  28 15:03 checker-qual-3.8.0.jar
-rw-r--r-- 1 root root    83762 9月  28 15:03 configuration-202.jar
-rw-r--r-- 1 root root    13854 9月  28 15:03 error_prone_annotations-2.5.1.jar
-rw-r--r-- 1 root root     4617 9月  28 15:03 failureaccess-1.0.1.jar
-rw-r--r-- 1 root root  2874025 9月  28 15:03 guava-30.1.1-jre.jar
-rw-r--r-- 1 root root   856934 9月  28 15:03 guice-4.2.3.jar
-rw-r--r-- 1 root root   156005 9月  28 15:03 HikariCP-3.4.5.jar
-rw-r--r-- 1 root root     8781 9月  28 15:03 j2objc-annotations-1.3.jar
-rw-r--r-- 1 root root    68167 9月  28 15:03 jackson-annotations-2.11.1.jar
-rw-r--r-- 1 root root   349108 9月  28 15:03 jackson-core-2.10.3.jar
-rw-r--r-- 1 root root  1404171 9月  28 15:03 jackson-databind-2.10.3.jar
-rw-r--r-- 1 root root    83893 9月  28 15:03 jackson-datatype-guava-2.10.3.jar
-rw-r--r-- 1 root root    34402 9月  28 15:03 jackson-datatype-jdk8-2.10.3.jar
-rw-r--r-- 1 root root    72117 9月  28 15:03 jackson-datatype-joda-2.10.3.jar
-rw-r--r-- 1 root root   105898 9月  28 15:03 jackson-datatype-jsr310-2.10.3.jar
-rw-r--r-- 1 root root     9328 9月  28 15:03 jackson-module-parameter-names-2.10.3.jar
-rw-r--r-- 1 root root    26586 9月  28 15:03 javax.annotation-api-1.3.2.jar
-rw-r--r-- 1 root root     2497 9月  28 15:03 javax.inject-1.jar
-rw-r--r-- 1 root root    16537 9月  28 15:03 jcl-over-slf4j-1.7.30.jar
-rw-r--r-- 1 root root  2920901 9月  28 15:03 jmxutils-1.21.jar
-rw-r--r-- 1 root root    32601 9月  28 15:03 json-202.jar
-rw-r--r-- 1 root root    19936 9月  28 15:03 jsr305-3.0.2.jar
-rw-r--r-- 1 root root     2199 9月  28 15:03 listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
-rw-r--r-- 1 root root     3908 9月  28 15:03 log-202.jar
-rw-r--r-- 1 root root    23702 9月  28 15:03 log4j-over-slf4j-1.7.30.jar
-rw-r--r-- 1 root root   471901 9月  28 15:03 logback-core-1.2.3.jar
-rw-r--r-- 1 root root    19470 9月  28 15:03 log-manager-202.jar
-rw-r--r-- 1 root root  1006956 9月  28 15:45 mysql-connector-java-5.1.48.jar
-rw-r--r-- 1 root root    48203 9月  28 15:03 prestosql-connector-0.0.5-SNAPSHOT.jar
-rw-r--r-- 1 root root    41203 9月  28 15:03 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root     8510 9月  28 15:03 slf4j-jdk14-1.7.30.jar
-rw-r--r-- 1 root root   103273 9月  28 15:03 tidb-jdbc-0.0.5-SNAPSHOT.jar
-rw-r--r-- 1 root root 27276468 9月  28 15:03 tikv-client-java-3.2.0-SNAPSHOT.jar
-rw-r--r-- 1 root root    19555 9月  28 15:03 units-1.6.jar
-rw-r--r-- 1 root root    93107 9月  28 15:03 validation-api-2.0.1.Final.jar

And I added the catalog for tidb, with the content below.

connector.name=tidb
tidb.database.url=jdbc:mysql://xxx:3306/xxx
tidb.username=xxx
tidb.password=xxx
tidb.filter-push-down=true

But after I restarted Trino, it errored as below.

2021-09-28T15:49:51.601+0800    ERROR   main    io.trino.server.Server  No service providers of type io.trino.spi.Plugin
java.lang.IllegalStateException: No service providers of type io.trino.spi.Plugin
        at com.google.common.base.Preconditions.checkState(Preconditions.java:591)
        at io.trino.server.PluginManager.loadPlugin(PluginManager.java:139)
        at io.trino.server.PluginManager.loadPlugin(PluginManager.java:129)
        at io.trino.server.ServerPluginsProvider.loadPlugins(ServerPluginsProvider.java:48)
        at io.trino.server.PluginManager.loadPlugins(PluginManager.java:110)
        at io.trino.server.Server.doStart(Server.java:122)
        at io.trino.server.Server.lambda$start$0(Server.java:77)
        at io.trino.$gen.Trino_361____20210928_074943_1.run(Unknown Source)
        at io.trino.server.Server.start(Server.java:77)
        at io.trino.server.TrinoServer.main(TrinoServer.java:38)

My trino version: 361
mysql connector I use: mysql-connector-java-5.1.48.jar

Please help if anything is wrong with my setup.

[BUG] deduplicate with null unique key

Describe the bug

Inserting a row with a null unique key via TiBigData has unexpected results.

MINIBATCH

What did you do

  private static final String TABLE =
      "CREATE TABLE IF NOT EXISTS `%s`.`%s`\n"
          + "("
          + "  `id` bigint(20),\n"
          + "  `name` varchar(255) primary Key CLUSTERED,\n"
          + "  `age` int(11) NULL DEFAULT NULL,\n"
          + "  unique key(`id`)"
          + ")";

  public void testUpsert() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    TableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
    String dstTable = RandomUtils.randomString();
    Map<String, String> properties = defaultProperties();
    properties.put(SINK_IMPL.key(), TIKV.name());
    properties.put(SINK_TRANSACTION.key(), SinkTransaction.MINIBATCH.name());
    properties.put(WRITE_MODE.key(), "upsert");
    TiDBCatalog tiDBCatalog = initTiDBCatalog(dstTable, TABLE, tableEnvironment, properties);

    tableEnvironment.sqlUpdate(
        String.format(
            "INSERT INTO `tidb`.`%s`.`%s` " + "VALUES(cast(null as int), 'before', 1), (cast(null as int), 'before', 1)",
            DATABASE_NAME, dstTable));
    tableEnvironment.execute("test");
}

What do you expect

Null should not equal null; no exception should be thrown.

What happens instead

(screenshots omitted)

Global

What did you do

  private static final String TABLE =
      "CREATE TABLE IF NOT EXISTS `%s`.`%s`\n"
          + "("
          + "  `id` bigint(20),\n"
          + "  `name` varchar(255) primary Key CLUSTERED,\n"
          + "  `age` int(11) NULL DEFAULT NULL,\n"
          + "  unique key(`id`)"
          + ")";

  public void testUpsert() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    TableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
    String dstTable = RandomUtils.randomString();
    Map<String, String> properties = defaultProperties();
    properties.put(SINK_IMPL.key(), TIKV.name());
    properties.put(SINK_TRANSACTION.key(), SinkTransaction.GLOBAL.name());
    properties.put(WRITE_MODE.key(), "upsert");
    TiDBCatalog tiDBCatalog = initTiDBCatalog(dstTable, TABLE, tableEnvironment, properties);

    tableEnvironment.sqlUpdate(
        String.format(
            "INSERT INTO `tidb`.`%s`.`%s` " + "VALUES(cast(null as int), 'before', 1), (cast(null as int), 'before', 1)",
            DATABASE_NAME, dstTable));
    tableEnvironment.execute("test");
}

What do you expect

Null should not equal null; no exception should be thrown.

What happens instead

(screenshot omitted)

  @Override
  public void processElement(
      Row row, KeyedProcessFunction<List<Object>, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    if (existState.value() == null) {
      existState.update(true);
      out.collect(row);
    } else if (!sinkOptions.isDeduplicate()) {
      String names = String.join(", ", uniqueIndexColumnNames);
      String values =
          // Object::toString throws java.lang.NullPointerException when a key component is null
          ctx.getCurrentKey().stream().map(Object::toString).collect(Collectors.joining(", "));
      throw new IllegalStateException(
          String.format(
              "Duplicate index in one batch, please enable deduplicate, index: [%s] = [%s]",
              names, values));
    }
  }
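
A minimal sketch of one way to avoid the NullPointerException in the error-message path (an assumption, not the committed fix): format key components with String::valueOf so null renders as "null" instead of throwing.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: String.valueOf renders null key components as "null"
// instead of throwing a NullPointerException.
public class NullSafeKeyFormat {
  public static void main(String[] args) {
    List<Object> currentKey = Arrays.asList(null, "before");
    String values = currentKey.stream().map(String::valueOf).collect(Collectors.joining(", "));
    System.out.println(values); // prints: null, before
  }
}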

[Proposal] Real-time wide table join

Problem

Currently, Flink doesn't support the statement INSERT ... ON DUPLICATE KEY UPDATE.
It is a common requirement to update some fields based on the primary key in the materialized view scenario.
(illustration omitted)

Solution

  • Use SQL hints to achieve the semantics of INSERT ... ON DUPLICATE KEY UPDATE.
  • Prune columns to get only the needed columns.
  • Use InsertOrDuplicateKeyUpdateOutputFormat to generate INSERT ... ON DUPLICATE KEY UPDATE statements and batch-execute them in TiDB (see the sketch below).
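
An illustrative sketch of the kind of statement the proposed InsertOrDuplicateKeyUpdateOutputFormat would batch-execute (table and column names are hypothetical examples, not TiBigData API):

import java.util.List;
import java.util.stream.Collectors;

// Builds an INSERT ... ON DUPLICATE KEY UPDATE statement for the pruned columns.
public class UpsertSqlSketch {
  static String buildSql(String table, List<String> columns) {
    String cols = String.join(", ", columns);
    String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
    String updates = columns.stream()
        .map(c -> c + " = VALUES(" + c + ")")
        .collect(Collectors.joining(", "));
    return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
        + " ON DUPLICATE KEY UPDATE " + updates;
  }

  public static void main(String[] args) {
    System.out.println(buildSql("wide_table", List.of("id", "col_a", "col_b")));
  }
}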

Design

#198

Implement

#196

Test

#196

Doc

#196

How does TiDB-Flink partition data to the TiDB sink?

I have two tables:

Source:

CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'format'='custom-ogg',
    ...
  );

Sink:

CREATE TABLE all_users_sink (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'connector'='tidb',
    // ...
  );

My question is: when the sink parallelism is greater than 1 and data with the same PRIMARY KEY exists in multiple parallel subtasks, can the sink ensure that the update sequence for the same PRIMARY KEY is consistent across subtasks? Or will data with the same PRIMARY KEY be partitioned to the same sink subtask?

I'm a little confused; can you answer this question? Thanks!

Flink connector throws an exception when reading a region

The Flink connector throws an exception when reading a table. The table has about 50 million records.

TIDB VERSION
v4.0.9

POM CONFIG

<dependency>
    <groupId>io.tidb</groupId>
    <artifactId>flink-tidb-connector-1.13</artifactId>
    <version>0.0.4</version>
</dependency>

DDL

create table if not exists test (
    `id` string
  )
with (
    'connector' = 'tidb',
    'tidb.database.url' = 'jdbc:mysql://ip:port/database',
    'tidb.table.name' = 'test',
    'tidb.database.name' = 'test',
    'tidb.username' = 'test',
    'tidb.password' = 'test'
)

EXCEPTION

org.tikv.common.exception.TiClientInternalException: Error reading region:
	at org.tikv.common.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:184)
	at org.tikv.common.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:161)
	at org.tikv.common.operation.iterator.DAGIterator.hasNext(DAGIterator.java:107)
	at io.tidb.bigdata.tidb.RecordCursorInternal.advanceNextPosition(RecordCursorInternal.java:49)
	at io.tidb.bigdata.flink.connector.source.reader.TiDBSourceSplitRecords.nextRecordFromSplit(TiDBSourceSplitRecords.java:74)
	at io.tidb.bigdata.flink.connector.source.reader.TiDBSourceSplitRecords.nextRecordFromSplit(TiDBSourceSplitRecords.java:35)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:125)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.tikv.common.exception.RegionTaskException: Handle region task failed:
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.tikv.common.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:179)
	... 18 more
Caused by: org.tikv.common.exception.RegionTaskException: Handle region task failed:
	at org.tikv.common.operation.iterator.DAGIterator.process(DAGIterator.java:227)
	at org.tikv.common.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:85)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.tikv.common.exception.GrpcException: retry is exhausted.
	at org.tikv.common.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:148)
	at org.tikv.common.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:119)
	at org.tikv.common.operation.RegionErrorHandler.handleRequestError(RegionErrorHandler.java:176)
	at org.tikv.common.operation.KVErrorHandler.handleRequestError(KVErrorHandler.java:115)
	at org.tikv.common.operation.KVErrorHandler.handleResponseError(KVErrorHandler.java:91)
	at org.tikv.common.policy.RetryPolicy.callWithRetry(RetryPolicy.java:87)
	at org.tikv.common.AbstractGRPCClient.callWithRetry(AbstractGRPCClient.java:85)
	at org.tikv.common.region.RegionStoreClient.coprocess(RegionStoreClient.java:612)
	at org.tikv.common.operation.iterator.DAGIterator.process(DAGIterator.java:214)
	... 7 more
Caused by: org.tikv.common.exception.GrpcException: send tikv request error: Request Failed with unknown reason for [{Region[7134] ConfVer[5] Version[715] Store[2] KeyRange[t\200\000\000\000\000\000\005%_r\223\353\361\356\346\355\223\244]:[t\200\000\000\000\000\000\005%_r\223\353\361\374<\322\000a]}], try next peer later
	at org.tikv.common.operation.RegionErrorHandler.handleRequestError(RegionErrorHandler.java:179)
	... 13 more
Caused by: org.tikv.common.exception.GrpcException: Request Failed with unknown reason for [{Region[7134] ConfVer[5] Version[715] Store[2] KeyRange[t\200\000\000\000\000\000\005%_r\223\353\361\356\346\355\223\244]:[t\200\000\000\000\000\000\005%_r\223\353\361\374<\322\000a]}]
	... 12 more


When the DDL statement is different from the actual schema in the database, an ArrayIndexOutOfBoundsException is reported

DDL

CREATE TABLE if not exists table_a (
       `user_id` BIGINT NULL COMMENT '',
       `id` BIGINT NULL COMMENT '',
       `position_id` BIGINT NULL COMMENT '',
       `status` STRING NULL COMMENT '',
       `transaction_id` BIGINT NULL COMMENT '',
    PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
    ) WITH(
          'connector'='kafka',
          'topic'='xxxx',
          'properties.bootstrap.servers'='xxx',
          'properties.group.id'='xxx',
          'properties.auto.offset.reset'='earliest',
          'scan.startup.mode'='earliest-offset',
          'format'='debezium-avro-confluent',
          'debezium-avro-confluent.schema-registry.url'='xxxx'
          );
CREATE TABLE if not exists table_b (
     `user_id` BIGINT NULL COMMENT '',
     `id` BIGINT NULL COMMENT '',
     `position_id` BIGINT NULL COMMENT '',
     `status` STRING NULL COMMENT '',
     `transaction_id` BIGINT NULL COMMENT ''
    ) WITH (
          'connector' = 'tidb',
          'tidb.database.url' = 'jdbc:mysql://xxxx',
          'tidb.username' = 'xxxx',
          'tidb.password' = 'xxxxx',
          'tidb.database.name' = 'xxxxx',
          'tidb.maximum.pool.size' = '1',
          'tidb.minimum.idle.size' = '1',
          'tidb.table.name' = 'withdraws',
          'tidb.write_mode' = 'upsert',
          'sink.buffer-flush.max-rows' = '0'
          );
insert into table_b select * from table_a;

The actual schema in TiDB has one more auto-increment column than table_b, and the following error is reported when the task is started.

java.lang.ArrayIndexOutOfBoundsException: -1
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_291]
	at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_291]
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_291]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_291]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-tidb-connector-1.13-0.0.4.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
2021-11-19 07:55:36,996 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 25e6800fc67392651c32db54b2fcc483 

[BUG] Error loading databases when initializing the client session

Describe the bug
Databases are loaded when the client session is initialized, which causes the task to fail.

What happens instead
Caused by: java.lang.IllegalArgumentException: invalid encoded hash data key prefix: m
	at org.tikv.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.codec.MetaCodec.decodeHashDataKey(MetaCodec.java:71) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.codec.MetaCodec.hashGetFields(MetaCodec.java:122) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.catalog.CatalogTransaction.getDatabases(CatalogTransaction.java:75) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.catalog.Catalog$CatalogCache.loadDatabases(Catalog.java:195) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.catalog.Catalog$CatalogCache.<init>(Catalog.java:136) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.catalog.Catalog$CatalogCache.<init>(Catalog.java:124) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.catalog.Catalog.<init>(Catalog.java:44) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at org.tikv.common.TiSession.getCatalog(TiSession.java:396) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.<init>(ClientSession.java:120) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]
	at io.tidb.bigdata.tidb.ClientSession.create(ClientSession.java:529) ~[pool-1-thread-7-1655696784783-flink-tidb-connector-1.13-0.0.5-SNAPSHOT.jar:?]

Additional context
TiDB version: 5.2.1

Support Feature Lookup

I'm currently evaluating the TiDB catalog and found that dimension table (lookup) joins are not supported yet. Two questions:

  1. Roughly when will the source support the LookupTableSource feature?
  2. The source has been rewritten; what exactly is the difference from the Flink JDBC connector? Direct connection to TiKV with per-region parallelism for speedup?

JDBC Connector URL Parse returns exception

I use the Flink TiDB connector to write CDC data from Kafka into TiDB, but an exception happened. If I use jdbc:mysql://ip1,ip2,ip3:4000/database1 to connect, the Flink job fails and the error log is:
java.sql.SQLSyntaxErrorException: function READ ONLY has only noop implementation in tidb now, use tidb_enable_noop_functions to enable these functions.
But when I use the URL jdbc:mysql://ip1:4000,ip2:4000,ip3:4000/database1, the Flink job runs. Why does this happen? If my URL is wrong, why is a READ ONLY error returned?
