Comments (25)

lidavidm commented on August 28, 2024

CC @zeroshade

zeroshade commented on August 28, 2024

@pkit Currently, decimal is not supported when binding Arrow record batch data into a query via bind parameters, e.g. running SELECT * FROM table WHERE col = ? and substituting a decimal value for the placeholder.

If you're doing a bulk insert, you should utilize cursor.adbc_ingest which has no issues with decimal data:

>>> tbl
pyarrow.Table
NUMBERTYPE: decimal128(38, 0)
NUMBERFLOAT: decimal128(15, 2)
----
NUMBERTYPE: [[1, 12345678901234567890123456789012345678]]
NUMBERFLOAT: [[1234567.89,9876543210.99]]
>>> conn = adbc_driver_snowflake.dbapi.connect(uri)
>>> cur = conn.cursor()
>>> cur.adbc_ingest('NUMBER_TEST', tbl)
0
>>> cur.execute('SELECT * FROM NUMBER_TEST')
>>> cur.fetch_arrow_table()
pyarrow.Table
NUMBERTYPE: decimal128(38, 0)
NUMBERFLOAT: decimal128(15, 2)
----
NUMBERTYPE: [[1, 12345678901234567890123456789012345678]]
NUMBERFLOAT: [[1234567.89,9876543210.99]]

By the same token, by default I believe we return all NUMBER(38, 0) columns as decimal128, but there is an option, adbc.snowflake.sql.client_option.use_high_precision, which can be set to "false" to have fixed-point data with a scale of 0 returned as int64 columns.
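
For illustration, a minimal sketch of flipping that option from the Python dbapi layer. It assumes the option is accepted at the statement level via set_options (the same pattern used for other options later in this thread); MY_TABLE and uri are placeholders.

import adbc_driver_snowflake.dbapi

conn = adbc_driver_snowflake.dbapi.connect(uri)
with conn.cursor() as cur:
    # Assumption: the option is settable on the statement; "false" asks the driver
    # to return scale-0 fixed-point columns as int64 instead of decimal128.
    cur.adbc_statement.set_options(**{
        "adbc.snowflake.sql.client_option.use_high_precision": "false",
    })
    cur.execute("SELECT * FROM MY_TABLE")
    print(cur.fetch_arrow_table().schema)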

If that doesn't sufficiently answer your issue, could you share the code you were getting that error from?

pkit commented on August 28, 2024

cursor.adbc_ingest has additional role requirements: CREATE STAGE
More than that, it can be pretty sub-optimal if I do a lot of asynchronous inserts before commit.
Unless I don't understand how it works underneath.
Obviously I would prefer not to manage my own serialization, or to keep all the RecordBatches in memory.
Essentially my use case is akin to classic ETL: stream in batches of data (SELECT), transform, stream back (INSERT). While keeping it all under strict memory requirements (for example 4x16MB batches max at the same time).

pkit commented on August 28, 2024

@zeroshade It would also be nice to know where the limitation comes from. The Snowflake server-side implementation?

zeroshade commented on August 28, 2024

cursor.adbc_ingest is going to use CREATE STAGE and COPY INTO ... FROM @stage ... as documented in Snowflake's documentation at https://docs.snowflake.com/en/user-guide/data-load-local-file-system for efficient bulk loading of data. It will be significantly more performant than using INSERT INTO with bind parameters due to the way Snowflake's API works.

Essentially my use case is akin to classic ETL: stream in batches of data (SELECT), transform, stream back (INSERT). While keeping it all under strict memory requirements (for example 4x16MB batches max at the same time).

That's precisely what the adbc_ingest functionality is designed to be optimal for. Essentially, the record batches are written to Parquet files in parallel (both the level of concurrency and the size of the Parquet files are configurable), which are uploaded directly to Snowflake for staging before being loaded into the table (again, following the steps in the documentation linked above, but with concurrency).

It would also be nice to know where the limitation comes from. The Snowflake server-side implementation?

The Snowflake ADBC driver uses Snowflake's Go client for communication, which does not support any decimal type as a bind parameter, and ADBC currently doesn't attempt to perform a cast for binding (we can optionally perform casting on receiving). Snowflake's server implementation currently does not accept Arrow data directly as bind parameter input. That's part of why adbc_ingest uploads Parquet files (aside from the fact that Parquet also provides compression etc., which is another reason it's more performant than INSERT INTO with bind parameters).

If at all possible, my recommendation would be to use adbc_ingest for your inserts. Otherwise, you may need to cast your decimal data to int64/float64 in the Arrow record batches before you bind the stream.
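
For the cast-before-bind route, a minimal sketch of rewriting the decimal columns of each record batch. cast_decimals is a hypothetical helper (not part of ADBC); note that scale-0 decimals wider than 64 bits cannot fit in int64, and float64 casts lose precision.

import pyarrow as pa
import pyarrow.compute as pc

def cast_decimals(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Hypothetical helper: scale-0 decimals -> int64, other decimals -> float64,
    # so the stream handed to bind no longer contains decimal columns.
    arrays, fields = [], []
    for field, column in zip(batch.schema, batch.columns):
        if pa.types.is_decimal(field.type):
            target = pa.int64() if field.type.scale == 0 else pa.float64()
            arrays.append(pc.cast(column, target))  # raises if a value doesn't fit
            fields.append(pa.field(field.name, target, nullable=field.nullable))
        else:
            arrays.append(column)
            fields.append(field)
    return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields))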

pkit commented on August 28, 2024

@zeroshade Thanks. It all makes sense.
I've already found out that the actual implementation lives in the go/adbc dir. I would prefer not to ser/de just for the sake of decimal support, mostly because I usually pass these columns through untouched anyway.

Some questions: at what point is the data considered sent to SF when adbc_ingest is used? How is backpressure handled?
If I call adbc_ingest 1000 times with 4KB batches, is there a way to know how many actual parquets/copy streams were created?

zeroshade commented on August 28, 2024

Some questions: at what point is the data considered sent to SF when adbc_ingest is used? How is backpressure handled?

Backpressure and concurrency are handled in two ways:

  1. The RecordBatchReader which is passed to adbc_ingest is read from a single thread that will continuously call next on the reader and then push the record batch onto a channel. The buffer queue size (i.e. the max number of records queued for writing) is determined by the number of writers, controlled by the adbc.snowflake.statement.ingest_writer_concurrency option, which defaults to the number of CPUs.
  2. The number of concurrent file uploads and COPY tasks on the Snowflake side is bounded by the adbc.snowflake.statement.ingest_upload_concurrency and adbc.snowflake.statement.ingest_copy_concurrency options, respectively (see the sketch just below).
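
For illustration, a minimal sketch of tuning those knobs on the write-side cursor, using the same set_options pattern shown later in this thread. The numbers are arbitrary, and conn_write/reader stand in for your write connection and the RecordBatchReader you ingest.

with conn_write.cursor() as cursor_write:
    # Illustrative values; the writer concurrency defaults to the number of CPUs.
    cursor_write.adbc_statement.set_options(**{
        "adbc.snowflake.statement.ingest_writer_concurrency": 4,
        "adbc.snowflake.statement.ingest_upload_concurrency": 4,
        "adbc.snowflake.statement.ingest_copy_concurrency": 2,
    })
    cursor_write.adbc_ingest("T2", reader, mode="append")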

If I call adbc_ingest 1000 times with 4KB batches, is there a way to know how many actual parquets/copy streams were created?

My personal recommendation would be to consolidate the batches into fewer streams and call adbc_ingest with a consolidated stream of those batches, rather than calling it 1000 times with 4KB batches; that would also let you keep fewer batches in memory at a time. That said, you should be able to see how many actual Parquet files / copy streams were created from your Snowflake monitoring, which will show you all the COPY tasks and files uploaded for the stage if you examine the queries.

pkit commented on August 28, 2024

Nice!
Interestingly though, the following simple code fails:

from adbc_driver_snowflake.dbapi import connect

def main():
    conn_read = connect(f"{user}:{password}@{account}/{database}", db_kwargs={
        "adbc.snowflake.sql.schema": "PUBLIC",
    })
    conn_write = connect(f"{user}:{password}@{account}/{database}", db_kwargs={
        "adbc.snowflake.sql.schema": "PUBLIC",
    })
    with conn_read.cursor() as cursor_read:
        with conn_write.cursor() as cursor_write:
            cursor_read.adbc_statement.set_options(**{"adbc.snowflake.rpc.prefetch_concurrency": 2, "adbc.rpc.result_queue_size": 10})
            cursor_read.execute("SELECT * FROM T1")
            for batch in cursor_read.fetch_record_batch():
                print(batch)
                cursor_write.adbc_ingest("T2", batch, mode="append")

The failure is:

ERRO[0005]connection.go:275 gosnowflake.(*snowflakeConn).Close context canceled                              LOG_SESSION_ID=1685662994679674

And that's it.

zeroshade commented on August 28, 2024

try doing:

cursor_read.execute("SELECT * FROM T1")
cursor_write.adbc_ingest("T2", cursor_read.fetch_record_batch(), mode="append")

Also: we have an upstream PR waiting to be merged that addresses that specific issue: snowflakedb/gosnowflake#1196

lidavidm commented on August 28, 2024

@zeroshade can we get some of these recommendations documented?

zeroshade commented on August 28, 2024

@lidavidm Does our documentation not already recommend using adbc_ingest over repeated calls to cursor.execute('INSERT INTO ..... (?, ?, ?)') using bind? If not, then yeah, we definitely should get that documented.

lidavidm commented on August 28, 2024

Also any clarifications about backpressure or data types

pkit commented on August 28, 2024

@zeroshade
cursor_write.adbc_ingest("T2", cursor_read.fetch_record_batch(), mode="append") defeats the purpose.
I do need to preprocess each batch before sending it to adbc_ingest.

zeroshade commented on August 28, 2024

Okay. While multiple calls to adbc_ingest would work, depending on various factors (how many sources, how large the records are, and so on), my recommendation would be to use a generator/iterator to construct a RecordBatchReader that you can pass to adbc_ingest instead.

i.e. something like the following:

import pyarrow

def process_record_batches(input):
    for batch in input:
        # whatever pre-processing you want to perform on the batch
        print(batch)
        yield batch

with conn_read.cursor() as cursor_read:
    with conn_write.cursor() as cursor_write:
        cursor_read.adbc_statement.set_options(**{"adbc.snowflake.rpc.prefetch_concurrency": 2, "adbc.rpc.result_queue_size": 10})
        cursor_read.execute("SELECT * FROM T1")
        input = cursor_read.fetch_record_batch()
        reader = pyarrow.RecordBatchReader.from_batches(input.schema, process_record_batches(input))
        cursor_write.adbc_ingest("T2", reader, mode="append")

This way you don't have to pay the overhead multiple times; you pay it only once, and it effectively creates a full push/pull pipeline that handles backpressure, as each part waits for the previous stage (governed by the buffer and queue sizes set via the options I mentioned earlier).

pkit commented on August 28, 2024

@zeroshade Ok, makes sense, although I'm then at the mercy of adbc_ingest pulling the next generator value.
I would prefer to front-load the pre-processing, because sometimes it can take quite a lot of time, and I hope the stage creation or other work it does underneath won't time out.
But I suppose it will work; I'll check it out.

pkit commented on August 28, 2024

@zeroshade Just FYI: calling adbc_ingest once, as in your generator example, still produces the context error.

zeroshade commented on August 28, 2024

@joellubi is there a workaround for that context error until your upstream change is merged?

joellubi commented on August 28, 2024

@zeroshade The simplest change we could make ourselves would be to set db.SetMaxIdleConns(0) on the database/sql DB instance, but that may increase the time spent opening/closing connections, as it effectively disables the connection pool. It does appear that there's some activity on the upstream PR, though: it just got a second approval a few hours ago. I'll be keeping a close eye on it and will make sure a fix is included in the next release one way or another.

@pkit Is the data still ingested when the context error is produced? In all our reproductions the error comes from a log rather than an exception, and the ingestion itself is still successful.

joellubi commented on August 28, 2024

@zeroshade Ok, makes sense, although I'm then at the mercy of adbc_ingest pulling the next generator value. I would prefer to front-load the pre-processing, because sometimes it can take quite a lot of time, and I hope the stage creation or other work it does underneath won't time out. But I suppose it will work; I'll check it out.

By "front-load the pre-processing" do you mean that you would like to process the next batch while the current batch is being uploaded by adbc_ingest? Regardless of whether adbc_ingest "pulls" the batch or it is "pushed" as in your example, python will inherently only do one thing at a time assuming your pre-processing work is CPU-bound. In either case, you would need to use threading or multiprocessing to have python pre-process the next batch while the current batch is ingesting. One potential way to do this would be to use the python generator to wrap a Queue which can offload the pre-processing to another thread or process.

joellubi commented on August 28, 2024

Upstream PR (snowflakedb/gosnowflake#1196) was just merged.

pkit commented on August 28, 2024

@joellubi Although I see "Success" for the COPY operation from the stage in the Snowflake log, the data is not ingested.

Python will inherently only do one thing at a time

Yes. But in this case I just send it for preprocessing somewhere else. So it's purely I/O wait.
But you're right. adbc_ingest is a blocking call, so it's probably better to decouple read and write cursors into multiple processes and do preprocessing while reading.

zeroshade commented on August 28, 2024

Upstream PR (snowflakedb/gosnowflake#1196) was just merged.

@joellubi let's bump our version of gosnowflake to pull in the fix so we can see if this fixes @pkit's issue.

But you're right. adbc_ingest is a blocking call, so it's probably better to decouple read and write cursors into multiple processes and do preprocessing while reading.

We are in the process of working out an async interface that we can implement to allow for a non-blocking adbc_ingest call, but it'll be a while before that's available.

joellubi commented on August 28, 2024

@zeroshade I've opened PR: #2091

pkit commented on August 28, 2024

@zeroshade I can confirm that the fix makes the error go away (I built the Python module and friends from PR #2091).
Unfortunately nothing gets inserted, although I see 10 Parquet files created and everything looks fine in the query log.

pkit commented on August 28, 2024

Ok, it works. I noticed that COMMIT was not sent; sending a commit explicitly worked.
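
For anyone hitting the same surprise: the DB-API connection defaults to manual commit, so the ingest needs to be followed by an explicit commit. A minimal sketch, reusing the names from the generator example above:

cursor_write.adbc_ingest("T2", reader, mode="append")
conn_write.commit()  # without the commit the COPY shows "Success" but the rows never appear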
