
mohaseeb / beam-nuggets


Collection of transforms for the Apache Beam Python SDK.

Home Page: http://mohaseeb.com/beam-nuggets/

License: MIT License

Python 98.66% Shell 1.34%
apache-beam relational-databases python

beam-nuggets's People

Contributors

alfredo, astrocox, mohaseeb

beam-nuggets's Issues

Issues with Dataflow

This works with the interactive runner but not with Dataflow.

I am using the sample code.

Any ideas?

relational db failed to write data

The relational_db_api.py file does not work on Python 3. The problem is the code at line 457: for col, value in record.iteritems()
iteritems() exists only in Python 2 and is not available in Python 3 and above.
Please change it to "record.items()".

I am using this IO in development and now want to take it to production. I can't patch it manually every time, because it will run in Docker and install the package with pip install rather than from my local copy.
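The change requested above can be sketched as follows. This is a minimal illustration, not the library's code: record_to_columns is a hypothetical helper name, and the only point is that dict.items() works on both Python 2 and Python 3 while dict.iteritems() does not exist on Python 3.

```python
def record_to_columns(record):
    """Return (column, value) pairs from a dict-like record.

    Hypothetical helper, used only to illustrate the iteration change.
    """
    # dict.items() is available on Python 2 and Python 3;
    # dict.iteritems() was removed in Python 3.
    return [(col, value) for col, value in record.items()]


record = {"name": "Jan", "num": 1}
pairs = record_to_columns(record)
print(pairs)
```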

pg8000 1.16.6 or greater is required [while running 'Writing to DB/ParDo(_WriteToRelationalDBFn)']

I am running write_to_relational_db.py and I am facing a dependency problem:

Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1257, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 502, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 508, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py", line 177, in start_bundle
    self._db = SqlAlchemyDB(self.source_config)
  File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 249, in __init__
    self._SessionClass = sessionmaker(bind=create_engine(self._source.url))
  File "<string>", line 2, in create_engine
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/deprecations.py", line 298, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 561, in create_engine
    dialect = dialect_cls(**dialect_args)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/pg8000.py", line 395, in __init__
    raise NotImplementedError("pg8000 1.16.6 or greater is required")
NotImplementedError: pg8000 1.16.6 or greater is required

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 706, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/common.py", line 1274, in apache_beam.runners.common.DoFnRunner.start
  File "apache_beam/runners/common.py", line 1259, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1257, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 502, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 508, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py", line 177, in start_bundle
    self._db = SqlAlchemyDB(self.source_config)
  File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 249, in __init__
    self._SessionClass = sessionmaker(bind=create_engine(self._source.url))
  File "<string>", line 2, in create_engine
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/deprecations.py", line 298, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 561, in create_engine
    dialect = dialect_cls(**dialect_args)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/pg8000.py", line 395, in __init__
    raise NotImplementedError("pg8000 1.16.6 or greater is required")
NotImplementedError: pg8000 1.16.6 or greater is required [while running 'Writing to DB/ParDo(_WriteToRelationalDBFn)']

Can we change the version of pg8000?
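The error message states the constraint directly: the SQLAlchemy pg8000 dialect on the worker wants pg8000 1.16.6 or greater. A small sketch of that comparison is shown here; version_tuple and meets_minimum are hypothetical helper names, and the parser assumes plain dotted numeric release strings (no pre-release suffixes). Comparing tuples of integers matters because comparing the raw strings would rank "1.9.0" above "1.16.6".

```python
def version_tuple(version):
    """Turn a dotted release string such as '1.16.5' into (1, 16, 5).

    Assumes purely numeric components; suffixes like 'rc1' are not handled.
    """
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed, minimum="1.16.6"):
    """Return True when `installed` is at least `minimum`."""
    return version_tuple(installed) >= version_tuple(minimum)


for candidate in ("1.16.5", "1.16.6", "1.9.0"):
    print(candidate, meets_minimum(candidate))
```

If the check fails on the workers, pinning a sufficiently new pg8000 in the requirements file passed to the pipeline is one likely remedy, though that has not been verified against this setup.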

SQL query support

I noticed SQL query support is on your backlog. I would find this an extremely useful feature. For example, I could then run a batch pipeline that loads only new DB entries rather than full tables each time. I'm happy to attempt a contribution with some guidance, as I am new to Python.

AttributeError: 'Read' object has no attribute 'source'

I'm new to Python and new to Dataflow.

I'm at a loss as to what I'm missing. Is this potentially an error with beam-nuggets?

$ python run_pipeline.py --requirements_file requirements.txt --runner DataflowRunner --staging_location [redacted]/staging --temp_location [redacted]/temp --template_location [redacted]/template --project [redacted]
Traceback (most recent call last):
  File "run_pipeline.py", line 73, in <module>
    run()
  File "run_pipeline.py", line 58, in run
    table_name=rdb_table_name,
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 887, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 510, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 480, in apply
    return self.apply(transform, pvalueish)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 516, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
    return m(transform, input, options)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 845, in apply_Read
    if hasattr(transform.source, 'format'):
AttributeError: 'Read' object has no attribute 'source'

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
from beam_nuggets.io import relational_db

PROJECT = ""

class CopyRDBToBQOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--tablename',
            type=str,
            help='Table Name'
        )

def run():
    """
    Pipeline entry point, runs all the necessary processes
    - Read from Relational DB
    - Save dict to text file in a Storage Bucket
    - Commit to BigQuery
    """

    # Retrieve project Id and append to PROJECT from GoogleCloudOptions
    global PROJECT

    # Initialize runtime parameters as object
    rdbtobq_options = PipelineOptions().view_as(CopyRDBToBQOptions)

    pipeline_options = PipelineOptions()
    # Save main session state so pickled functions and classes
    # defined in __main__ can be unpickled
    pipeline_options.view_as(SetupOptions).save_main_session = True

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='[redacted]',
        port=0000,
        username='[redacted]',
        password='[redacted]',
        database='[redacted]',
    )
    rdb_table_name=rdbtobq_options.tablename
    bq_table = '[redacted].{}'.format(rdbtobq_options.tablename)

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name=rdb_table_name,
    )

    records | beam.io.gcp.bigquery.WriteToBigQuery(
        bq_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        method='FILE_LOADS'
    )

if __name__ == '__main__':
    run()

# requirements.txt
apache-beam[gcp]
google-cloud
google-cloud-bigquery
google-cloud-dataflow
google-cloud-storage
beam-nuggets

KafkaConsume forces UTF-8 decoding

Description

The KafkaConsume DoFn uses the default decoder for each message it receives (code here). This forces messages being pulled from a topic to be decoded using utf-8.
While this is unlikely to cause an issue for most use cases, the function is rendered unusable if the encoding is anything but utf-8.

Proposal

Using utf-8 decoding as a default is still a great idea, but users should have the ability to override this default and provide their own decoder function which can be invoked before returning the message.
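The proposal can be sketched as follows. This is not the KafkaConsume implementation: decode_message and its value_decoder parameter are hypothetical names chosen for illustration. The idea is simply that UTF-8 stays the default while a caller-supplied function takes over when given.

```python
def decode_message(raw_value, value_decoder=None):
    """Decode raw message bytes, defaulting to UTF-8.

    `value_decoder` is a hypothetical optional callable taking bytes and
    returning the decoded value.
    """
    if value_decoder is None:
        return raw_value.decode("utf-8")
    return value_decoder(raw_value)


# b"caf\xe9" is "cafe" with an accented e, encoded as Latin-1.
latin1_bytes = b"caf\xe9"
decoded = decode_message(latin1_bytes, value_decoder=lambda b: b.decode("latin-1"))
default_decoded = decode_message(b"hello")

try:
    decode_message(latin1_bytes)  # default UTF-8 cannot decode these bytes
    utf8_failed = False
except UnicodeDecodeError:
    utf8_failed = True
```

Keeping the decoder as a plain callable also leaves room for non-text payloads, where the caller may want to return the bytes untouched.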

Support for Postgres citext datatype

Great work on these beam nuggets!

I successfully used the RelationalDB read to connect to my PostgreSQL DB and save the table contents to a JSON file.

One thing I noticed, however, is that this doesn't support the Postgres citext datatype. I found a workaround by using this: https://github.com/mahmoudimus/sqlalchemy-citext

I wanted to bring this to your attention in case you can incorporate it into the nugget. I would attempt a contribution / pull request, but I am new to Python.

Support configuring additional properties on SQLAlchemy engine

CloudSQL documentation on managing connections (https://cloud.google.com/sql/docs/postgres/manage-connections) gives guidance on managing connection pool size and connection timeout/duration. Code sample here: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/cloud-sql/postgres/sqlalchemy/main.py

Pasting important part here:

# [START cloud_sql_postgres_sqlalchemy_create]
# The SQLAlchemy engine will help manage interactions, including automatically
# managing a pool of connections to your database
db = sqlalchemy.create_engine(
    # Equivalent URL:
    # postgres+pg8000://<db_user>:<db_pass>@/<db_name>?unix_sock=/cloudsql/<cloud_sql_instance_name>/.s.PGSQL.5432
    sqlalchemy.engine.url.URL(
        drivername='postgres+pg8000',
        username=db_user,
        password=db_pass,
        database=db_name,
        query={
            'unix_sock': '/cloudsql/{}/.s.PGSQL.5432'.format(
                cloud_sql_connection_name)
        }
    ),
    # ... Specify additional properties here.
    # [START_EXCLUDE]

    # [START cloud_sql_postgres_sqlalchemy_limit]
    # Pool size is the maximum number of permanent connections to keep.
    pool_size=5,
    # Temporarily exceeds the set pool_size if no connections are available.
    max_overflow=2,
    # The total number of concurrent connections for your application will be
    # a total of pool_size and max_overflow.
    # [END cloud_sql_postgres_sqlalchemy_limit]

    # [START cloud_sql_postgres_sqlalchemy_backoff]
    # SQLAlchemy automatically uses delays between failed connection attempts,
    # but provides no arguments for configuration.
    # [END cloud_sql_postgres_sqlalchemy_backoff]

    # [START cloud_sql_postgres_sqlalchemy_timeout]
    # 'pool_timeout' is the maximum number of seconds to wait when retrieving a
    # new connection from the pool. After the specified amount of time, an
    # exception will be thrown.
    pool_timeout=30,  # 30 seconds
    # [END cloud_sql_postgres_sqlalchemy_timeout]

    # [START cloud_sql_postgres_sqlalchemy_lifetime]
    # 'pool_recycle' is the maximum number of seconds a connection can persist.
    # Connections that live longer than the specified amount of time will be
    # reestablished
    pool_recycle=1800,  # 30 minutes
    # [END cloud_sql_postgres_sqlalchemy_lifetime]

    # [END_EXCLUDE]
)
# [END cloud_sql_postgres_sqlalchemy_create]
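One way the request could look is sketched here, assuming the configuration object carries a dictionary of extra options that is forwarded untouched when the engine is created. EngineSettings, build_engine and fake_create_engine are hypothetical names, not part of beam-nuggets. The engine factory is injected so the sketch runs without a database; in real code it would be sqlalchemy.create_engine.

```python
from dataclasses import dataclass, field


@dataclass
class EngineSettings:
    """Hypothetical holder for a database URL plus extra engine options."""
    url: str
    engine_kwargs: dict = field(default_factory=dict)


def build_engine(settings, engine_factory):
    """Create an engine, forwarding the extra options unchanged."""
    return engine_factory(settings.url, **settings.engine_kwargs)


def fake_create_engine(url, **kwargs):
    # Stand-in for sqlalchemy.create_engine that records its arguments.
    return {"url": url, "options": kwargs}


settings = EngineSettings(
    url="postgresql+pg8000://user:secret@localhost:5432/mydb",
    engine_kwargs={
        "pool_size": 5,
        "max_overflow": 2,
        "pool_timeout": 30,
        "pool_recycle": 1800,
    },
)
engine = build_engine(settings, fake_create_engine)
print(engine["options"])
```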

Issue with Dataflow: create() missing 1 required positional argument: 'drivername'

versions

beam-nuggets==0.18.0
pg8000==1.16.5
SQLAlchemy==1.4.0

configuration in the code

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host=known_args.host,
    port=known_args.port,
    username=known_args.username,
    password=known_args.password,
    database='geospatial',
)
file = (
    p
    | 'States' >> beam.Create(['Vermont'])
    | 'Links' >> beam.ParDo(Links())
    | 'Download from Github to GCS' >> beam.ParDo(DownloadAsItIs())
    | 'Read the file from GCS' >> beam.io.ReadAllFromText()
    | 'Transform lines' >> beam.Map(lambda s: data_ingestion.parse_line(s))
    | 'Filter lines' >> beam.Filter(is_validGeometry)
    | 'Transform for Postgres' >> beam.Map(lambda s: postgres_ingestion.parse_line(s))
    | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config,
    )
)

Error

Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/url.py", line 100, in __new__
    return URL.create(*arg, **kw)
TypeError: create() missing 1 required positional argument: 'drivername'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 695, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 697, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 698, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 308, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 314, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 644, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 645, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 289, in apache_beam.runners.worker.operations.Operation.setup
  File "apache_beam/runners/worker/operations.py", line 303, in apache_beam.runners.worker.operations.Operation.setup
  File "apache_beam/runners/worker/operations.py", line 779, in apache_beam.runners.worker.operations.DoOperation._get_runtime_performance_hints
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/url.py", line 100, in __new__
    return URL.create(*arg, **kw)
TypeError: create() missing 1 required positional argument: 'drivername'
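The traceback shows the failure happening while the worker unpickles a SQLAlchemy URL object. One possible workaround, offered as an assumption rather than the library's fix, is to keep only plain values in whatever gets pickled and to build the URL on the worker. PlainSourceConfig is a hypothetical class used only to illustrate the pattern with the standard pickle module.

```python
import pickle


class PlainSourceConfig:
    """Hypothetical config that holds only strings and ints, so it pickles cleanly."""

    def __init__(self, drivername, host, port, username, password, database):
        self.drivername = drivername
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.database = database

    def url_string(self):
        # Built on demand, after unpickling. Special characters in the
        # username or password are NOT escaped in this sketch.
        return "{}://{}:{}@{}:{}/{}".format(
            self.drivername, self.username, self.password,
            self.host, self.port, self.database,
        )


config = PlainSourceConfig(
    drivername="postgresql+pg8000",
    host="localhost",
    port=5432,
    username="user",
    password="secret",
    database="geospatial",
)
# Simulate shipping the config to a worker and restoring it there.
restored = pickle.loads(pickle.dumps(config))
print(restored.url_string())
```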

Speed up SelectFromNestedDict

Hello! Would you accept a PR if I submit a change to speed up SelectFromNestedDict at the expense of initialization time?

It would look like this:

from convtools import conversion as c

class SelectFromNestedDict:
    def __init__(self, keys, deepest_key_as_name=False, *args, **kwargs):
        super(SelectFromNestedDict, self).__init__(*args, **kwargs)
        self._compiled_keys = self._compile_keys(keys, deepest_key_as_name)

        self._element_to_dict = c(
            {
                out_key: c.item(*nested_keys)
                for nested_keys, out_key in self._compile_keys(
                    keys, deepest_key_as_name
                )
            }
        ).gen_converter()

It generates the following function under the hood:

def converter(data_):
    try:
        return {"a_b": data_["a"]["b"], "c_d_e": data_["c"]["d"]["e"]}
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

# CODE GENERATION DOESN'T TAKE TOO LONG
# 61.1 µs ± 69.7 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

The results are:

def _compile_keys(keys, deepest_key_as_name):
    def _get_out_dict_key(nested_keys):
        if deepest_key_as_name:
            return nested_keys[-1]
        else:
            return "_".join(nested_keys)

    return [
        (
            nested_keys,  # ['a', 'b'] used for retrieving nested values
            _get_out_dict_key(nested_keys),  # 'a_b' or 'b'
        )
        for nested_keys in [key.split(".") for key in keys]  # ['a.b']
    ]


keys = _compile_keys(["a.b", "c.d.e"], False)


def _retrieve(nested_keys, element):
    for key in nested_keys:
        element = element[key]
    return element


def process_as_is(element):
    """
    Args:
        element(dict):
    """

    yield {
        out_key: _retrieve(nested_keys, element)
        for nested_keys, out_key in keys
    }


from convtools import conversion as c


element_to_dict = c(
    {out_key: c.item(*nested_keys) for nested_keys, out_key in keys}
).gen_converter()


def process_with_convtools(element):
    yield element_to_dict(element)


d = {"a": {"b": 1}, "c": {"d": {"e": 2}}}
assert list(process_with_convtools(d)) == list(process_as_is(d))

# In [31]: %timeit list(process_with_convtools(d))
# 289 ns ± 0.709 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
#
# In [32]: %timeit list(process_as_is(d))
# 555 ns ± 1.3 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)

feature request: pass parameters to query to prevent SQL injection

Current methods like ReadFromDB don't provide a way to pass query parameters separately from the query text. This opens code up to SQL injection: code that uses beam-nuggets might only have the query parameters available at runtime or inside the Beam pipeline, and may be unable to validate them further before building the query string.

One solution would be to extend current capabilities to allow passing in a parameter dictionary. This would involve modifying the following, for example:

One caveat to this solution is that the parameters available in a dictionary may not be easily serializable.
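The parameter-dictionary idea can be sketched with the standard library's sqlite3 module, which supports named placeholders. This is only an illustration of bound parameters, not the beam-nuggets API: query_with_params is a hypothetical helper, and the months table is made up for the example. A dictionary of plain strings and numbers like the one below also pickles without trouble, which sidesteps the serialization caveat for the simple case.

```python
import sqlite3


def query_with_params(connection, query, params):
    """Run `query` with bound parameters and return rows as dicts."""
    cursor = connection.execute(query, params)
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE months (name TEXT, num INTEGER)")
conn.executemany("INSERT INTO months VALUES (?, ?)", [("Jan", 1), ("Feb", 2)])

query = "SELECT name, num FROM months WHERE name = :name"

# A normal value matches one row.
rows = query_with_params(conn, query, {"name": "Jan"})

# A hostile value is treated as a literal string, so it matches nothing
# instead of rewriting the WHERE clause.
hostile_rows = query_with_params(conn, query, {"name": "Jan' OR '1'='1"})
print(rows, hostile_rows)
```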

Fail to read data from tables in different schema (Postgres)

Hi,

I'm trying to read data from a table in a schema other than public, but I get an error saying the table can't be found. I've tried using the schema name both in the table name and in the query, with no success. I think it would be good to support schemas other than the default one. So I did some research.. :)

I read that SQLAlchemy accepts the schema name, and after some digging I found that I could supply it in the load_table function in relational_db_api.py. After I added it (see below), and also added it in all the calling places, it seemed to work:

def load_table(session, name, schema):
    table_class = None
    engine = session.bind
    if engine.dialect.has_table(engine, name, schema=schema):
        metadata = MetaData(bind=engine)
        table_class = create_table_class(
            Table(name, metadata, autoload=True, schema=schema)
        )
    return table_class

I set the default value to public (in the ReadFromDB function) but I'm not sure if that will only work for Postgres. And I didn't look at the write functions at all...

//Mattias

Unable to read MySQL table without Primary Key

I created a simple table and ran a Dataflow job. But I got hit by this error:

sqlalchemy.exc.ArgumentError: Mapper mapped class TableClass->data could not assemble any primary key columns for mapped table 'data' [while running 'Reading from CloudSQLMySQL/ParDo(_ReadFromRelationalDBFn)']

I then altered my table to add a primary key, ran the job again, and it succeeded.

It appears that beam_nuggets cannot read a table without a primary key. If so, why is that? Is there a way to bypass this limitation? I'm trying to dump some TCP data into a table and there's no way to generate a primary key for that data.

Here is the full stacktrace of the error:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db.py", line 96, in process
    for record in db.query(table_name, query):
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 269, in query
    table = self._open_table_for_read(table_name)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 304, in _open_table_for_read
    get_table_f=load_table
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 319, in _open_table
    self._get_table(name, get_table_f, **get_table_f_params)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 325, in _get_table
    table_class = get_table_f(self._session, name, **get_table_f_params)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 382, in load_table
    table_class = create_table_class(Table(name, metadata, autoload=True))
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 420, in create_table_class
    class TableClass(declarative_base()):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/api.py", line 75, in __init__
    _as_declarative(cls, classname, cls.__dict__)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 131, in _as_declarative
    _MapperConfig.setup_mapping(cls, classname, dict_)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 160, in setup_mapping
    cfg_cls(cls_, classname, dict_)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 194, in __init__
    self._early_mapping()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 199, in _early_mapping
    self.map()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 696, in map
    self.cls, self.local_table, **self.mapper_args
  File "<string>", line 2, in mapper
  File "<string>", line 2, in __init__
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 716, in __init__
    self._configure_pks()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 1397, in _configure_pks
    % (self, self.persist_selectable.description)
sqlalchemy.exc.ArgumentError: Mapper mapped class TableClass->data could not assemble any primary key columns for mapped table 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 650, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db.py", line 96, in process
    for record in db.query(table_name, query):
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 269, in query
    table = self._open_table_for_read(table_name)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 304, in _open_table_for_read
    get_table_f=load_table
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 319, in _open_table
    self._get_table(name, get_table_f, **get_table_f_params)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 325, in _get_table
    table_class = get_table_f(self._session, name, **get_table_f_params)
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 382, in load_table
    table_class = create_table_class(Table(name, metadata, autoload=True))
  File "/usr/local/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 420, in create_table_class
    class TableClass(declarative_base()):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/api.py", line 75, in __init__
    _as_declarative(cls, classname, cls.__dict__)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 131, in _as_declarative
    _MapperConfig.setup_mapping(cls, classname, dict_)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 160, in setup_mapping
    cfg_cls(cls_, classname, dict_)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 194, in __init__
    self._early_mapping()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 199, in _early_mapping
    self.map()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/base.py", line 696, in map
    self.cls, self.local_table, **self.mapper_args
  File "<string>", line 2, in mapper
  File "<string>", line 2, in __init__
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/deprecations.py", line 128, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 716, in __init__
    self._configure_pks()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 1397, in _configure_pks
    % (self, self.persist_selectable.description)
sqlalchemy.exc.ArgumentError: Mapper mapped class TableClass->data could not assemble any primary key columns for mapped table 'data' [while running 'Reading from CloudSQLMySQL/ParDo(_ReadFromRelationalDBFn)']
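The mapper error above comes from SQLAlchemy's ORM, which needs at least one primary key column to map a class to a table. One way around that requirement, sketched here as an assumption and not as how beam-nuggets works, is to read rows through a plain cursor and build dicts from the column names, with no ORM mapping involved. The sketch uses the standard library's sqlite3 module and a made-up table; read_rows_as_dicts is a hypothetical helper.

```python
import sqlite3


def read_rows_as_dicts(connection, table_name):
    """Read every row of `table_name` as a dict, without any ORM mapping."""
    # Table names cannot be bound as parameters, so accept only simple
    # identifiers before formatting the name into the statement.
    if not table_name.isidentifier():
        raise ValueError("unexpected table name: {!r}".format(table_name))
    cursor = connection.execute("SELECT * FROM {}".format(table_name))
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
# Note: no primary key on this table.
conn.execute("CREATE TABLE data (payload TEXT, received_at TEXT)")
conn.execute("INSERT INTO data VALUES ('abc', '2020-01-01T00:00:00')")

rows = read_rows_as_dicts(conn, "data")
print(rows)
```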

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name'

I am trying to use beam-nuggets to access Oracle, and I am receiving:

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name'

Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='oracle+cx_oracle',
        host='...',
        port=1521,
        username='',
        password='',
    )
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='CONTACTS',
        query='select * from EHR.CONTACTS',  # optional. When omitted, all table records are returned.
    )
    records | 'Writing to stdout' >> beam.Map(print)

Stack trace:

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py in process(self, element)
94 try:
95 if query:
---> 96 for record in db.query(table_name, query):
97 yield record
98 else:

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in query(self, table_name, query)
267
268 def query(self, table_name, query):
--> 269 table = self._open_table_for_read(table_name)
270 for record in table.query_records(self._session, query):
271 yield record

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _open_table_for_read(self, name)
302 return self._open_table(
303 name=name,
--> 304 get_table_f=load_table
305 )
306

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _open_table(self, name, get_table_f, **get_table_f_params)
317 if not table:
318 self._name_to_table[name] = (
--> 319 self._get_table(name, get_table_f, **get_table_f_params)
320 )
321 table = self._name_to_table[name]

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _get_table(self, name, get_table_f, **get_table_f_params)
323
324 def _get_table(self, name, get_table_f, **get_table_f_params):
--> 325 table_class = get_table_f(self._session, name, **get_table_f_params)
326 if table_class:
327 table = _Table(table_class=table_class, name=name)

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in load_table(session, name)
378 table_class = None
379 engine = session.bind
--> 380 if engine.dialect.has_table(engine, name):
381 metadata = MetaData(bind=engine)
382 table_class = create_table_class(Table(name, metadata, autoload=True))

/opt/conda/lib/python3.7/site-packages/sqlalchemy/dialects/oracle/base.py in has_table(self, connection, table_name, schema)
1356 def has_table(self, connection, table_name, schema=None):
1357 if not schema:
-> 1358 schema = self.default_schema_name
1359 cursor = connection.execute(
1360 sql.text(

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name'

Error when writing transformed data

When I transform a collection of JSON rows to extract some data to write to the database, I get the error below when executing the pipeline. If I create an in-memory copy of the same data, it works as expected. Is there something about the collection that breaks the write-to-database transform?

File "/pipeline/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/pipeline/venv/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py", line 181, in process
assert isinstance(element, dict)
RuntimeError: AssertionError [while running 'Write to Database/ParDo(_WriteToRelationalDBFn)']
make: *** [run] Error 1

This works:

        organisations = (p 
                         | 'Reading Organisations'
                         >> beam.Create([
                                {"name": "MARTINEZ GROUP", "ref": 854},
                                {"name": "DOUGHERTY-LEE", "ref": 955},
                                {"name": "BENSON LLC", "ref": 209},
                                {"name": "RILEY GROUP", "ref": 824},
                                {"name": "KING, PINEDA AND ROBERTS", "ref": 244}
                            ])
                         | 'Write to Database'
                         >> relational_db.Write(
                              source_config=source_config,
                              table_config=table_config))

Whereas this fails:

      organisations = (normalised_organisations
                                 | 'Write to Database'
                                 >> relational_db.Write(
                                       source_config=source_config,
                                       table_config=table_config))
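
The `assert isinstance(element, dict)` line in the traceback suggests the write transform accepts only dict elements. One possible cause, then, is that `normalised_organisations` still holds JSON strings or some other non-dict type. Below is a minimal sketch of a conversion step, assuming each element is a JSON object encoded as `str` or `bytes`. `to_record` is a hypothetical helper, not part of beam-nuggets.

```python
import json


def to_record(element):
    """Return a plain dict for a JSON-encoded row (hypothetical helper)."""
    if isinstance(element, dict):
        return dict(element)  # already a dict: return a shallow copy
    if isinstance(element, bytes):
        element = element.decode("utf-8")
    record = json.loads(element)
    if not isinstance(record, dict):
        raise TypeError("expected a JSON object, got %s" % type(record).__name__)
    return record


print(to_record('{"name": "MARTINEZ GROUP", "ref": 854}'))
```

In a pipeline this could sit just before the write step, for example as `| 'To dict' >> beam.Map(to_record)`.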

Crash while writing to mySQL

I get the following crash while writing a blob to a relational database (Cloud SQL). Any suggestions on what the issue could be?

2020-12-16 16:55:13.027 PSTError message from worker: generic::unknown: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1204, in _execute_context context = constructor(dialect, self, conn, *args) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in _init_compiled for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 915, in process return DBAPIBinary(value) File "/usr/local/lib/python3.7/site-packages/pymysql/init.py", line 85, in Binary return bytes(x) TypeError: string argument without an encoding The above exception was the direct cause of the following exception: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py", line 182, in process self._db.write_record(self.table_config, element) File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 288, in write_record record_dict=record_dict File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 360, in write_record session.execute(insert_stmt) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1292, in execute clause, params or {} File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute return meth(self, multiparams, params) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) File 
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement distilled_params, File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1207, in execute_context e, util.text_type(statement), parameters, None, None File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in handle_dbapi_exception sqlalchemy_exception, with_traceback=exc_info[2], from=e File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise raise exception File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1204, in _execute_context context = constructor(dialect, self, conn, *args) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in _init_compiled for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 915, in process return DBAPIBinary(value) File "/usr/local/lib/python3.7/site-packages/pymysql/init.py", line 85, in Binary return bytes(x) sqlalchemy.exc.StatementError: (builtins.TypeError) string argument without an encoding [SQL: INSERT INTO region_master (edge_location_region_id, edge_location_region_name, cloud_provider, fwaas_display_name, public_subnets) VALUES (%(edge_location_region_id)s, %(edge_location_region_name)s, %(cloud_provider)s, %(fwaas_display_name)s, %(public_subnets)s) ON DUPLICATE KEY UPDATE edge_location_region_id = %(param_1)s, edge_location_region_name = %(param_2)s, cloud_provider = %(param_3)s, fwaas_display_name = %(param_4)s, public_subnets = %(param_5)s] [parameters: [{}]] During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute response = task() File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle element.data) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 860, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1374, in 
apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.7/site-packages/future/utils/init.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py", line 182, in process self._db.write_record(self.table_config, element) File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 288, in write_record record_dict=record_dict File "/usr/local/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py", line 360, in write_record session.execute(insert_stmt) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1292, in execute clause, params or {} File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute return meth(self, multiparams, params) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement distilled_params, File 
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1207, in execute_context e, util.text_type(statement), parameters, None, None File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in handle_dbapi_exception sqlalchemy_exception, with_traceback=exc_info[2], from=e File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise raise exception File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1204, in _execute_context context = constructor(dialect, self, conn, *args) File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in _init_compiled for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 865, in for key in compiled_params File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 915, in process return DBAPIBinary(value) File "/usr/local/lib/python3.7/site-packages/pymysql/init.py", line 85, in Binary return bytes(x) RuntimeError: sqlalchemy.exc.StatementError: (builtins.TypeError) string argument without an encoding [SQL: INSERT INTO region_master (edge_location_region_id, edge_location_region_name, cloud_provider, fwaas_display_name, public_subnets) VALUES (%(edge_location_region_id)s, %(edge_location_region_name)s, %(cloud_provider)s, %(fwaas_display_name)s, %(public_subnets)s) ON DUPLICATE KEY UPDATE edge_location_region_id = %(param_1)s, edge_location_region_name = %(param_2)s, cloud_provider = %(param_3)s, fwaas_display_name = %(param_4)s, public_subnets = %(param_5)s] [parameters: [{}]] [while running 'write_to_cloudsql_region_master/ParDo(_WriteToRelationalDBFn)-ptransform-25']

Data:
b'[{"edge_location_region_id": 1, "edge_location_region_name": "us-west-2", "cloud_provider": "aws", "fwaas_display_name": "US Northwest"}, {"edge_location_region_id": 2, "edge_location_region_name": "us-west-1", "cloud_provider": "aws", "fwaas_display_name": "US West"}, {"edge_location_region_id": 3, "edge_location_region_name": "us-east-2", "cloud_provider": "aws", "fwaas_display_name": "US Central"}, {"edge_location_region_id": 4, "edge_location_region_name": "ap-south-1", "cloud_provider": "aws", "fwaas_display_name": "India West"}, {"edge_location_region_id": 5, "edge_location_region_name": "ap-northeast-2", "cloud_provider": "aws", "fwaas_display_name": "South Korea"}, {"edge_location_region_id": 6, "edge_location_region_name": "ap-southeast-1", "cloud_provider": "aws", "fwaas_display_name": "Singapore"}, {"edge_location_region_id": 7, "edge_location_region_name": "ap-southeast-2", "cloud_provider": "aws", "fwaas_display_name": "Australia Southeast"}, {"edge_location_region_id": 8, "edge_location_region_name": "ap-northeast-1", "cloud_provider": "aws", "fwaas_display_name": "Japan Central"}, {"edge_location_region_id": 9, "edge_location_region_name": "eu-central-1", "cloud_provider": "aws", "fwaas_display_name": "Germany Central"}, {"edge_location_region_id": 10, "edge_location_region_name": "eu-west-1", "cloud_provider": "aws", "fwaas_display_name": "Ireland"}, {"edge_location_region_id": 11, "edge_location_region_name": "sa-east-1", "cloud_provider": "aws", "fwaas_display_name": "Brazil South"}, {"edge_location_region_id": 36, "edge_location_region_name": "us-east-1", "cloud_provider": "aws", "fwaas_display_name": "US East"}, {"edge_location_region_id": 61, "edge_location_region_name": "eu-west-2", "cloud_provider": "aws", "fwaas_display_name": "UK"}, {"edge_location_region_id": 64, "edge_location_region_name": "ca-central-1", "cloud_provider": "aws", "fwaas_display_name": "Canada East"}, {"edge_location_region_id": 98, "edge_location_region_name": 
"me-south-1", "cloud_provider": "aws", "fwaas_display_name": "Bahrain"}, {"edge_location_region_id": 100, "edge_location_region_name": "eu-west-3", "cloud_provider": "aws", "fwaas_display_name": "France North"}, {"edge_location_region_id": 104, "edge_location_region_name": "south-africa-west", "cloud_provider": "aws", "fwaas_display_name": "South Africa West"}, {"edge_location_region_id": 200, "edge_location_region_name": "us-west-2", "cloud_provider": "gcp", "fwaas_display_name": "US Northwest"}, {"edge_location_region_id": 201, "edge_location_region_name": "us-west-201", "cloud_provider": "gcp", "fwaas_display_name": "US Southwest"}, {"edge_location_region_id": 203, "edge_location_region_name": "us-west-1", "cloud_provider": "gcp", "fwaas_display_name": "US West"}, {"edge_location_region_id": 208, "edge_location_region_name": "canada-west", "cloud_provider": "gcp", "fwaas_display_name": "Canada West"}, {"edge_location_region_id": 209, "edge_location_region_name": "ap-southeast-2", "cloud_provider": "gcp", "fwaas_display_name": "Australia Southeast"}, {"edge_location_region_id": 210, "edge_location_region_name": "ap-south-1", "cloud_provider": "gcp", "fwaas_display_name": "India West"}, {"edge_location_region_id": 211, "edge_location_region_name": "ap-northeast-1", "cloud_provider": "gcp", "fwaas_display_name": "Japan Central"}, {"edge_location_region_id": 212, "edge_location_region_name": "eu-west-2", "cloud_provider": "gcp", "fwaas_display_name": "UK"}, {"edge_location_region_id": 213, "edge_location_region_name": "eu-central-1", "cloud_provider": "gcp", "fwaas_display_name": "Germany Central"}, {"edge_location_region_id": 214, "edge_location_region_name": "us-east-1", "cloud_provider": "gcp", "fwaas_display_name": "US East"}, {"edge_location_region_id": 215, "edge_location_region_name": "taiwan", "cloud_provider": "gcp", "fwaas_display_name": "Taiwan"}, {"edge_location_region_id": 216, "edge_location_region_name": "netherlands-central", "cloud_provider": "gcp", 
"fwaas_display_name": "Netherlands Central", "public_subnets": "[\"192.168.16.0/22\",\"193.15.10.0/24\"]"}, {"edge_location_region_id": 217, "edge_location_region_name": "us-southeast", "cloud_provider": "gcp", "fwaas_display_name": "US Southeast"}, {"edge_location_region_id": 218, "edge_location_region_name": "finland", "cloud_provider": "gcp", "fwaas_display_name": "Finland"}, {"edge_location_region_id": 219, "edge_location_region_name": "hong-kong", "cloud_provider": "gcp", "fwaas_display_name": "Hong Kong"}, {"edge_location_region_id": 220, "edge_location_region_name": "us-east-2", "cloud_provider": "gcp", "fwaas_display_name": "US Central"}, {"edge_location_region_id": 221, "edge_location_region_name": "ca-central-1", "cloud_provider": "gcp", "fwaas_display_name": "Canada East"}, {"edge_location_region_id": 222, "edge_location_region_name": "ap-southeast-1", "cloud_provider": "gcp", "fwaas_display_name": "Singapore"}, {"edge_location_region_id": 224, "edge_location_region_name": "ap-northeast-2", "cloud_provider": "gcp", "fwaas_display_name": "South Korea"}, {"edge_location_region_id": 225, "edge_location_region_name": "eu-west-1", "cloud_provider": "gcp", "fwaas_display_name": "Ireland"}, {"edge_location_region_id": 229, "edge_location_region_name": "eu-west-3", "cloud_provider": "gcp", "fwaas_display_name": "France North"}, {"edge_location_region_id": 233, "edge_location_region_name": "kenya", "cloud_provider": "gcp", "fwaas_display_name": "Kenya"}, {"edge_location_region_id": 235, "edge_location_region_name": "nigeria", "cloud_provider": "gcp", "fwaas_display_name": "Nigeria"}, {"edge_location_region_id": 237, "edge_location_region_name": "south-africa-central", "cloud_provider": "gcp", "fwaas_display_name": "South Africa Central"}, {"edge_location_region_id": 239, "edge_location_region_name": "south-africa-west", "cloud_provider": "gcp", "fwaas_display_name": "South Africa West"}, {"edge_location_region_id": 241, "edge_location_region_name": 
"australia-east", "cloud_provider": "gcp", "fwaas_display_name": "Australia East"}, {"edge_location_region_id": 245, "edge_location_region_name": "australia-south", "cloud_provider": "gcp", "fwaas_display_name": "Australia South"}, {"edge_location_region_id": 247, "edge_location_region_name": "bangladesh", "cloud_provider": "gcp", "fwaas_display_name": "Bangladesh"}, {"edge_location_region_id": 249, "edge_location_region_name": "cambodia", "cloud_provider": "gcp", "fwaas_display_name": "Cambodia"}, {"edge_location_region_id": 253, "edge_location_region_name": "india-north", "cloud_provider": "gcp", "fwaas_display_name": "India North"}, {"edge_location_region_id": 255, "edge_location_region_name": "india-south", "cloud_provider": "gcp", "fwaas_display_name": "India South"}, {"edge_location_region_id": 259, "edge_location_region_name": "indonesia", "cloud_provider": "gcp", "fwaas_display_name": "Indonesia"}, {"edge_location_region_id": 263, "edge_location_region_name": "japan-south", "cloud_provider": "gcp", "fwaas_display_name": "Japan South"}, {"edge_location_region_id": 265, "edge_location_region_name": "malaysia", "cloud_provider": "gcp", "fwaas_display_name": "Malaysia"}, {"edge_location_region_id": 267, "edge_location_region_name": "myanmar", "cloud_provider": "gcp", "fwaas_display_name": "Myanmar"}, {"edge_location_region_id": 269, "edge_location_region_name": "new-zealand", "cloud_provider": "gcp", "fwaas_display_name": "New Zealand"}, {"edge_location_region_id": 271, "edge_location_region_name": "pakistan-south", "cloud_provider": "gcp", "fwaas_display_name": "Pakistan South"}, {"edge_location_region_id": 273, "edge_location_region_name": "pakistan-west", "cloud_provider": "gcp", "fwaas_display_name": "Pakistan West"}, {"edge_location_region_id": 275, "edge_location_region_name": "papua-new-guinea", "cloud_provider": "gcp", "fwaas_display_name": "Papua New Guinea"}, {"edge_location_region_id": 277, "edge_location_region_name": "philippines", 
"cloud_provider": "gcp", "fwaas_display_name": "Philippines"}, {"edge_location_region_id": 285, "edge_location_region_name": "thailand", "cloud_provider": "gcp", "fwaas_display_name": "Thailand"}, {"edge_location_region_id": 287, "edge_location_region_name": "vietnam", "cloud_provider": "gcp", "fwaas_display_name": "Vietnam"}, {"edge_location_region_id": 289, "edge_location_region_name": "andorra", "cloud_provider": "gcp", "fwaas_display_name": "Andorra"}, {"edge_location_region_id": 291, "edge_location_region_name": "austria", "cloud_provider": "gcp", "fwaas_display_name": "Austria"}, {"edge_location_region_id": 293, "edge_location_region_name": "belarus", "cloud_provider": "gcp", "fwaas_display_name": "Belarus"}, {"edge_location_region_id": 295, "edge_location_region_name": "belgium", "cloud_provider": "gcp", "fwaas_display_name": "Belgium"}, {"edge_location_region_id": 297, "edge_location_region_name": "bulgaria", "cloud_provider": "gcp", "fwaas_display_name": "Bulgaria"}, {"edge_location_region_id": 299, "edge_location_region_name": "croatia", "cloud_provider": "gcp", "fwaas_display_name": "Croatia"}, {"edge_location_region_id": 301, "edge_location_region_name": "czech-republic", "cloud_provider": "gcp", "fwaas_display_name": "Czech Republic"}, {"edge_location_region_id": 303, "edge_location_region_name": "denmark", "cloud_provider": "gcp", "fwaas_display_name": "Denmark"}, {"edge_location_region_id": 305, "edge_location_region_name": "egypt", "cloud_provider": "gcp", "fwaas_display_name": "Egypt"}, {"edge_location_region_id": 309, "edge_location_region_name": "france-south", "cloud_provider": "gcp", "fwaas_display_name": "France South"}, {"edge_location_region_id": 315, "edge_location_region_name": "germany-north", "cloud_provider": "gcp", "fwaas_display_name": "Germany North"}, {"edge_location_region_id": 317, "edge_location_region_name": "germany-south", "cloud_provider": "gcp", "fwaas_display_name": "Germany South"}, {"edge_location_region_id": 319, 
"edge_location_region_name": "greece", "cloud_provider": "gcp", "fwaas_display_name": "Greece"}, {"edge_location_region_id": 321, "edge_location_region_name": "hungary", "cloud_provider": "gcp", "fwaas_display_name": "Hungary"}, {"edge_location_region_id": 325, "edge_location_region_name": "israel", "cloud_provider": "gcp", "fwaas_display_name": "Israel"}, {"edge_location_region_id": 327, "edge_location_region_name": "italy", "cloud_provider": "gcp", "fwaas_display_name": "Italy"}, {"edge_location_region_id": 329, "edge_location_region_name": "jordan", "cloud_provider": "gcp", "fwaas_display_name": "Jordan"}, {"edge_location_region_id": 331, "edge_location_region_name": "kuwait", "cloud_provider": "gcp", "fwaas_display_name": "Kuwait"}, {"edge_location_region_id": 333, "edge_location_region_name": "liechtenstein", "cloud_provider": "gcp", "fwaas_display_name": "Liechtenstein"}, {"edge_location_region_id": 335, "edge_location_region_name": "lithuania", "cloud_provider": "gcp", "fwaas_display_name": "Lithuania"}, {"edge_location_region_id": 337, "edge_location_region_name": "luxembourg", "cloud_provider": "gcp", "fwaas_display_name": "Luxembourg"}, {"edge_location_region_id": 339, "edge_location_region_name": "moldova", "cloud_provider": "gcp", "fwaas_display_name": "Moldova"}, {"edge_location_region_id": 341, "edge_location_region_name": "monaco", "cloud_provider": "gcp", "fwaas_display_name": "Monaco"}, {"edge_location_region_id": 345, "edge_location_region_name": "netherlands-south", "cloud_provider": "gcp", "fwaas_display_name": "Netherlands South"}, {"edge_location_region_id": 347, "edge_location_region_name": "norway", "cloud_provider": "gcp", "fwaas_display_name": "Norway"}, {"edge_location_region_id": 349, "edge_location_region_name": "poland", "cloud_provider": "gcp", "fwaas_display_name": "Poland"}, {"edge_location_region_id": 351, "edge_location_region_name": "portugal", "cloud_provider": "gcp", "fwaas_display_name": "Portugal"}, 
{"edge_location_region_id": 353, "edge_location_region_name": "romania", "cloud_provider": "gcp", "fwaas_display_name": "Romania"}, {"edge_location_region_id": 355, "edge_location_region_name": "russia-central", "cloud_provider": "gcp", "fwaas_display_name": "Russia Central"}, {"edge_location_region_id": 357, "edge_location_region_name": "russia-northwest", "cloud_provider": "gcp", "fwaas_display_name": "Russia Northwest"}, {"edge_location_region_id": 359, "edge_location_region_name": "saudi-arabia", "cloud_provider": "gcp", "fwaas_display_name": "Saudi Arabia"}, {"edge_location_region_id": 361, "edge_location_region_name": "slovakia", "cloud_provider": "gcp", "fwaas_display_name": "Slovakia"}, {"edge_location_region_id": 363, "edge_location_region_name": "slovenia", "cloud_provider": "gcp", "fwaas_display_name": "Slovenia"}, {"edge_location_region_id": 365, "edge_location_region_name": "spain-central", "cloud_provider": "gcp", "fwaas_display_name": "Spain Central"}, {"edge_location_region_id": 367, "edge_location_region_name": "spain-east", "cloud_provider": "gcp", "fwaas_display_name": "Spain East"}, {"edge_location_region_id": 369, "edge_location_region_name": "sweden", "cloud_provider": "gcp", "fwaas_display_name": "Sweden"}, {"edge_location_region_id": 371, "edge_location_region_name": "switzerland", "cloud_provider": "gcp", "fwaas_display_name": "Switzerland"}, {"edge_location_region_id": 373, "edge_location_region_name": "turkey", "cloud_provider": "gcp", "fwaas_display_name": "Turkey"}, {"edge_location_region_id": 377, "edge_location_region_name": "ukraine", "cloud_provider": "gcp", "fwaas_display_name": "Ukraine"}, {"edge_location_region_id": 379, "edge_location_region_name": "uae", "cloud_provider": "gcp", "fwaas_display_name": "United Arab Emirates"}, {"edge_location_region_id": 381, "edge_location_region_name": "uzbekistan", "cloud_provider": "gcp", "fwaas_display_name": "Uzbekistan"}, {"edge_location_region_id": 383, "edge_location_region_name": 
"canada-central", "cloud_provider": "gcp", "fwaas_display_name": "Canada Central"}, {"edge_location_region_id": 389, "edge_location_region_name": "costa-rica", "cloud_provider": "gcp", "fwaas_display_name": "Costa Rica"}, {"edge_location_region_id": 391, "edge_location_region_name": "mexico-central", "cloud_provider": "gcp", "fwaas_display_name": "Mexico Central"}, {"edge_location_region_id": 393, "edge_location_region_name": "mexico-west", "cloud_provider": "gcp", "fwaas_display_name": "Mexico West"}, {"edge_location_region_id": 395, "edge_location_region_name": "panama", "cloud_provider": "gcp", "fwaas_display_name": "Panama"}, {"edge_location_region_id": 401, "edge_location_region_name": "us-northeast", "cloud_provider": "gcp", "fwaas_display_name": "US Northeast"}, {"edge_location_region_id": 407, "edge_location_region_name": "us-south", "cloud_provider": "gcp", "fwaas_display_name": "US South"}, {"edge_location_region_id": 415, "edge_location_region_name": "argentina", "cloud_provider": "gcp", "fwaas_display_name": "Argentina"}, {"edge_location_region_id": 417, "edge_location_region_name": "bolivia", "cloud_provider": "gcp", "fwaas_display_name": "Bolivia"}, {"edge_location_region_id": 419, "edge_location_region_name": "sa-east-1", "cloud_provider": "gcp", "fwaas_display_name": "Brazil South"}, {"edge_location_region_id": 421, "edge_location_region_name": "brazil-east", "cloud_provider": "gcp", "fwaas_display_name": "Brazil East"}, {"edge_location_region_id": 423, "edge_location_region_name": "brazil-central", "cloud_provider": "gcp", "fwaas_display_name": "Brazil Central"}, {"edge_location_region_id": 425, "edge_location_region_name": "chile", "cloud_provider": "gcp", "fwaas_display_name": "Chile"}, {"edge_location_region_id": 427, "edge_location_region_name": "columbia", "cloud_provider": "gcp", "fwaas_display_name": "Colombia"}, {"edge_location_region_id": 429, "edge_location_region_name": "ecuador", "cloud_provider": "gcp", "fwaas_display_name": 
"Ecuador"}, {"edge_location_region_id": 431, "edge_location_region_name": "paraguay", "cloud_provider": "gcp", "fwaas_display_name": "Paraguay"}, {"edge_location_region_id": 433, "edge_location_region_name": "peru", "cloud_provider": "gcp", "fwaas_display_name": "Peru"}, {"edge_location_region_id": 435, "edge_location_region_name": "venezuela", "cloud_provider": "gcp", "fwaas_display_name": "Venezuela"}]'

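The innermost frame, `return bytes(x)` in pymysql's `Binary`, raises `TypeError: string argument without an encoding` whenever `x` is a `str`, so a binary column is probably receiving text. Below is a minimal sketch of a pre-processing step. It assumes the payload is a JSON array in bytes like the one above, and that `public_subnets` is the BLOB column, which the issue does not confirm. Both helper names are hypothetical.

```python
import json


def split_records(payload):
    """Decode a JSON array payload (bytes or str) into a list of row dicts."""
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    return json.loads(payload)


def encode_blob_fields(record, blob_fields=("public_subnets",)):
    """Return a copy of record with str values in blob_fields encoded as bytes."""
    encoded = dict(record)
    for field in blob_fields:
        value = encoded.get(field)
        if isinstance(value, str):
            encoded[field] = value.encode("utf-8")
    return encoded


# Shortened sample in the same shape as the data above.
payload = b'[{"edge_location_region_id": 216, "public_subnets": "[\\"192.168.16.0/22\\"]"}]'
rows = [encode_blob_fields(row) for row in split_records(payload)]
```

In a pipeline, `split_records` would fit a `beam.FlatMap` and `encode_blob_fields` a `beam.Map`, so that the write step receives one dict per row.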
Performance issue with the write operations

The write operation creates a connection per bundle. Bundles can be very small in streaming pipelines, which causes a flood of short-lived connections; without connection pooling, this could become a performance bottleneck.
Also, a batch write option would significantly reduce the number of writes.
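
To illustrate the batching idea only, here is a sketch that buffers rows and writes each batch with a single `executemany()` call. It uses the standard-library `sqlite3` module and an in-memory database as stand-ins. `BatchWriter` and the table layout are hypothetical, and this is not how beam-nuggets currently writes.

```python
import sqlite3


class BatchWriter:
    """Buffers rows and writes them with one executemany() call per batch."""

    def __init__(self, connection, insert_sql, batch_size=500):
        self._connection = connection
        self._insert_sql = insert_sql
        self._batch_size = batch_size
        self._buffer = []
        self.flush_count = 0  # number of batches actually written

    def write(self, row):
        self._buffer.append(row)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self):
        if not self._buffer:
            return
        self._connection.executemany(self._insert_sql, self._buffer)
        self._connection.commit()
        self._buffer = []
        self.flush_count += 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE organisations (name TEXT, ref INTEGER)")
writer = BatchWriter(
    conn, "INSERT INTO organisations (name, ref) VALUES (?, ?)", batch_size=2
)
for row in [("MARTINEZ GROUP", 854), ("DOUGHERTY-LEE", 955), ("BENSON LLC", 209)]:
    writer.write(row)
writer.flush()  # write the remaining partial batch
```

Inside a Beam `DoFn`, the same pattern would presumably buffer in `process` and flush in `finish_bundle`, with the connection opened once per worker in `setup`.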

sqlite beam_nuggets.io.relational_db_api.SqlAlchemyDbException:

I am running the following simple python script:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='sqlite',
        database='/sqlite/dbs/chinook'
    )

    print(source_config.url)

    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='customers',
        query='select * from customers'  # optional. When omitted, all table records are returned.
    )

    records | 'Writing to stdout' >> beam.Map(print)

I receive this error. What am I doing wrong?

C:\Users\chuck.brooks\Miniconda3\envs\base38\python.exe C:/PycharmProjects/base38/beam_nuggets_test.py
sqlite:////sqlite/dbs/chinook
Traceback (most recent call last):
File "apache_beam\runners\common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
File "C:\Users\chuck.brooks\Miniconda3\envs\base38\lib\site-packages\beam_nuggets\io\relational_db.py", line 96, in process
for record in db.query(table_name, query):
File "C:\Users\chuck.brooks\Miniconda3\envs\base38\lib\site-packages\beam_nuggets\io\relational_db_api.py", line 272, in query
table = self._open_table_for_read(table_name)
File "C:\Users\chuck.brooks\Miniconda3\envs\base38\lib\site-packages\beam_nuggets\io\relational_db_api.py", line 305, in _open_table_for_read
return self._open_table(
File "C:\Users\chuck.brooks\Miniconda3\envs\base38\lib\site-packages\beam_nuggets\io\relational_db_api.py", line 322, in _open_table
self._get_table(name, get_table_f, **get_table_f_params)
File "C:\Users\chuck.brooks\Miniconda3\envs\base38\lib\site-packages\beam_nuggets\io\relational_db_api.py", line 332, in _get_table
raise SqlAlchemyDbException('Failed to get table {}'.format(name))
beam_nuggets.io.relational_db_api.SqlAlchemyDbException: Failed to get table customers

During handling of the above exception, another exception occurred:
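
By default SQLite creates an empty database when no file exists at the given path, so "Failed to get table customers" can also mean the path does not point at the intended file. A quick check with the standard-library `sqlite3` module, as a sketch; `list_tables` is a hypothetical helper.

```python
import os
import sqlite3


def list_tables(db_path):
    """Return the table names in an existing SQLite file (hypothetical helper)."""
    # Check first: sqlite3.connect() would otherwise create an empty file.
    if not os.path.isfile(db_path):
        raise FileNotFoundError(db_path)
    connection = sqlite3.connect(db_path)
    try:
        rows = connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        connection.close()
    return [name for (name,) in rows]
```

Calling `list_tables` with the same path passed as `database` shows whether the file exists and whether it contains a `customers` table.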

sqlalchemy.exc.CompileError: Unconsumed column names

I get this error when I run two relational_db.Write steps that originate from the same PCollection.

Strangely, it sometimes works and sometimes doesn't.

Going from persons_raw to persons_stage, I add 3 new columns to the dict: tra_personalEmail, tra_companyDomain, tra_linkedInProfile

Can anyone help me?

Code:
with beam.Pipeline(options=pipeline_options) as p:
    persons_raw = (
        p
        |'Create pipeline' >> beam.Create(['data.csv'])
        |'Read multiline CSV' >> beam.FlatMap(lambda filename: csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(input_path),encoding='utf-8')))
        |'Remove header' >> beam.Filter(lambda x: x[0] != 'FIRST NAME')
        |'Raw schema' >> beam.Map(schema_persons_raw)
    )

    persons_raw_to_postgres = (
        persons_raw
        |'Persons_raw to DB' >> relational_db.Write(source_config=source_config,table_config=table_config_persons_raw)
    )
    persons_stage = (
        persons_raw
        |'Personal E-mail' >> beam.Map(tra_personal_email)
        |'Company domain' >> beam.Map(tra_company_domain)
        |'Linkedin url' >> beam.Map(tra_linkedin_url)
    )
    persons_stage_to_postgres = (
        persons_stage
        |'Persons_stage to DB' >> relational_db.Write(source_config=source_config,table_config=table_config_persons_stage)
    )

Error:
sqlalchemy.exc.CompileError: Unconsumed column names: tra_personalEmail, tra_companyDomain, tra_linkedInProfile [while running 'Persons_raw to DB/ParDo(_WriteToRelationalDBFn)']
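
One plausible cause, assuming the tra_* functions add keys to the incoming dict in place: both branches consume the same persons_raw elements, and Beam expects a DoFn not to mutate its input. If the stage branch happens to run first on a shared dict, the raw write sees the extra tra_* keys, which would also explain why it only fails sometimes. The sketch below shows the difference in plain Python. The function bodies and the "EMAIL" column are hypothetical, since the originals are not shown.

```python
def tra_personal_email_inplace(record):
    # Hypothetical body: mutates the input dict, so every other consumer
    # holding the same dict also sees the new key.
    record["tra_personalEmail"] = record["EMAIL"].lower()
    return record


def tra_personal_email_copy(record):
    # Safer variant: work on a shallow copy and leave the input untouched.
    out = dict(record)
    out["tra_personalEmail"] = record["EMAIL"].lower()
    return out


raw = {"FIRST NAME": "Ada", "EMAIL": "Ada@Example.com"}
staged = tra_personal_email_copy(raw)
raw_keys_after_copy = sorted(raw)            # input keeps its original keys

shared = {"FIRST NAME": "Ada", "EMAIL": "Ada@Example.com"}
tra_personal_email_inplace(shared)
raw_keys_after_inplace = sorted(shared)      # input now carries the extra key
```

If this is the cause, returning a copy from each of the three beam.Map functions should keep the persons_raw records limited to the columns of the raw table.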

_ConsumeKafkaTopic never terminates.

The _ConsumeKafkaTopic DoFn will read forever (unless the topic is closed), which prevents the bundle from finishing, so elements are never checkpointed or processed downstream. This should be rewritten as an SDF (Splittable DoFn). (Note also that there's ongoing work to expose Java's KafkaIO in Python as a cross-language transform.)

for msg in consumer:
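
To illustrate the problem with the loop above, here is a minimal sketch of a bounded read: the loop stops after a record count or a deadline, so control returns to the caller. This is not a Splittable DoFn and not the beam-nuggets implementation. fake_consumer is a hypothetical stand-in for an endless consumer iterator, and read_bounded is an illustrative name.

```python
import itertools
import time


def fake_consumer():
    """Hypothetical stand-in for a consumer that yields messages forever."""
    for offset in itertools.count():
        yield {"offset": offset, "value": "msg-{}".format(offset)}


def read_bounded(consumer, max_records, max_seconds):
    """Collect messages until either limit is reached, then return."""
    deadline = time.monotonic() + max_seconds
    batch = []
    for msg in consumer:
        batch.append(msg)
        # The limits are only checked after a message arrives; a consumer
        # that blocks while waiting would also need its own poll timeout.
        if len(batch) >= max_records or time.monotonic() >= deadline:
            break
    return batch


batch = read_bounded(fake_consumer(), max_records=5, max_seconds=1.0)
```

A real fix along the lines suggested in this issue would go further and let the runner checkpoint and resume the read, which a plain bounded loop does not provide.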
