
bryzgaloff / airflow-clickhouse-plugin


The most popular ClickHouse plugin for Airflow. 🔝 Top-1% downloads on PyPI: https://pypi.org/project/airflow-clickhouse-plugin! Based on mymarilyn/clickhouse-driver.

License: MIT License

Language: Python (100.0%)
Topics: airflow, clickhouse, python, python3

airflow-clickhouse-plugin's People

Contributors

alexander-chashnikov, bobelev, bryzgaloff, bryzgaloff-whisk, corsettis, d-ganchar, epikhinm, gameram, gkarg, glader, maximtar, ne1r0n, r3b-fish, saimon46, viktortnk


airflow-clickhouse-plugin's Issues

Object of type datetime is not json serializable

When running a SELECT query on a ClickHouse table that returns a datetime value, the task fails as below:

[2022-12-12, 07:32:02 UTC] {xcom.py:600} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-12-12, 07:32:02 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 2385, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 212, in set
    map_index=map_index,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 597, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable

How can I handle datetime values separately?
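
One possible workaround, since the plugin returns whatever clickhouse-driver returns: convert the datetime values inside the callable before they become the XCom value. A minimal sketch, assuming the hook import path shown in the traceback above (connection id, table and function names are placeholders):

from datetime import date, datetime

from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook


def select_with_serializable_dates():
    """Run the query and return rows that are safe to push to XCom as JSON."""
    hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default')
    rows = hook.run('SELECT id, created_at FROM db.some_table')
    # datetime/date objects are not JSON serializable, so convert them
    # to ISO strings before Airflow pushes the return value to XCom.
    return [
        tuple(value.isoformat() if isinstance(value, (date, datetime)) else value
              for value in row)
        for row in rows
    ]

Use a callable like this with a PythonOperator (or @task) instead of returning the raw rows; alternatively, enabling XCom pickling via the core enable_xcom_pickling option also works, though converting to strings is usually cleaner.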

Issue: Integration tests failing due to connection...

Description:
When attempting to run integration tests using the provided command, the tests fail with a connection error. The integration tests utilize ClickHouse for database operations, and the error indicates a failure to connect to localhost:9000. However, it is observed that accessing http://localhost:9000/ directly works as expected, indicating that the port is accessible.

Archive.zip

Steps to Reproduce:

  • Extract the contents of Archive.zip into a folder named FOLDERNAME
  • Open a terminal in FOLDERNAME
  • Run "docker compose build"
  • Run "docker compose up"
  • Open a shell (docker exec) into FOLDERNAME-airflow-webserver-1
  • Run "python3 -m unittest discover -t tests -s unit" => works
  • Run "docker run -p 9000:9000 --ulimit nofile=262144:262144 -it clickhouse/clickhouse-server" in a normal terminal
  • Run "PYTHONPATH=src AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://localhost python3 -m unittest discover -t tests -s integration" in the docker exec shell

Actual Behavior:
Integration tests fail with the following error message:
[2024-02-09T18:20:29.196+0000] {base.py:83} INFO - Using connection ID 'clickhouse_default' for task execution.
[2024-02-09T18:20:29.197+0000] {clickhouse.py:79} INFO - SELECT sum(value) * %(multiplier)s AS output FROM ext with {'multiplier': 2}
[2024-02-09T18:20:29.201+0000] {connection.py:407} WARNING - Failed to connect to localhost:9000
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/clickhouse_driver/connection.py", line 395, in connect
    return self._init_connection(host, port)
  File "/home/airflow/.local/lib/python3.8/site-packages/clickhouse_driver/connection.py", line 325, in _init_connection
    self.socket = self._create_socket(host, port)
  File "/home/airflow/.local/lib/python3.8/site-packages/clickhouse_driver/connection.py", line 297, in _create_socket
    raise err
  File "/home/airflow/.local/lib/python3.8/site-packages/clickhouse_driver/connection.py", line 288, in _create_socket
    sock.connect(sa)
OSError: [Errno 99] Cannot assign requested address

Environment:
Operating System: macOS Sonoma Version 14.3.1
Python Version: 3.8
ClickHouse Version: latest
Airflow Version: 2.8.1
Addressing this would ensure consistency and reliability when running integration tests against ClickHouse databases.
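
For what it's worth, errno 99 here most likely means that, inside the airflow-webserver container, localhost resolves to the container itself rather than to the host where the standalone ClickHouse container publishes port 9000 (which is why opening http://localhost:9000/ from the host works). On Docker Desktop for macOS, pointing the connection at the host alias instead of localhost usually resolves it; a sketch, assuming host.docker.internal is available in your setup:

PYTHONPATH=src AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://host.docker.internal python3 -m unittest discover -t tests -s integration

Alternatively, attach the ClickHouse container to the same Compose network and use its service name as the host.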

Error with JDBC connection

Hi,

I'm using Airflow v2.1.3.
I am trying to use this plugin to connect to my ClickHouse instance (hosted in Docker) via a JDBC connection; I am using clickhouse4j for that. I have defined a JDBC connection in the Airflow UI, but when I try to run my DAG, it gives this error:

[2021-09-23 16:02:33,871] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow_clickhouse_plugin/operators/clickhouse_operator.py", line 33, in execute
    return hook.run(self._sql, self._parameters)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow_clickhouse_plugin/hooks/clickhouse_hook.py", line 62, in run
    with disconnecting(self.get_conn()) as conn:
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow_clickhouse_plugin/hooks/clickhouse_hook.py", line 33, in get_conn
    return Client(conn.host or 'localhost', **connection_kwargs)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/clickhouse_driver/client.py", line 87, in __init__
    self.connection = Connection(*args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'extra__jdbc__drv_clsname'

Here is my task code in the DAG:

to_clickhouse_task = ClickHouseOperator(
    task_id='to_clickhouse_task',
    database='default',
    sql=(
        '''
        ... some sql command ...
        '''
    ),
    clickhouse_conn_id='clickhouse-jdbc',
    dag=dag,
)

Also, I am able to connect to the ClickHouse Docker container from my host machine using curl.
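
For context, the traceback shows the hook forwarding every field from the connection's Extra to clickhouse_driver.Client, which is why the JDBC-specific field extra__jdbc__drv_clsname is rejected: the plugin talks ClickHouse's native TCP protocol through clickhouse-driver and does not use JDBC or clickhouse4j at all. A plain connection with only host, port (9000 by default), login, password and schema set, and no JDBC extras, is what it expects. As an illustration only (connection id, credentials and host are placeholders), such a connection can also be supplied via an environment variable, as in the integration-test command elsewhere on this page:

AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:@clickhouse-host:9000/default'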

DAG object has no attribute update_relative

I've installed the ClickHouse plugin and am trying to run the MySQL-to-ClickHouse example.

The DAG fails with this error:

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskmixin.py", line 238, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'

I'm on Python 3.7 and Airflow 2.4.3.

Can anyone help me figure out what the issue is?
Thanks in advance!

support for airflow 2.6.0

Airflow 2.6.0 has just been released, and unfortunately the ClickHouse plugin cannot be installed with it. It would be nice to add support for Airflow 2.6.0. From my understanding, since nothing was removed between 2.5.x and 2.6.x, it should only be necessary to loosen the package's installation requirements to allow Airflow 2.6.x, and it should work out of the box, no?

airflow-clickhouse-plugin breaks airflow

Apache Airflow version
2.3.4 (latest released)

What happened
I've been trying to run Airflow with the ClickHouse plugin in Docker.
Before installing the plugin and its dependencies, the stack (scheduler, worker, webserver, etc.) works well.
But after installing the plugin, the stack doesn't start any more.
I've tried a few ways to make it work:

  • adding the env var _PIP_ADDITIONAL_REQUIREMENTS to the .env file and run docker-compose
  • re-build the docker image from apache/airflow:latest while installing all the dependencies into it
  • re-build the docker image from apache/airflow:latest-python3.8 while installing all the dependencies into it
  • re-build the docker image from apache/airflow:2.2.2 while installing all the dependencies into it
  • re-build the docker image from apache/airflow:2.2.2-python3.8 while installing all the dependencies into it

In every case I've downloaded the corresponding docker-compose.yaml version.
None of these approaches has worked yet.

How to reproduce
I will describe the procedure only for the latest version of Airflow, but it can be extrapolated to any of the version combinations I've described earlier.

  1. Download the docker-compose file for the latest version of Airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.4/docker-compose.yaml'
  2. As the documentation mentions, create the default folders and add the correct user to the .env file
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
  3. Run airflow-init
docker-compose up airflow-init
  4. Run Airflow and check that everything is okay
docker-compose up

>>>> UNTIL HERE EVERYTHING IS WORKING FINE AND AS EXPECTED <<<<

  5. Take the stack down
docker-compose down --volumes --remove-orphans
  6. To be sure everything is cleaned up, you can remove every image used by the stack with the docker image rm command
  7. Create a Dockerfile which extends the original Airflow image, with the following content
FROM apache/airflow:latest

USER root

RUN apt-get update

USER airflow

RUN pip install -U pip

RUN pip install -U --no-cache-dir airflow-clickhouse-plugin[pandas]
  8. Build the new image as my-airflow:0.1
docker build . -t my-airflow:0.1
  9. Set the image variable in the .env file, so that it ends up as follows
AIRFLOW_UID=1001
AIRFLOW_IMAGE_NAME=my-airflow:0.1
  10. Run airflow-init
docker-compose up airflow-init
  11. Run docker-compose for the whole stack
docker-compose up
  12. Check the logs
...
airflow-webserver_1  | [2022-09-09 09:06:39 +0000] [1537] [INFO] Worker exiting (pid: 1537)
airflow-webserver_1  | usage: __main__.py [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
airflow-webserver_1  |    or: __main__.py --help [cmd1 cmd2 ...]
airflow-webserver_1  |    or: __main__.py --help-commands
airflow-webserver_1  |    or: __main__.py cmd --help
airflow-webserver_1  | 
airflow-webserver_1  | error: option --workers not recognized
airflow-webserver_1  | [2022-09-09 09:06:40 +0000] [1540] [INFO] Booting worker with pid: 1540
airflow-scheduler_1  | usage: airflow [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
airflow-scheduler_1  |    or: airflow --help [cmd1 cmd2 ...]
airflow-scheduler_1  |    or: airflow --help-commands
airflow-scheduler_1  |    or: airflow cmd --help
airflow-scheduler_1  | 
airflow-scheduler_1  | error: invalid command 'scheduler'
....

Operating System
Ubuntu 18.04.4 LTS

Docker version

Client:
 Version:           20.10.7
 API version:       1.41
 Go version:        go1.13.8
 Git commit:        20.10.7-0ubuntu5~18.04.3
 Built:             Mon Nov  1 01:04:14 2021
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server:
 Engine:
  Version:          20.10.7
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.8
  Git commit:       20.10.7-0ubuntu5~18.04.3
  Built:            Fri Oct 22 00:57:37 2021
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.5.5-0ubuntu3~18.04.2
  GitCommit:        
 runc:
  Version:          1.0.0~rc95-0ubuntu1~18.04.2
  GitCommit:        
 docker-init:
  Version:          0.19.0
  GitCommit:        
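
Not a confirmed root cause, but this failure pattern is typical of pip replacing or downgrading some of Airflow's own dependencies while the plugin is installed. A sketch of a safer image build, assuming Airflow 2.3.4 on Python 3.8 (adjust versions to your base image): pin the base image explicitly and install the plugin under Airflow's published constraints file, the same constraints mechanism that appears in the publishing workflow further down this page:

FROM apache/airflow:2.3.4-python3.8

RUN pip install --no-cache-dir airflow-clickhouse-plugin[pandas] \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.8.txt"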

Support to run sql files

Hello!
It would be great to have the ability to run queries from .sql files, like MySqlOperator does:

mysql_task = MySqlOperator(
    task_id='create_table_mysql_external_file',
    sql='/scripts/drop_table.sql',
    dag=dag,
)
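
As a stop-gap until (or unless) such support lands, the file can be read in the DAG and its contents passed to ClickHouseOperator. A minimal sketch, assuming the operator import path seen in the tracebacks elsewhere on this page (file path, task id and connection id are placeholders):

from pathlib import Path

from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

# Read the statement from a .sql file and hand the text to the operator.
drop_table_sql = Path('/scripts/drop_table.sql').read_text()

drop_table_task = ClickHouseOperator(
    task_id='drop_table_clickhouse_external_file',
    clickhouse_conn_id='clickhouse_default',
    sql=drop_table_sql,
    dag=dag,
)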

Allow configuring clickhouse-driver `server_side_params`

After upgrading airflow-clickhouse-plugin to 1.3.0, clickhouse-driver got updated to 0.2.7 as well.

That version of the clickhouse-driver includes a commit that disables Server Side Parameter Substitution by default, breaking all DAGs that make use of this functionality. See mymarilyn/clickhouse-driver@24e8b35#diff-ea3e168b7a6e0e795b66587636dd954c124334f970be899db78a3b949fbfead5

Could you add a parameter to the ClickHouse operator that allows either passing arbitrary settings to the underlying clickhouse-driver Client, or specifically overriding the server_side_params setting?

I am willing to prepare a PR for this as well if you prefer. Thanks in advance!
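
Until a dedicated parameter exists, one possible stop-gap: the tracebacks elsewhere on this page show the hook forwarding the connection's Extra fields as keyword arguments to clickhouse_driver.Client, so, assuming that behaviour still holds in 1.3.0, the driver setting could likely be restored through the connection's Extra JSON (setting name taken from the clickhouse-driver change linked above):

{"settings": {"server_side_params": true}}

A first-class operator/hook argument would of course be the cleaner long-term solution.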

Bridge version for airflow 1.10.15

While versions of airflow-clickhouse-plugin that support Airflow 2 and Airflow 1.10.6 both exist, there is no version that could help with an Airflow migration via the 1.10.15 path.

It would've been very useful to have a version (say, 0.5.8?), which would still work on airflow 1.10.15, yet would get rid of this warning:

.../site-packages/airflow_clickhouse_plugin/__init__.py:7: FutureWarning: Registering operators or sensors in plugins is deprecated -- these should be treated like 'plain' python modules, and imported normally in DAGs.

Thanks in advance.

Contributing a ClickHouse Provider to Airflow

Hi everyone,

I'm starting to work on this task apache/airflow#39140 in the Airflow repository. The task involves creating a ClickHouse provider similar to the clickhouse-plugin.

First, I'd like to understand:

Is it feasible to develop a ClickHouse provider for Airflow?
Why wasn't the clickhouse-plugin chosen as the default provider?

Second, while I'm primarily interested in creating a provider for my own functionality (SSHClickhouseOperator), I believe it could be beneficial for the entire ClickHouse community. I'd be happy to collaborate on a solution that integrates with the existing work you have done.

Please let me know if you're open to collaboration on this project.

Add template_searchpath support

Hi!
The problem is that ClickHouseOperator doesn't support the template_searchpath DAG option, which allows passing a filename instead of the full query.

Instead of:

ClickHouseOperator(
    task_id='task_1',
    clickhouse_conn_id='clickhouse_default',
    sql='SELECT * from db.some_table;' # <-- raw SQL query
)

We can write:

ClickHouseOperator(
    task_id='task_1',
    clickhouse_conn_id='clickhouse_default',
    sql='select_query.sql' # <-- filename of jinja template which is located in template_searchpath location
)

One more example of how it works.
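
For clarity, the desired end state would look roughly like this; a sketch of the requested behaviour rather than something the current release supports (paths and ids are placeholders):

from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

with DAG(
    dag_id='example_dag',
    template_searchpath=['/opt/airflow/sql'],  # directory that contains select_query.sql
) as dag:
    query_task = ClickHouseOperator(
        task_id='task_1',
        clickhouse_conn_id='clickhouse_default',
        sql='select_query.sql',  # would be rendered from the searchpath if '.sql' were in template_ext
    )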

Support for airflow 2.5.0?

Hi,

Airflow 2.5.0 was released earlier this month.
At the moment we can't use it because of the dependencies in this project.
Are there plans to test it with 2.5.0 and update the package?

Thanks!

ClickHouseOperator multi-statements support

Hi, I've just realised that I cannot run SQL files with multiple statements using ClickHouseOperator:
DB::Exception: Syntax error (Multi-statements are not allowed)

For instance,
drop table if exists table_name; create table table_name as ...

Are there any plans or thoughts about solving this issue? PostgresOperator does support this functionality.
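
A possible workaround until this is supported: split the file into individual statements and run them one by one through the hook. A naive sketch (splitting on ';' breaks if a statement contains a literal semicolon; hook import path as in the tracebacks elsewhere on this page):

from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook


def run_sql_file(path: str) -> None:
    with open(path) as sql_file:
        sql_text = sql_file.read()
    # Split the script into separate statements and drop empty chunks.
    statements = [part.strip() for part in sql_text.split(';') if part.strip()]
    hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default')
    for statement in statements:
        hook.run(statement)  # each statement goes to ClickHouse as its own query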

passing additional parameters to ClickHouse connection

It would be great to be able to specify additional parameters for the ClickHouse driver, for example send_receive_timeout, client_name, etc.

As a workaround, I hardcoded the necessary parameters in clickhouse_hook.py.

Do you have any idea how to implement this in a correct, Pythonic way?
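
A possible alternative to hardcoding: the tracebacks elsewhere on this page show the hook passing the connection's Extra fields straight into clickhouse_driver.Client, so parameters such as send_receive_timeout or client_name can likely be set in the connection's Extra JSON instead of in clickhouse_hook.py (field names are clickhouse-driver keyword arguments; the values below are just examples):

{"send_receive_timeout": 300, "client_name": "airflow-worker"}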

doesn't see the library

Unable to import 'airflow.operators.clickhouse_operator'.
I did everything according to the instructions; please tell me what to do.
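
Importing via airflow.operators.clickhouse_operator relies on the deprecated plugin-registration mechanism, which newer Airflow versions no longer provide. Importing directly from the package should work instead; the module path below matches the tracebacks elsewhere on this page but may differ in newer releases of the plugin:

from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator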

option to disable sql query logging

Is there a way to disable the logging of executed SQL queries in ClickHouseOperator?
Sometimes the logs fill up disk space when large SQL queries are executed rather often.
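
I'm not aware of a dedicated switch for this, but the statements are emitted through the hook's logger, so raising that logger's level (in the Airflow logging config or at the top of the DAG file) should silence the INFO lines. A sketch; the logger name is a guess based on the module path in the tracebacks and may differ between plugin versions:

import logging

# Hide the INFO-level query-text messages emitted by the ClickHouse hook.
# Adjust the logger name to the module/class your installed version actually uses.
logging.getLogger(
    'airflow_clickhouse_plugin.hooks.clickhouse_hook.ClickHouseHook'
).setLevel(logging.WARNING)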

Define a special connection type for ClickHouse

Hi!
I have been working on this issue for a few days but am unable to complete the task.
I am using Apache Airflow on Docker and have installed the ClickHouse plugin in my container.
The issue is that I cannot see a connection type for ClickHouse.
I have successfully connected MySQL to Airflow, but I am unable to connect ClickHouse to Airflow.
I have attached some screenshots for a better understanding of my problem:
image_2022_01_05T09_56_33_525Z
image_2022_01_05T10_00_16_606Z
image_2022_01_05T09_56_10_887Z

Add description and topics to the repository

Hi @viktortnk

Please add some topics and the description to the repository. I do not have enough permissions. I would suggest python, airflow, clickhouse as the topics and Airflow ClickHouse Plugin based on clickhouse-driver as the description.

Setup GitHub workflow for publishing to PyPI

High-level overview

  1. The workflow is triggered when a new tag is created. The next steps are:
  2. Build the package.
  3. Upload to test PyPI (test PyPI allows overwriting packages; public PyPI does not!).
  4. Install package from test PyPI.
  5. Run tests (remove airflow_clickhouse_plugin local package in advance to rely on installed version).
  6. Upload to public PyPI.
  7. Install from public PyPI.
  8. Run tests (also removing local package).

Details

How to upload to PyPI: https://packaging.python.org/tutorials/packaging-projects/#uploading-your-project-to-pypi

Uploading to test PyPI

rm -rf __pycache__/ airflow_clickhouse_plugin.egg-info/ dist/ build/
python3 setup.py sdist bdist_wheel
twine upload --repository testpypi dist/*
# username: __token__
# token: <generate it for yourself>

The token should be stored in GitHub secrets. Please test your contribution on a separate package in test PyPI first; then I will inject my personal token for the workflow to publish to the "official" test airflow_clickhouse_plugin package.

Running tests for test PyPI

I usually run them in a container with ClickHouse: docker exec -it $(docker run --rm -d yandex/clickhouse-server) bash

Install Python (the workflow should install a matrix of Python and plugin versions) and run the tests:

VERSION=0.8.2
apt-get update \
  && apt-get install -y python3.8 python3-pip git \
  && git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git \
  && cd airflow-clickhouse-plugin \
  && rm -rf airflow_clickhouse_plugin \
  && python3.8 -m pip install -r requirements.txt \
  && python3.8 -m pip install \
    --index-url https://test.pypi.org/simple \
    --extra-index-url https://pypi.org/simple \
    airflow-clickhouse-plugin[pandas]==${VERSION} \
  && python3.8 -m unittest discover -s tests

Upload to public PyPI

twine upload dist/*

My credentials are stored in my ~/.pypirc. But we should use GitHub secrets for the workflow.

Test public PyPI

VERSION=0.8.2
apt-get update \
  && apt-get install -y python3.8 python3-pip git \
  && git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git \
  && cd airflow-clickhouse-plugin \
  && rm -rf airflow_clickhouse_plugin \
  && python3.8 -m pip install -r requirements.txt \
  && python3.8 -m pip install airflow-clickhouse-plugin[pandas]==${VERSION} \
  && python3.8 -m unittest discover -s tests

Expected outcome

As a result, I expect a workflow which does all of these steps, so that simply creating a release (with a tag) triggers the workflow and a new version is published. If the full workflow looks too complex, we may have a partial implementation with test PyPI and tests only. Though I believe that once the test PyPI implementation is there, public PyPI support should not be a big trouble :)

Hard-pinned version dependencies

I wanted to use the plugin, but our system has clickhouse-driver==0.1.3 and apache-airflow==1.10.8, while the requirements specify
clickhouse-driver==0.1.2 apache-airflow==1.10.6
so installation via pip is impossible.
I strongly doubt that minor-version differences make the packages incompatible. I suggest specifying
clickhouse-driver<0.2.0 apache-airflow<1.11.0

Add airflow 2.4.x support

It's tricky to upgrade the Airflow version to 2.4 with the current airflow-clickhouse-plugin: pip install forces a downgrade to Airflow 2.3.4.

Install is broken

Maybe I missed something significant, but I cannot install the package via pip. It's very confusing, because someone must have stumbled upon this before.

# pip install -U airflow-clickhouse-plugin
Collecting airflow-clickhouse-plugin
  Downloading airflow-clickhouse-plugin-0.5.4.tar.gz (6.4 kB)
    ERROR: Command errored out with exit status 1:
     command: /opt/conda/bin/python3.7 -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/setup.py'"'"'; __file__='"'"'/tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/pip-egg-info
         cwd: /tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/
    Complete output (5 lines):
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/setup.py", line 16, in <module>
        with open(os.path.join(here, 'requirements.txt')) as requirements_file:
    FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pip-install-fqcwa4i2/airflow-clickhouse-plugin/requirements.txt'
    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

Any advice is welcome.

Automate versions extraction in PyPI publishing file

In the current PyPI publishing workflow file, versions of the plugin, Airflow, and Python are hardcoded in multiple places, which introduces a risk of version mismatches when releasing new versions. For instance, this is evident in the following section of the file:

        python-version: "3.12"
    - name: Install airflow-clickhouse-plugin from TestPyPI
      run: |
        python -m pip install --upgrade pip
        python -m pip install \
          --index-url https://test.pypi.org/simple \
          --extra-index-url https://pypi.org/simple \
          airflow-clickhouse-plugin[common.sql]==1.3.0 \
          --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.12.txt"

Proposed Solutions

To mitigate this risk, I propose implementing one or several of the following actions:

  1. Comprehensive Version Tracking. Identify and list in the comments all locations within the workflow file where versions are hardcoded to ensure none are missed during updates.
  2. Introduction of Workflow Variables. Introduce workflow variables to define the versions once and reference them throughout the file instead of hardcoding them repeatedly.
  3. Automation of Version Extraction. Extract versions automatically to minimize manual intervention:
  • Extract plugin version from pyproject.toml.
  • Derive Airflow's latest supported version from pip install.
  • Use the latest available Python version: python: "3.x", and refer to it via python --version.

These proposed changes pertain specifically to the PyPI publishing workflow and do not affect the unit/integration tests dedicated workflow.

I'm open to hearing your thoughts and suggestions on how best to address this issue. If you have any alternative ideas or references to existing GitHub projects, please feel free to share them in the comments below.
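
To illustrate the third proposal, a small sketch of extracting the plugin version from pyproject.toml inside a workflow step (assuming Python 3.11+, where tomllib is in the standard library, and that the version is declared statically under [project]):

import tomllib  # standard library since Python 3.11

with open('pyproject.toml', 'rb') as pyproject_file:
    pyproject = tomllib.load(pyproject_file)

plugin_version = pyproject['project']['version']
# The workflow could capture this value into an output or environment variable
# and reuse it wherever the version is currently hardcoded.
print(plugin_version)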

Bug in 0.10.1 in get_pandas_df

    def get_pandas_df(self, sql: str):
        import pandas as pd
        rows, columns_defs = self.run(sql, with_column_types=True)
        columns = [column_name for column_name, _ in columns_defs]
        return pd.DataFrame(rows, columns=columns)

I need to use Jinja templating here, but get_pandas_df doesn't accept parameters that should be passed down to the run function.
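
A sketch of the requested change: thread an optional parameters argument through to run, mirroring how the operator calls it elsewhere on this page (untested against the actual 0.10.1 code base):

    def get_pandas_df(self, sql: str, parameters=None):
        import pandas as pd
        # Pass query parameters down so templated/parameterized queries keep working.
        rows, columns_defs = self.run(sql, parameters, with_column_types=True)
        columns = [column_name for column_name, _ in columns_defs]
        return pd.DataFrame(rows, columns=columns)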

Airflow ClickHouse Plugin v1.2.0 is not available on PyPI

A publishing pipeline failed: https://github.com/bryzgaloff/airflow-clickhouse-plugin/actions/runs/7716663176

It requires several adjustments:

Unable to connect to clickhouse server

Trying to run the ClickHouse plugin on Airflow 2.4.3. Installed the plugin using _pip_requirements in Docker.

I've given the connection parameters as sqlite. It is not able to connect to ClickHouse.

Is there any connection setup needed at the clickhouse end?

Connection to clickhouse fails to even read table with buffer end error

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_clickhouse_plugin/operators/clickhouse_operator.py", line 33, in execute
    return hook.run(self._sql, self._parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_clickhouse_plugin/hooks/clickhouse_hook.py", line 72, in run
    types_check=types_check,
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 345, in execute
    with self.disconnect_on_error(query, settings):
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 289, in disconnect_on_error
    self.establish_connection(settings)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 276, in establish_connection
    self.connection.force_connect()
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/connection.py", line 235, in force_connect
    self.connect()
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/connection.py", line 342, in connect
    return self._init_connection(host, port)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/connection.py", line 318, in _init_connection
    self.receive_hello()
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/connection.py", line 421, in receive_hello
    packet_type = read_varint(self.fin)
  File "clickhouse_driver/varint.pyx", line 62, in clickhouse_driver.varint.read_varint
  File "clickhouse_driver/bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one
  File "clickhouse_driver/bufferedreader.pyx", line 240, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer
EOFError: Unexpected EOF while reading bytes

ERROR - Failed to execute job 9 for task select_users (Unexpected EOF while reading bytes; 2927)

AttributeError: 'int' object has no attribute 'encode'

Inserting int data into a ClickHouse table gives an encoding error.

[2022-12-12, 07:56:58 UTC] {clickhouse_hook.py:84} INFO - INSERT INTO table VALUES with ((1, 'Default manager', None, '[email protected]', None, 1, datetime.datetime(2020, 2, 3, 10, 29, 34), 2),)
[2022-12-12, 07:56:58 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/mysqltoclickhouse.py", line 18, in mysql_to_clickhouse
    ch_hook.run('INSERT INTO fm_revive.rv_agency VALUES', records)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_clickhouse_plugin/hooks/clickhouse_hook.py", line 72, in run
    types_check=types_check,
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 354, in execute
    columnar=columnar
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 579, in process_insert_query
    types_check=types_check, columnar=columnar)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/client.py", line 631, in send_data
    self.connection.send_data(block)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/connection.py", line 604, in send_data
    self.block_out.write(block)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/streams/native.py", line 39, in write
    self.fout, types_check=block.types_check)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/columns/service.py", line 157, in write_column
    column.write_data(items, buf)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 87, in write_data
    self._write_data(items, buf)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 91, in _write_data
    self.write_items(prepared, buf)
  File "/home/airflow/.local/lib/python3.7/site-packages/clickhouse_driver/columns/stringcolumn.py", line 18, in write_items
    buf.write_strings(items, encoding=self.encoding)
  File "clickhouse_driver/bufferedwriter.pyx", line 54, in clickhouse_driver.bufferedwriter.BufferedWriter.write_strings
AttributeError: 'int' object has no attribute 'encode'

How can I bypass or handle this error?
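
clickhouse-driver does not cast values implicitly, so this error usually means a Python int (or None) ended up in a column that ClickHouse expects as a String. Two things typically help: cast the offending fields before inserting, and pass types_check=True to get a clearer error earlier (the clickhouse_hook.py frame above shows the hook forwarding types_check). A sketch reusing the ch_hook and records names from the DAG above, with placeholder column positions:

# Positions of String columns in the target table (example values: adjust to your schema).
STRING_COLUMN_INDEXES = {3, 5}

fixed_records = [
    tuple(str(value) if index in STRING_COLUMN_INDEXES and value is not None else value
          for index, value in enumerate(row))
    for row in records
]
ch_hook.run('INSERT INTO fm_revive.rv_agency VALUES', fixed_records, types_check=True)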
