long2ice / synch

Sync data from other DBs to ClickHouse (cluster)

Home Page: https://github.com/long2ice/synch

License: Apache License 2.0

Python 99.02% Dockerfile 0.27% Makefile 0.71%
clickhouse mysql replication postgresql data-etl increment-etl kafka

synch's Introduction

Synch


Chinese documentation

Introduction

Sync data from other databases to ClickHouse. PostgreSQL and MySQL are currently supported, with both full and incremental ETL.

(synch architecture diagram)

Features

  • Full data ETL and real-time incremental ETL.
  • DDL and DML sync: for DDL, add column, drop column, and change column are currently supported; DML is fully supported.
  • Email error reports.
  • Kafka and redis supported as brokers.
  • Multiple source databases can sync to ClickHouse at the same time.
  • Supports the ClickHouse MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, and ReplacingMergeTree engines.
  • Supports ClickHouse cluster.

Requirements

  • Python >= 3.7
  • redis: caches the MySQL binlog file and position, and serves as a broker; redis cluster is also supported.
  • kafka: needed if you use kafka as the broker.
  • clickhouse-jdbc-bridge: needed if you use postgres and set auto_full_etl = true, or run the synch etl command.

Install

> pip install synch

Usage

Config file synch.yaml

synch reads its default config from ./synch.yaml; use synch -c to specify a different config file.

See full example config in synch.yaml.

Full data etl

You may need to run a full data ETL before continuously syncing data from MySQL to ClickHouse, or redo the ETL with --renew.

> synch --alias mysql_db etl -h

Usage: synch etl [OPTIONS]

  Make etl from source table to ClickHouse.

Options:
  --schema TEXT     Schema to full etl.
  --renew           Etl after try to drop the target tables.
  -t, --table TEXT  Tables to full etl.
  -h, --help        Show this message and exit.

Full ETL of the tables test.test and test.test2:

> synch --alias mysql_db etl --schema test --table test --table test2

Produce

Listen to the MySQL binlog and produce events to the broker.

> synch --alias mysql_db produce

Consume

Consume messages from the broker and insert them into ClickHouse; error rows can be skipped with --skip-error. When auto_full_etl = true is set in the config, synch runs a full ETL first.

> synch --alias mysql_db consume -h

Usage: synch consume [OPTIONS]

  Consume from broker and insert into ClickHouse.

Options:
  --schema TEXT       Schema to consume.  [required]
  --skip-error        Skip error rows.
  --last-msg-id TEXT  Redis stream last msg id or kafka msg offset, depend on
                      broker_type in config.

  -h, --help          Show this message and exit.

Consume schema test and insert into ClickHouse:

> synch --alias mysql_db consume --schema test

Monitor

Set core.monitoring to true, and synch will automatically create a synch database in ClickHouse and insert monitoring data into it.

Table structure:

create table if not exists synch.log
(
    alias      String,
    schema     String,
    table      String,
    num        int,
    type       int, -- 1:producer, 2:consumer
    created_at DateTime
)
engine = MergeTree
partition by toYYYYMM(created_at)
order by created_at;
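
As a hedged example, the monitoring data can be inspected with clickhouse-driver (which synch already uses); the connection details below are placeholder assumptions:

from clickhouse_driver import Client

# Placeholder connection details; adjust for your deployment.
client = Client(host="127.0.0.1", port=9000, user="default", password="")

# Events per table over the last hour, split by producer (1) / consumer (2).
rows = client.execute(
    "SELECT schema, table, type, sum(num) AS events "
    "FROM synch.log "
    "WHERE created_at > now() - INTERVAL 1 HOUR "
    "GROUP BY schema, table, type "
    "ORDER BY events DESC"
)
for schema, table, event_type, events in rows:
    role = "producer" if event_type == 1 else "consumer"
    print(f"{schema}.{table} [{role}]: {events} events")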

ClickHouse Table Engine

synch currently supports MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, and ReplacingMergeTree.

Use docker-compose(recommended)

Redis Broker, lightweight, suitable for low concurrency
version: "3"
services:
  producer:
    depends_on:
      - redis
    image: long2ice/synch
    command: synch --alias mysql_db produce
    volumes:
      - ./synch.yaml:/synch/synch.yaml
  # one consumer service per schema
  consumer.test:
    depends_on:
      - redis
    image: long2ice/synch
    command: synch --alias mysql_db consume --schema test
    volumes:
      - ./synch.yaml:/synch/synch.yaml
  redis:
    hostname: redis
    image: redis:latest
    volumes:
      - redis:/data
volumes:
  redis:
Kafka Broker, for high concurrency
version: "3"
services:
  zookeeper:
    image: bitnami/zookeeper:3
    hostname: zookeeper
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper:/bitnami
  kafka:
    image: bitnami/kafka:2
    hostname: kafka
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - JMX_PORT=23456
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    depends_on:
      - zookeeper
    volumes:
      - kafka:/bitnami
  kafka-manager:
    image: hlebalbau/kafka-manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      KAFKA_MANAGER_AUTH_ENABLED: "false"
    command: -Dpidfile.path=/dev/null
  producer:
    depends_on:
      - redis
      - kafka
      - zookeeper
    image: long2ice/synch
    command: synch --alias mysql_db produce
    volumes:
      - ./synch.yaml:/synch/synch.yaml
  # one consumer service per schema
  consumer.test:
    depends_on:
      - redis
      - kafka
      - zookeeper
    image: long2ice/synch
    command: synch --alias mysql_db consume --schema test
    volumes:
      - ./synch.yaml:/synch/synch.yaml
  redis:
    hostname: redis
    image: redis:latest
    volumes:
      - redis:/data
volumes:
  redis:
  kafka:
  zookeeper:

Important

  • Always keep a primary key or a non-null unique key (or a composite primary key) on each table.
  • DDL sync is not supported for postgres.
  • Postgres sync is not fully tested; use it in production with care.

ThanksTo

Thanks to JetBrains for the powerful Python IDE PyCharm.


License

This project is licensed under the Apache-2.0 License.

synch's People

Contributors

holadepo, long2ice


synch's Issues

MySQL decimal data type not supported

Error when running consume; the message is:
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/service.py", line 100, in write_column
column.write_data(items, buf)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 77, in write_data
self._write_data(items, buf)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 80, in _write_data
prepared = self.prepare_items(items)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 61, in prepare_items
check_item_type(x)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 37, in check_item_type
raise exceptions.ColumnTypeMismatchException(value)
clickhouse_driver.columns.exceptions.ColumnTypeMismatchException: 0.00

During handling of the above exception, another exception occurred:
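
A minimal repro sketch of this class of error, assuming the MySQL decimal value reached clickhouse-driver as the string "0.00" rather than a numeric type (table name is hypothetical):

from decimal import Decimal
from clickhouse_driver import Client

client = Client(host="127.0.0.1")  # assumed local ClickHouse
client.execute(
    "CREATE TABLE IF NOT EXISTS decimal_repro (d Decimal(10, 2)) ENGINE = Memory"
)

# Raises ColumnTypeMismatchException: 0.00 -- the value arrived as the
# string "0.00" instead of a numeric type:
# client.execute("INSERT INTO decimal_repro (d) VALUES", [{"d": "0.00"}])

# Converting to decimal.Decimal first succeeds:
client.execute("INSERT INTO decimal_repro (d) VALUES", [{"d": Decimal("0.00")}])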

mysql to clickhouse

Synch installation failed, do you have detailed steps? Running > synch --alias mysql_db etl -h gives: no synch command found

pip install error

Running pip install synch on CentOS 7 fails (Python 3.7 and pip3 are installed).

WARNING: Discarding https://files.pythonhosted.org/packages/2d/df/5440ee86bbb248325cdd6e1fc9cbbba365018a0d2d57f673610e020e6b1d/mysqlclient-1.3.2.tar.gz#sha256=e44025830501b9f70f8c2fe8eeff66f0df2df198802f7295801dac199b8236ef (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
  Downloading mysqlclient-1.3.1.tar.gz (76 kB)
     |████████████████████████████████| 76 kB 299 kB/s 
    ERROR: Command errored out with exit status 1:
     command: /usr/local/python3/bin/python3.7 -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py'"'"'; __file__='"'"'/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-lyq28ain
         cwd: /tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/
    Complete output (10 lines):
    /bin/sh: mysql_config: command not found
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py", line 17, in <module>
        metadata, options = get_config()
      File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup_posix.py", line 47, in get_config
        libs = mysql_config("libs_r")
      File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup_posix.py", line 29, in mysql_config
        raise EnvironmentError("%s not found" % (mysql_config.path,))
    OSError: mysql_config not found
    ----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/6b/ba/4729d99e85a0a35bb46d55500570de05b4af10431cef174b6da9f58a0e50/mysqlclient-1.3.1.tar.gz#sha256=3549e8a61f10c8cd8eac6581d3f44d0594f535fb7b29e6090db3a0bc547b25ad (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
  Downloading mysqlclient-1.3.0.tar.gz (76 kB)
     |████████████████████████████████| 76 kB 336 kB/s 
    ERROR: Command errored out with exit status 1:
     command: /usr/local/python3/bin/python3.7 -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py'"'"'; __file__='"'"'/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7v_13f8p
         cwd: /tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/
    Complete output (10 lines):
    /bin/sh: mysql_config: command not found
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py", line 17, in <module>
        metadata, options = get_config()
      File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup_posix.py", line 47, in get_config
        libs = mysql_config("libs_r")
      File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup_posix.py", line 29, in mysql_config
        raise EnvironmentError("%s not found" % (mysql_config.path,))
    OSError: mysql_config not found
    ----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/6a/91/bdfe808fb5dc99a5f65833b370818161b77ef6d1e19b488e4c146ab615aa/mysqlclient-1.3.0.tar.gz#sha256=06eb5664e3738b283ea2262ee60ed83192e898f019cc7ff251f4d05a564ab3b7 (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
Collecting synch
  Downloading synch-0.6.7-py3-none-any.whl (38 kB)
Collecting sqlparse
  Downloading sqlparse-0.4.1-py3-none-any.whl (42 kB)
     |████████████████████████████████| 42 kB 392 kB/s 
Collecting synch
  Downloading synch-0.6.6-py3-none-any.whl (37 kB)
  Downloading synch-0.6.1-py3-none-any.whl (36 kB)
Collecting pydantic
  Downloading pydantic-1.8.2-cp37-cp37m-manylinux2014_x86_64.whl (10.1 MB)
     |████████████████████████████████| 10.1 MB 740 kB/s 
Collecting synch
  Downloading synch-0.6.0-py3-none-any.whl (35 kB)
ERROR: Cannot install synch==0.6.0, synch==0.6.1, synch==0.6.6, synch==0.6.7 and synch==0.7.1 because these package versions have conflicting dependencies.

The conflict is caused by:
    synch 0.7.1 depends on mysqlclient
    synch 0.6.7 depends on mysqlclient
    synch 0.6.6 depends on mysqlclient
    synch 0.6.1 depends on mysqlclient
    synch 0.6.0 depends on mysqlclient

To fix this you could try to:
1. loosen the range of package versions you've specified
2. remove package versions to allow pip attempt to solve the dependency conflict

ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/user_guide/#fixing-conflicting-dependencies

AttributeError: 'NoneType' object has no attribute 'get'

CentOS 7.3, ClickHouse 21.3.5.42, with the latest synch, reports the following error. Any idea what causes it?

[root@webserver synch]# synch --alias mysql_db etl --schema fmmp --tables T_AGENCY
Traceback (most recent call last):
File "/usr/local/python/bin/synch", line 8, in
sys.exit(cli())
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1137, in call
return self.main(*args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1062, in main
rv = self.invoke(ctx)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1665, in invoke
super().invoke(ctx)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 763, in invoke
return __callback(*args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/synch/cli.py", line 36, in cli
init(config)
File "/usr/local/python/lib/python3.9/site-packages/synch/factory.py", line 161, in init
dsn = Settings.get("sentry", "dsn")
File "/usr/local/python/lib/python3.9/site-packages/synch/settings.py", line 105, in get
c = c.get(arg)
AttributeError: 'NoneType' object has no attribute 'get'

Log output is too frequent

2020-12-11 19:25:11 - synch.replication.continuous:65 - INFO - Block 1 seconds timeout, insert current 0 events

Asking for help with the config

Hi, your project is really great!
I'm a beginner with clickhouse. Without using kafka or redis, if I just want to sync a mysql table to clickhouse, how should the config file be written? The config documentation doesn't feel very detailed. Thanks for any help!

Kafka problem

I'm currently running into the following situation:
A kafka failure caused the index file and log file timestamps to become inconsistent (possibly due to a restart, a zookeeper problem, insufficient memory, etc.).
producer.send kept succeeding, but consumers could not consume.

Searching online, most advice is to delete the index files and restart kafka. After doing that, messages (mysql data) were lost, because producer.send in the code looked normal (the error does not seem to affect send; I have not reproduced this yet) and the binlog position in redis kept advancing.

How can the producer detect that kafka (deployed as a standalone container, not a cluster) has a problem, so that sends fail and the binlog position in redis is not updated?
I tried making the producer synchronous, but it did not seem to help.

Error message:
WARN [Log partition=mysql2ch-4, dir=/var/lib/kafka/data] Found a corrupted index file corresponding to log file /var/lib/kafka/data/mysql2ch-4/00000000000000000000.log due to Corrupt time index found, time index file (/var/lib/kafka/data/mysql2ch-4/00000000000000000000.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1587959542575}, recovering segment and rebuilding index files... (kafka.log.Log)


What I did was delete the .timeindex and .index files in each partition and restart kafka.
That caused data loss, because redis had already updated the position.

pymysql.util dependency problem

File "/opt/app/python3/clickhouse_synch/lib/python3.9/site-packages/pymysqlreplication/binlogstream.py", line 9, in
from pymysql.util import int2byte
ModuleNotFoundError: No module named 'pymysql.util'

Redis incremental sync reports an error.

The received message contains numeric values. Did encoding fail?
return f(get_current_context(), *args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/synch/cli.py", line 82, in consume
alias, schema, tables_pk, table_dict, last_msg_id, skip_error,
File "/usr/local/python3/lib/python3.7/site-packages/synch/replication/continuous.py", line 130, in continuous_etl
f"insert event error,error: {e}", exc_info=True, stack_info=True
Message: "insert event error,error: 'int' object has no attribute 'encode'"

The message is as follows:
2020-11-24 20:41:53 - synch.reader.mysql:126 - DEBUG - send to queue success: key:ipark,event:{'table': 'indicators_statistical', 'schema': 'ipark', 'action': 'insert', 'values': {'id': 467, 'tenant_id': '1', 'indicators_type': '12', 'unique_key': '50189.1154933992', 'indicators_number': 1, 'indicators_date': datetime.datetime(2020, 11, 24, 20, 41, 52), 'create_time': datetime.datetime(2020, 11, 24, 20, 41, 52), 'modify_time': datetime.datetime(2020, 11, 24, 20, 41, 52)}, 'event_unixtime': 1606221713103948, 'action_seq': 2}
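
For reference, a hedged sketch of the serialization step that has to happen before .encode(): every value in the event (ints, datetimes) must first be turned into a string. This is purely illustrative, not synch's actual broker code:

import datetime
import json

# A producer event like the one logged above (trimmed).
event = {
    "table": "indicators_statistical",
    "schema": "ipark",
    "action": "insert",
    "values": {
        "id": 467,
        "indicators_number": 1,
        "indicators_date": datetime.datetime(2020, 11, 24, 20, 41, 52),
    },
}

def to_jsonable(obj):
    # Fallback for types the json module cannot handle natively.
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError(f"unserializable type: {type(obj)}")

# Serialize the whole event first, then encode the resulting string;
# calling .encode() on a raw int is what triggers the reported error.
payload = json.dumps(event, default=to_jsonable).encode()
print(payload[:80])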

UnicodeDecodeError - 'utf-8' codec can't decode byte 0xff

Hi, I'm getting the following error from the producer (using docker compose with the dev tag, as sentry breaks the latest tag).
What might cause it?
I can see the producer is connected, pulling data and sending to the queue.
Some records hit this error; is there a way to work around it?

producer_1  | Traceback (most recent call last):
producer_1  |   File "/usr/local/bin/synch", line 5, in <module>
producer_1  |     cli()
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1137, in __call__
producer_1  |     return self.main(*args, **kwargs)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1062, in main
producer_1  |     rv = self.invoke(ctx)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1668, in invoke
producer_1  |     return _process_result(sub_ctx.command.invoke(sub_ctx))
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
producer_1  |     return ctx.invoke(self.callback, **ctx.params)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/core.py", line 763, in invoke
producer_1  |     return __callback(*args, **kwargs)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
producer_1  |     return f(get_current_context(), *args, **kwargs)
producer_1  |   File "/synch/synch/cli.py", line 91, in produce
producer_1  |     reader.start_sync(broker)
producer_1  |   File "/synch/synch/reader/mysql.py", line 109, in start_sync
producer_1  |     for schema, table, event, file, pos in self._binlog_reading(
producer_1  |   File "/synch/synch/reader/mysql.py", line 178, in _binlog_reading
producer_1  |     for row in binlog_event.rows:
producer_1  |   File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 433, in rows
producer_1  |     self._fetch_rows()
producer_1  |   File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 428, in _fetch_rows
producer_1  |     self.__rows.append(self._fetch_one_row())
producer_1  |   File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 481, in _fetch_one_row
producer_1  |     row["values"] = self._read_column_data(self.columns_present_bitmap)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 132, in _read_column_data
producer_1  |     values[name] = self.__read_string(1, column)
producer_1  |   File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 224, in __read_string
producer_1  |     string = string.decode(encoding)
producer_1  | UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
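
A minimal sketch of the failure mode: pymysqlreplication decodes raw column bytes as UTF-8 (per the column's declared charset), and 0xff can never start a valid UTF-8 sequence, which happens with binary data or text stored in a different real encoding:

# 0xff can never start a valid UTF-8 sequence.
raw = b"\xff\x01\x02"
try:
    raw.decode("utf-8")
except UnicodeDecodeError as e:
    print(e)  # 'utf-8' codec can't decode byte 0xff in position 0 ...

# A lossy workaround when the source column really contains non-UTF-8 bytes:
print(raw.decode("utf-8", errors="replace"))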

Decimal data type not supported; error after execution

[root@k8smaster site-packages]# synch --config /etc/synch.ini etl --schema test --renew
2020-06-30 10:39:16 - synch.reader:19 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='hexin_erp_product' and COLUMN_KEY='PRI'
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - drop table test.hexin_erp_product
2020-06-30 10:39:16 - synch.reader:43 - INFO - drop table success:test.hexin_erp_product
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - select count(*)from system.tables where database = 'test' and name = 'hexin_erp_product'
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - CREATE TABLE test.hexin_erp_product ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('192.168.66.33:3306', 'test', 'hexin_erp_product', 'root', 'Hexin2007')
2020-06-30 10:39:17 - synch.replication.clickhouse:38 - DEBUG - select COLUMN_NAME, COLUMN_TYPE from information_schema.COLUMNS where TABLE_NAME = 'hexin_erp_product' and COLUMN_TYPE like '%decimal%'and TABLE_SCHEMA = 'test'
2020-06-30 10:39:17 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column purchase_minprice Decimal(10,2)
2020-06-30 10:39:17 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column purchase_maxprice Decimal(10,2)
2020-06-30 10:39:18 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column apply_price Decimal(10,2)  # stuck here for about 10 minutes, then finally the following error was reported
Traceback (most recent call last):
File "/usr/local/bin/synch", line 11, in
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 19, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/synch/replication/etl.py", line 12, in make_etl
Global.reader.etl_full(Global.writer, schema, tables, renew)
File "/usr/local/lib/python3.6/site-packages/synch/reader/init.py", line 50, in etl_full
writer.fix_table_column_type(self, schema, table)
File "/usr/local/lib/python3.6/site-packages/synch/replication/clickhouse.py", line 45, in fix_table_column_type
self.execute(fix_sql)
File "/usr/local/lib/python3.6/site-packages/synch/replication/clickhouse.py", line 28, in execute
return self._client.execute(sql, params=params, *args, **kwargs)
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 224, in execute
columnar=columnar
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 347, in process_ordinary_query
columnar=columnar)
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 89, in receive_result
return result.get_result()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/result.py", line 50, in get_result
for packet in self.packet_generator:
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 101, in packet_generator
packet = self.receive_packet()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 115, in receive_packet
packet = self.connection.receive_packet()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/connection.py", line 409, in receive_packet
packet.type = packet_type = read_varint(self.fin)
File "clickhouse_driver/varint.pyx", line 40, in clickhouse_driver.varint.read_varint
File "clickhouse_driver/bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one
File "clickhouse_driver/bufferedreader.pyx", line 188, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer
socket.timeout: timed out

The table (hexin_erp_product) is defined as follows:
CREATE TABLE hexin_erp_product (
id INT(11) NOT NULL DEFAULT '0' COMMENT '主键ID',
cate_id INT(11) NULL DEFAULT '0' COMMENT '分类ID',
supplier_id INT(11) NULL DEFAULT '0' COMMENT '供应商ID',
brand_id INT(11) NULL DEFAULT '0',
product_cname VARCHAR(128) NULL DEFAULT '' COMMENT '中文名称' COLLATE 'utf8mb4_general_ci',
product_ename VARCHAR(128) NULL DEFAULT '' COMMENT '英文名称' COLLATE 'utf8mb4_general_ci',
product_status TINYINT(4) NULL DEFAULT '1' COMMENT '商品状态(已审核通过的商品) 1正常 2停售 3清仓 4打折',
old_parent_sku VARCHAR(100) NULL DEFAULT '' COMMENT '旧的商品编码(兼容新旧数据)' COLLATE 'utf8_general_ci',
parent_sku VARCHAR(100) NULL DEFAULT '' COMMENT '商品sku' COLLATE 'utf8_general_ci',
purchase_day INT(11) NULL DEFAULT '0' COMMENT '采购到货天数',
purchaser VARCHAR(50) NULL DEFAULT '' COMMENT '采购人员' COLLATE 'utf8_general_ci',
purchase_minprice DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '最低采购价',
purchase_maxprice DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '最高采购价',
product_image LONGTEXT NULL COMMENT '主图' COLLATE 'utf8mb4_general_ci',
purchase_link VARCHAR(255) NULL DEFAULT '' COMMENT '采购链接' COLLATE 'utf8mb4_general_ci',
is_electric TINYINT(1) NULL DEFAULT '1' COMMENT '是否带电(1否2是)',
is_powder TINYINT(1) NULL DEFAULT '1' COMMENT '是否粉末(1否2是)',
is_liquid TINYINT(1) NULL DEFAULT '1' COMMENT '是否液体(1否2是)',
is_magnetic TINYINT(1) NULL DEFAULT '1' COMMENT '是否带磁(1否2是)',
is_tort TINYINT(1) NULL DEFAULT '1' COMMENT '是否侵权(1否2是)',
is_knowledge TINYINT(1) NULL DEFAULT '1' COMMENT '是否知识产权(1否2是)',
material VARCHAR(255) NULL DEFAULT '' COMMENT '材质' COLLATE 'utf8_general_ci',
unit VARCHAR(20) NULL DEFAULT '' COMMENT '单位' COLLATE 'utf8_general_ci',
season VARCHAR(10) NULL DEFAULT '' COMMENT '季节(春季、夏季、秋季、冬季)' COLLATE 'utf8_general_ci',
apply_cname VARCHAR(50) NULL DEFAULT NULL COMMENT '申报中文' COLLATE 'utf8_general_ci',
apply_ename VARCHAR(50) NULL DEFAULT '' COMMENT '申报英文' COLLATE 'utf8_general_ci',
apply_price DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '申报价值',
apply_code VARCHAR(50) NULL DEFAULT '' COMMENT '申报海关编码' COLLATE 'utf8_general_ci',
storage_id INT(11) NULL DEFAULT '0' COMMENT '默认发货仓库',
origin_country VARCHAR(50) NULL DEFAULT '' COMMENT '原产国二字代码' COLLATE 'utf8_general_ci',
origin_country_code VARCHAR(20) NULL DEFAULT '' COLLATE 'utf8_general_ci',
max_stock INT(11) NULL DEFAULT '0' COMMENT '库存上限',
min_stock INT(11) NULL DEFAULT '0' COMMENT '最小库存',
cost_price DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '成本价格',
out_box_single_weight DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱净重(单位:克)',
out_box_height DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱高(单位:cm)',
out_box_length DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱长(单位:cm)',
out_box_width DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱宽(单位:cm)',
out_box_gross_weight DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱毛重(单位:克)',
box_single_weight DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒净重(单位:克)',
box_height DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒高(单位:cm)',
box_length DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒长(单位:cm)',
box_width DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒宽(单位:cm)',
box_gross_weight DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒毛重(单位:克)',
checker VARCHAR(50) NULL DEFAULT '' COMMENT '审核人的uuid' COLLATE 'utf8mb4_general_ci',
check_status TINYINT(1) NULL DEFAULT '1' COMMENT '审核状态(1待审核 2审核通过 3审核不通过4待发布)',
check_time INT(11) NULL DEFAULT '0' COMMENT '审核时间',
check_info VARCHAR(255) NULL DEFAULT '' COMMENT '审核时间' COLLATE 'utf8mb4_general_ci',
developer VARCHAR(50) NULL DEFAULT '' COMMENT '开发者uuid 业绩归属人' COLLATE 'utf8_general_ci',
create_time INT(11) NULL DEFAULT '0',
modify_time INT(11) NULL DEFAULT '0' COMMENT '修改时间',
del_flag TINYINT(1) NULL DEFAULT '1' COMMENT '是否删除(1否2是)',
product_sub_images LONGTEXT NULL COMMENT '商品图片明细,多个以逗号隔开' COLLATE 'utf8_general_ci',
property_data TEXT NULL COMMENT '属性信息' COLLATE 'utf8_general_ci',
pid VARCHAR(50) NULL DEFAULT '' COMMENT '平台商品ID' COLLATE 'utf8_general_ci',
description TEXT NULL COMMENT '描述' COLLATE 'utf8_general_ci',
unsale_time INT(11) NULL DEFAULT '0' COMMENT '停售时间',
comment TEXT NULL COMMENT '商品链接标题' COLLATE 'utf8mb4_general_ci',
comment2 TEXT NULL COMMENT '授权标记' COLLATE 'utf8_general_ci',
tag_id VARCHAR(255) NOT NULL DEFAULT '' COMMENT '自定义标签id' COLLATE 'utf8_general_ci',
product_link VARCHAR(255) NOT NULL DEFAULT '' COMMENT '商品链接' COLLATE 'utf8_general_ci',
fabric_weight INT(11) NULL DEFAULT '0' COMMENT '面料克重',
size_img_str TEXT NULL COMMENT '尺码表图片路径' COLLATE 'utf8_general_ci',
is_model TINYINT(4) NULL DEFAULT '0' COMMENT '是否有模型图(0否1是)',
is_real TINYINT(4) NULL DEFAULT '0' COMMENT '是否有实物图(0否1是)',
model_from TINYINT(4) NULL DEFAULT '0' COMMENT '模型图来源(1和新自拍2工厂自拍3其他来源)',
real_from TINYINT(4) NULL DEFAULT '0' COMMENT '模型图来源(1和新自拍2工厂自拍3其他来源)',
is_order TINYINT(4) NULL DEFAULT '0' COMMENT '是否订做(0否1是)',
first_arrive_time INT(11) NULL DEFAULT '0' COMMENT '首单到货时间',
first_order_num VARCHAR(255) NULL DEFAULT '' COMMENT '首单订做数量' COLLATE 'utf8_general_ci',
start_order_num VARCHAR(255) NULL DEFAULT '0' COMMENT '起订量' COLLATE 'utf8_general_ci',
other_comment TEXT NULL COMMENT '其他备注' COLLATE 'utf8_general_ci',
is_paste TINYINT(4) NULL DEFAULT '1' COMMENT '是否膏体(1否2是)',
un_sale_reason TEXT NULL COMMENT '停售原因' COLLATE 'utf8_general_ci',
is_new TINYINT(1) NULL DEFAULT '0' COMMENT 'new(0 fou 1 shi )',
package_size VARCHAR(255) NULL DEFAULT '' COMMENT '产品包装袋尺寸' COLLATE 'utf8_general_ci',
publish_time INT(11) NOT NULL DEFAULT '0' COMMENT '发布时间',
tort_reason VARCHAR(255) NOT NULL COMMENT '侵权原因' COLLATE 'utf8_general_ci',
tort_time INT(11) NOT NULL DEFAULT '0' COMMENT '侵权时间',
size_adress VARCHAR(70) NOT NULL DEFAULT '' COMMENT '尺寸表名字' COLLATE 'utf8_general_ci',
edit_status TINYINT(4) NULL DEFAULT '1' COMMENT '编辑状态 1未编辑 2已编辑',
original_cate_id INT(11) NOT NULL DEFAULT '0' COMMENT '生成商品子sku的分类原始id(0-无做过移动)',
edit_time INT(11) NOT NULL DEFAULT '0' COMMENT '编辑商品的时间记录',
warehouse_entry_time INT(11) NOT NULL DEFAULT '0' COMMENT '入库时间',
tag_attribute INT(1) NOT NULL DEFAULT '1' COMMENT '1-普货;2-敏感货;3-液体',
image_tag VARCHAR(20) NOT NULL DEFAULT '' COMMENT '图片标签以","分隔开(可多选),1-工厂自拍实物图,2-和新自拍实物图,,3-工厂自拍模特图,4-和新自拍模特图,5.其他来源' COLLATE 'utf8_general_ci',
version INT(11) NOT NULL DEFAULT '0' COMMENT '版本号',
PRIMARY KEY (id)
)
COLLATE='latin1_swedish_ci'
ENGINE=InnoDB
;

The synch config file is as follows:
[core]
# when set True, will display sql information.
debug = True
# current support redis and kafka
broker_type = redis
# source database, current support mysql and postgres
source_db = mysql
# these tables skip delete, multiple separated with comma, format with schema.table
skip_delete_tables =
# these tables skip update, multiple separated with comma, format with schema.table
skip_update_tables =
# skip delete or update dmls, multiple separated with comma, example: delete,update
skip_dmls =
# how many num to submit, recommend set 20000 when production
insert_num = 1
# how many seconds to submit, recommend set 60 when production
insert_interval = 1
# auto do full etl at first when table not exists
auto_full_etl = True

[sentry]
# sentry environment
environment = development
# sentry dsn
dsn =

[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = synch
# enable redis sentinel
sentinel = False
# redis sentinel hosts, multiple separated with comma
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master
# stream max len, will delete redundant ones with FIFO
queue_max_len = 200000

[mysql]
server_id = 33
# optional, read from show master status result if empty
init_binlog_file = mysql-bin.000992
# optional, read from show master status result if empty
init_binlog_pos = 66134615
host = 192.168.66.33
port = 3306
user = root
password = 123456

# sync schema, format with mysql.schema, each schema for one section.
[mysql.test]
# multiple separated with comma
tables = hexin_erp_product
# kafka partition, need when broker_type=kafka
kafka_partition = 0

# when source_db = postgres
[postgres]
host = postgres
port = 5432
user = postgres
password =

[postgres.postgres]
tables = test
kafka_partition = 0

[clickhouse]
host = 127.0.0.1
port = 9000
user = default
password =

# need when broker_type=kafka
[kafka]
# kafka servers, multiple separated with comma
servers = 127.0.0.1:9092
topic = synch
Can you help me figure out what's causing this?
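
One hedged reading of the trace above: ALTER TABLE ... MODIFY COLUMN rewrites the whole table, and the client's default socket timeout expires while ClickHouse is still working (the mutation keeps running server-side either way). A sketch of raising the timeout with clickhouse-driver; the host and timeout value are assumptions:

from clickhouse_driver import Client

# Give long-running DDL more time before the client socket times out.
# Host and timeout value are assumptions, not synch configuration.
client = Client(host="127.0.0.1", send_receive_timeout=3600)  # seconds
client.execute(
    "ALTER TABLE test.hexin_erp_product MODIFY COLUMN apply_price Decimal(10, 2)"
)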

Error running mysql2ch produce

Following your latest config file:
mysql_server_id = 1
init_binlog_file = binlog.000088
init_binlog_pos = 2238

Running produce reports an error:
File "/usr/local/bin/mysql2ch", line 8, in
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/mysql2ch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/mysql2ch/cli.py", line 27, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/mysql2ch/producer.py", line 43, in produce
skip_update_tables=settings.skip_update_tables,
File "/usr/local/lib/python3.6/site-packages/mysql2ch/reader.py", line 93, in binlog_reading
for binlog_event in stream:
File "/usr/local/lib/python3.6/site-packages/pymysqlreplication/binlogstream.py", line 430, in fetchone
pkt = self._stream_connection._read_packet()
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 684, in _read_packet
packet.check_error()
File "/usr/local/lib/python3.6/site-packages/pymysql/protocol.py", line 220, in check_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.6/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.InternalError: (1236, 'Could not find first log file name in binary log index file')

Please help!

Execution error, please take a look

[[email protected] mysql2ch]# /usr/bin/mysql2ch
Traceback (most recent call last):
File "/usr/bin/mysql2ch", line 11, in
load_entry_point('mysql2ch==0.3.0', 'console_scripts', 'mysql2ch')()
File "/usr/lib/python2.7/site-packages/pkg_resources/init.py", line 489, in load_entry_point
return get_distribution(dist).load_entry_point(group, name)
File "/usr/lib/python2.7/site-packages/pkg_resources/init.py", line 2852, in load_entry_point
return ep.load()
File "/usr/lib/python2.7/site-packages/pkg_resources/init.py", line 2443, in load
return self.resolve()
File "/usr/lib/python2.7/site-packages/pkg_resources/init.py", line 2449, in resolve
module = import(self.module_name, fromlist=['name'], level=0)
File "build/bdist.linux-x86_64/egg/mysql2ch/init.py", line 4, in
File "build/bdist.linux-x86_64/egg/mysql2ch/settings.py", line 57, in
File "build/bdist.linux-x86_64/egg/mysql2ch/settings.py", line 19, in parse_schema_table
AttributeError: 'NoneType' object has no attribute 'split'

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa6 in position 2: invalid start byte (mysql8.0.20)

The same synch, reading the same table: with a MySQL 8.0.13 slave there is no error, but switching to a MySQL 8.0.20 slave raises this error.
File "/usr/local/python383/lib/python3.8/site-packages/synch/reader/mysql.py", line 185, in _binlog_reading
for row in binlog_event.rows:
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 433, in rows
self._fetch_rows()
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 428, in _fetch_rows
self.__rows.append(self._fetch_one_row())
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 481, in _fetch_one_row
row["values"] = self._read_column_data(self.columns_present_bitmap)
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 132, in _read_column_data
values[name] = self.__read_string(1, column)
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 224, in __read_string
string = string.decode(encoding)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa6 in position 2: invalid start byte

Hello, can synch sync tables between different databases?

source database: hexin
source database: hexin_storage

target database: hexin_count

Note:
Tables in the two source databases hexin and hexin_storage need to be synced in real time to the hexin_count database in ClickHouse.

I get the following error during the process:
cat /etc/synch.ini
[mysql]
server_id = 33
# optional, read from show master status result if empty
init_binlog_file = mysql-bin.001000
# optional, read from show master status result if empty
init_binlog_pos = 2930840
host = 192.168.66.33
port = 3306
user = root
password = 123456

# sync schema, format with mysql.schema, each schema for one section.
[mysql.hexin]
# multiple separated with comma
tables = hexin_erp_storage_out_detail,hexin_erp_storage_out_main,hexin_erp_order,hexin_erp_order_goods
# kafka partition, need when broker_type=kafka
kafka_partition = 0

[root@k8smaster ~]# synch --config /etc/synch.ini etl --schema hexin_count --renew
Traceback (most recent call last):
File "/usr/local/bin/synch", line 11, in
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 19, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/synch/replication/etl.py", line 12, in make_etl
Global.reader.etl_full(Global.writer, schema, tables, renew)
File "/usr/local/lib/python3.6/site-packages/synch/reader/init.py", line 32, in etl_full
for table in tables:
TypeError: 'NoneType' object is not iterable

Is it possible to sync tables from different source databases into the same ClickHouse database?

Running mysql2ch in docker: struct.error: 'I' format requires 0 <= number <= 4294967295

The tool basically works, but this error appears at runtime:

Traceback (most recent call last):
File "/synch/synch/replication/continuous.py", line 126, in continuous_etl
writer.insert_events(schema, table, insert_events)
File "/synch/synch/writer/init.py", line 82, in insert_events
self.execute(insert_sql, list(map(lambda x: x.get("values"), insert_data)))
File "/synch/synch/writer/init.py", line 55, in execute
return self._client.execute(sql, params=params, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 214, in execute
rv = self.process_insert_query(
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 371, in process_insert_query
rv = self.send_data(sample_block, data,
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 407, in send_data
self.connection.send_data(block)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/connection.py", line 509, in send_data
self.block_out.write(block)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/streams/native.py", line 38, in write
write_column(self.context, col_name, col_type, items,
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/columns/service.py", line 112, in write_column
raise errors.TypeMismatchError(
clickhouse_driver.errors.TypeMismatchError: Code: 53. Type mismatch in VALUES section. Repeat query with types_check=True for detailed info. Column PUBLISH_DATE: 'I' format requires 0 <= number <= 4294967295
The PUBLISH_DATE column is a datetime type; most rows sync successfully, so presumably a few individual rows trigger the error.
Question: what causes this error?
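
A hedged explanation: ClickHouse DateTime is stored as an unsigned 32-bit Unix timestamp, so any datetime before 1970 or after 2106 (for example one derived from a MySQL zero date or a far-future date) cannot be packed, which matches the 'I' format message. A minimal sketch of the boundary:

import struct
from datetime import datetime, timezone

# ClickHouse DateTime values are packed as unsigned 32-bit ints ('I').
ok = int(datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp())
struct.pack("<I", ok)  # fine

too_late = int(datetime(2200, 1, 1, tzinfo=timezone.utc).timestamp())
try:
    struct.pack("<I", too_late)
except struct.error as e:
    print(e)  # 'I' format requires 0 <= number <= 4294967295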

Sync has performance problems

I came to mysql2ch from mysql-clickhouse-replication. For small-scale workloads it is barely adequate.
Why do I say that? ClickHouse's strengths are batch writes and OLAP; its performance for modifying and deleting data is very poor. This project meets the sync requirement functionally, but it has significant performance problems.
How do you achieve real-time sync of large data volumes on top of the MergeTree engine? In practice, VersionedCollapsingMergeTree from the MergeTree family is a better fit for the sync scenario. Official link: https://clickhouse.tech/docs/zh/engines/table-engines/mergetree-family/versionedcollapsingmergetree/#versionedcollapsingmergetree

VersionedCollapsingMergeTree
This engine:

allows quick writing of object states that change continuously;
deletes old object states in the background, which significantly reduces the volume of storage.
See the Collapsing section for details.

The engine inherits from MergeTree and adds the logic for collapsing rows to the algorithm for merging data parts. VersionedCollapsingMergeTree serves the same purpose as CollapsingMergeTree but uses a different collapsing algorithm that allows inserting data in any order with multiple threads. In particular, the Version column helps to collapse rows properly even if they are inserted in the wrong order.

It is very well suited to database sync, especially for MySQL data that changes constantly, so we built our internal mysql -> clickhouse sync solution on this engine.
The solution converts all DML into insert statements, leverages clickhouse's batch-write strength, and combines it with the characteristics of the VersionedCollapsingMergeTree engine, using the version column for data versioning; together this forms the upgraded solution.

In principle the implementation ideas are the same, e.g.:
1. kafka and redis support;
2. multi-threading support;
3. configurable databases and tables;
4. binlog-based;
5. externally loadable ETL methods;
6. high real-time throughput, with TPS an order of magnitude higher than this project, especially under high database concurrency.

Drawback: queries are slightly more complicated, since they require aggregation; the official sql is as follows:
SELECT
UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration,
Version
FROM UAct
GROUP BY UserID, Version
HAVING sum(Sign) > 0

This solution is currently used for real-time data processing in the metro domain, for big-data reporting, mainly handling financial business. It has been working well so far. Happy to discuss!
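
For reference, the UAct table behind the official query above comes from the ClickHouse documentation example; a sketch of creating and feeding it through clickhouse-driver:

from clickhouse_driver import Client

client = Client(host="127.0.0.1")  # assumed connection details
client.execute(
    """
    CREATE TABLE IF NOT EXISTS UAct (
        UserID UInt64,
        PageViews UInt8,
        Duration UInt8,
        Sign Int8,
        Version UInt8
    )
    ENGINE = VersionedCollapsingMergeTree(Sign, Version)
    ORDER BY UserID
    """
)

# An update becomes a pair of rows: cancel the old state (Sign = -1) and
# insert the new state (Sign = 1) with a higher Version.
client.execute(
    "INSERT INTO UAct VALUES",
    [
        (4324182021466249494, 5, 146, 1, 1),   # initial state
        (4324182021466249494, 5, 146, -1, 1),  # cancel it
        (4324182021466249494, 6, 185, 1, 2),   # new state
    ],
)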

Running mysql2ch etl -h gives no response

[root@localhost bin]# mysql2ch etl -h
usage: mysql2ch etl [-h] --schema SCHEMA [--tables TABLES] [--renew]

optional arguments:
-h, --help show this help message and exit
--schema SCHEMA Schema to full etl.
--tables TABLES Tables to full etl,multiple tables split with comma,default
read from environment.
--renew Etl after try to drop the target tables.

Config file:

[core]
mysql_server_id = 1
# redis stream max len, will delete redundant ones with FIFO
queue_max_len = 200000
init_binlog_file = binlog.000024
init_binlog_pos = 252563
# these tables skip delete, multiple separated with comma
skip_delete_tables =
# these tables skip update, multiple separated with comma
skip_update_tables =
# skip delete or update dmls, multiple separated with comma
skip_dmls =
# how many num to submit, recommend set 20000 when production
insert_num = 1
# how many seconds to submit, recommend set 60 when production
insert_interval = 1

[sentry]
# sentry environment
environment = development
# sentry dsn
dsn = https://[email protected]/1

[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = mysql2ch
# enable redis sentinel
sentinel = false
# redis sentinel hosts, multiple separated with comma
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master

[mysql]
host = 127.0.0.1
port = 3306
user = root
password =

# sync schema test from mysql
[mysql.saas_im]
# multiple separated with comma
tables = letter_record_back

[clickhouse]
host = 127.0.0.1
port = 9000
user = default
password =

Incorrect clickhouse sql

- synch.replication.clickhouse:57 - DEBUG - alter table sheme.table delete where id in (xxxxx)

The primary key id is a string, so the correct sql should be:
alter table epsandbox.q_order delete where id in ('xxxxx')
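
A hedged sketch of one fix: let clickhouse-driver substitute and escape the ids client-side instead of interpolating them raw (a Python tuple renders as a quoted, parenthesized list):

from clickhouse_driver import Client

client = Client(host="127.0.0.1")  # assumed connection details
ids = ("xxxxx",)  # string primary keys; a tuple renders as ('xxxxx')
client.execute(
    "ALTER TABLE epsandbox.q_order DELETE WHERE id IN %(ids)s",
    {"ids": ids},
)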

Multi-table replication: how should the config file be written?

Hi, how should the yaml or conf config file be written to sync multiple tables? I separated them with a comma, and the program read the two table names out as one:
2021-08-16 15:45:29 - synch.replication.etl:84 - WARNING - No pk found in clickhouse.order_goods,invoice_order, skip

synch used for real time crud updates?

I noticed in the synch.yaml

insert_num: 1 # how many num to submit,recommend set 20000 when production
insert_interval: 1 # how many seconds to submit,recommend set 60 when production

Why can't I set both to 1 in production? If I set the interval to 60, won't updates stop being real-time or near-real-time?

Same for insert_num: it seems that in production updates only arrive after hitting 20,000 records.

If I keep both settings at 1 in production, will the program crash?

FileNotFoundError: [Errno 2] No such file or directory: 'synch.yaml'

Running:
synch --alias mysql_db etl --renew
Error message:
Traceback (most recent call last):
File "f:\python38\lib\runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "f:\python38\lib\runpy.py", line 87, in run_code
exec(code, run_globals)
File "F:\Python38\Scripts\synch.exe_main
.py", line 7, in
File "f:\python38\lib\site-packages\click\core.py", line 829, in call
return self.main(*args, **kwargs)
File "f:\python38\lib\site-packages\click\core.py", line 782, in main
rv = self.invoke(ctx)
File "f:\python38\lib\site-packages\click\core.py", line 1256, in invoke
Command.invoke(self, ctx)
File "f:\python38\lib\site-packages\click\core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "f:\python38\lib\site-packages\click\core.py", line 610, in invoke
return callback(*args, **kwargs)
File "f:\python38\lib\site-packages\click\decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "f:\python38\lib\site-packages\synch\cli.py", line 36, in cli
init(config)
File "f:\python38\lib\site-packages\synch\factory.py", line 159, in init
Settings.init(config_file)
File "f:\python38\lib\site-packages\synch\settings.py", line 12, in init
with open(file_path, "r") as f:
FileNotFoundError: [Errno 2] No such file or directory: './synch.yaml'

Adding a new column in MySQL breaks data sync

After adding the new column in MySQL and also adding it in ClickHouse, the error persists.

The reason is that the generated clickhouse insert statement does not include column names, and some of the buffered rows lack the new column while others have it, e.g. insert into tb values(...),(..., 'new column')

Could the clickhouse column names be fetched and added to the statement, or is there a better approach?
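
A sketch of the suggestion, assuming events carry a values dict like the ones synch logs; the helper name and structure are hypothetical, not synch's actual writer code:

from clickhouse_driver import Client

def insert_events(client: Client, schema: str, table: str, events: list) -> None:
    # Build the INSERT with an explicit column list taken from the event
    # keys, so rows written before and after the ALTER stay aligned.
    # All events in one batch are assumed to share the same key set.
    columns = list(events[0]["values"].keys())
    sql = "INSERT INTO {}.{} ({}) VALUES".format(schema, table, ", ".join(columns))
    client.execute(sql, [e["values"] for e in events])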

Mongo sync to clickhouse

Could this framework add support for Mongo sync to ClickHouse? Would adding a Mongo Reader be enough?

Problem with the is_insert check in consumer.py

If the consumer pauses because of a problem while the producer keeps running, a large backlog of events builds up.
When the consumer recovers, the time-based check causes the events to be executed one at a time.
(screenshot)

Normally it should look like this:
(screenshot)

The .env config:
INSERT_NUMS=20000
INSERT_INTERVAL=60

INSERT_INTERVAL has no effect

# how many num to submit
INSERT_NUMS=20000
# how many seconds to submit
INSERT_INTERVAL=60

Even after 60 seconds have passed, as long as INSERT_NUMS has not reached the configured value, nothing is submitted!
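
For reference, a sketch of the flush rule being asked for: submit when either the row count or the elapsed time reaches its limit, so a backlog drains in full batches and a quiet stream still commits every interval. Names are illustrative, not synch's actual code:

import time

INSERT_NUM = 20000      # how many rows to submit
INSERT_INTERVAL = 60    # how many seconds to submit

buffer = []
last_flush = time.monotonic()

def maybe_flush(flush):
    # Flush when EITHER limit is hit, so a quiet stream still commits
    # every INSERT_INTERVAL seconds and a backlog drains in full batches.
    global last_flush
    elapsed = time.monotonic() - last_flush
    if len(buffer) >= INSERT_NUM or (buffer and elapsed >= INSERT_INTERVAL):
        flush(buffer)
        buffer.clear()
        last_flush = time.monotonic()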

Support for wildcard table configuration

Is wildcard table configuration supported? For example:

source_dbs:
  - db_type: mysql
    databases:
      - database: synch_mysql_test
        auto_create: true
        tables:
          - table: log_*
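
A sketch of how such wildcard expansion could be implemented, matching patterns like log_* against the tables that actually exist in the source schema (illustrative only; not current synch behavior):

import fnmatch

def expand_tables(patterns: list, existing_tables: list) -> list:
    # Expand wildcard patterns against the schema's real table list.
    matched = []
    for pattern in patterns:
        matched.extend(t for t in existing_tables if fnmatch.fnmatch(t, pattern))
    return sorted(set(matched))

print(expand_tables(["log_*"], ["log_2021", "log_2022", "users"]))
# ['log_2021', 'log_2022']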

synch -c synch.yaml not working

When I ran this

[eric@localhost mysetup]$ synch -c synch.yaml 
Traceback (most recent call last):
  File "/home/wenching/.local/bin/synch", line 11, in <module>
    sys.exit(cli())
  File "/home/wenching/.local/lib/python3.6/site-packages/synch/cli.py", line 78, in cli
    parse_args.run(parse_args)
AttributeError: 'Namespace' object has no attribute 'run'

Looking through the code, I believe it expects the command-line parameters:

parse_args = parser.parse_args()
parse_args.run(parse_args)

Any help? Thanks.

Error with incremental subscription using kafka

(sync2) [root@hadoop1 sync2]# synch -c synch.yaml --alias mysql_db produce
Traceback (most recent call last):
File "/data/sync2/sync2/bin/synch", line 8, in
sys.exit(cli())
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 829, in call
return self.main(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/cli.py", line 91, in produce
broker = get_broker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/factory.py", line 87, in get_broker
b = KafkaBroker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/broker/kafka.py", line 27, in init
key_serializer=lambda x: x.encode(),
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/producer/kafka.py", line 383, in init
**self.config)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 244, in init
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
(sync2) [root@hadoop1 sync2]#

Delete-filter bug

Lines 208-212 of writer.py should be changed to:
if 'delete' not in skip_dmls_all and skip_dml_table_name not in skip_delete_tb_name:

Otherwise normal inserts or updates may be filtered out together with deletes.
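
In context, the proposed condition reads like this (a sketch using the issue's names, not the actual file contents):

# Sketch of the proposed fix: a delete event is skipped only when deletes
# specifically are filtered, so inserts/updates are no longer dropped with it.
def should_apply_delete(skip_dmls_all, skip_delete_tb_name, skip_dml_table_name):
    return (
        "delete" not in skip_dmls_all
        and skip_dml_table_name not in skip_delete_tb_name
    )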

Will there be charset problems at runtime?

Command: synch --alias mysql_db etl --schema flink_t --table sych --renew

Error: ImportError: cannot import name 'charset_to_encoding' from 'pymysql.charset' (/usr/local/python3/lib/python3.7/site-packages/pymysql/charset.py)
