
Comments (11)

goccy avatar goccy commented on May 22, 2024 4

@OfirCohen29 @andreas-dentech @adamszadkowski
Hi, I added support for the BigQuery Storage API in v0.1.27. It was a lot of work, and I hope you will take advantage of it. I'd also be happy to receive feedback on how it works!

from bigquery-emulator.

goccy avatar goccy commented on May 22, 2024 1

Thank you for providing the example.
I had been thinking about cases where the streaming API is used implicitly, but I understand now that your use case requires using it explicitly.


andreas-lindfalk avatar andreas-lindfalk commented on May 22, 2024 1

I'm using the InsertAll API to stream data into BigQuery (via https://github.com/OTA-Insight/bqwriter), and it mostly works against the emulator... but there are at least two things that do not work well:

  1. the emulator hangs for around 15 seconds when a batch is flushed to it
  2. if I have a record field with a repeated record field inside it, the data never makes it into the emulator (the operation eventually times out)

Really amazing work with the emulator btw!
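
For reference, the legacy streaming path that bqwriter's InsertAll mode ultimately hits is the REST tabledata.insertAll method. A stdlib-only sketch of the request it sends, for reproducing the hang without the library (the base URL and the assumption that the emulator serves the same REST path as real BigQuery are mine, not from this thread):

```python
import json
import urllib.request

def insert_all_body(rows):
    """Build a tabledata.insertAll request body (legacy streaming inserts)."""
    return {
        "kind": "bigquery#tableDataInsertAllRequest",
        "rows": [{"json": row} for row in rows],
    }

def insert_all(base_url, project, dataset, table, rows):
    """POST rows to the insertAll endpoint. The path below is the real
    BigQuery REST path; the emulator is assumed to serve it identically."""
    url = (f"{base_url}/bigquery/v2/projects/{project}"
           f"/datasets/{dataset}/tables/{table}/insertAll")
    req = urllib.request.Request(
        url,
        data=json.dumps(insert_all_body(rows)).encode(),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```

Timing a single `insert_all(...)` call against the emulator should make the ~15 second flush hang easy to demonstrate in isolation.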


goccy avatar goccy commented on May 22, 2024 1

@adamszadkowski Thank you for your report. However, since this issue is already closed, please create a new issue as a new topic and describe this problem there.


goccy avatar goccy commented on May 22, 2024

We don't currently support the streaming API, but we plan to.
However, as noted in the README, I think there is a way to avoid using the streaming API explicitly, so I haven't given it high priority.
If there is a reason you cannot avoid the streaming API, please let me know!


OfirCohen29 avatar OfirCohen29 commented on May 22, 2024

Thanks for answering.

Yes, we cannot avoid using the streaming API, so it would help a lot if the emulator supported it.

Thanks again


goccy avatar goccy commented on May 22, 2024

OK, I see. Could you please provide a simple, reproducible example here?
I would like to use it as a reference.


OfirCohen29 avatar OfirCohen29 commented on May 22, 2024

Yes

Dockerfile (port is set on docker-compose):

FROM ghcr.io/goccy/bigquery-emulator:latest
COPY data/data.yaml ./server/testdata/data.yaml
CMD bigquery-emulator --project=testing --dataset=dataset1 --data-from-yaml=./server/testdata/data.yaml
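
A matching docker-compose service with the port mapping mentioned above might look like this. This fragment is an assumption based on the `api_endpoint` used in the Python code (host port 5042) and the emulator's documented default HTTP port:

```yaml
version: "3"
services:
  bigquery-emulator:
    build: .
    ports:
      - "5042:9050"   # host port 5042 -> emulator HTTP port (assumed default 9050)
```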

Python code:


from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud.bigquery_storage import BigQueryWriteClient
from google.cloud.bigquery_storage import WriteStream
from google.cloud.bigquery_storage import CreateWriteStreamRequest
from google.cloud.bigquery_storage_v1.types import TableSchema, TableFieldSchema

# Connect to the emulator container
client_options = ClientOptions(api_endpoint="http://localhost:5042")
bigquery_write = BigQueryWriteClient(credentials=AnonymousCredentials(), client_options=client_options)

# Configure the stream

id_field = TableFieldSchema({
    "name": "id",
    # 2 for INT64
    "type_": TableFieldSchema.Type(value=2),
    # 2 for REQUIRED
    "mode": TableFieldSchema.Mode(value=2)
})

name_field = TableFieldSchema({
    "name": "myname",
    # 1 for STRING
    "type_": TableFieldSchema.Type(value=1),
    # 2 for REQUIRED
    "mode": TableFieldSchema.Mode(value=2)
})

table_schema = TableSchema({
    "fields": [id_field, name_field]
})

write_stream = WriteStream({
    "name": "projects/testing/datasets/dataset1/tables/tests/streams/mystream",
    "type": "COMMITTED",
    "table_schema": table_schema,
    # 1 for INSERT
    "write_mode": WriteStream.WriteMode(value=1)
})

request = CreateWriteStreamRequest({
    "parent": "projects/testing/datasets/dataset1/tables/tests",
    "write_stream": write_stream
})

bigquery_write.create_write_stream(request=request)

data.yaml:

projects:
- id: testing
  datasets:
    - id: dataset1
      tables:
        - id: tests
          columns:
            - name: id
              type: INTEGER
            - name: myname
              type: STRING
          data:
            - id: 1
              myname: alice
            - id: 2
              myname: bob
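
One way to check whether the seeded rows are visible is the REST jobs.query method, which avoids the Storage API entirely. A stdlib-only sketch (the base URL is an assumption about the local setup; the path is the real BigQuery REST path, which the emulator is assumed to serve):

```python
import json
import urllib.request

def query_request_body(sql):
    """Build a jobs.query request body using standard SQL."""
    return {"query": sql, "useLegacySql": False}

def run_query(base_url, project, sql):
    """POST a query to the BigQuery REST jobs.query endpoint."""
    url = f"{base_url}/bigquery/v2/projects/{project}/queries"
    req = urllib.request.Request(
        url,
        data=json.dumps(query_request_body(sql)).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

For example, `run_query("http://localhost:5042", "testing", "SELECT * FROM dataset1.tests")` should return the two seeded rows if the emulator is up.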

Please let me know here if anything else is needed.

Thanks


goccy avatar goccy commented on May 22, 2024

I've added support for the Read API for the time being. Please wait a little longer for the Write API. I'm also looking for sponsors, so please consider sponsoring me :)


OfirCohen29 avatar OfirCohen29 commented on May 22, 2024

Thanks a lot


adamszadkowski avatar adamszadkowski commented on May 22, 2024

@goccy thank you very much for making this emulator and providing such good support :) I have been able to test the streaming API with the Spark integration, using the Java BigQuery libraries. Unfortunately I have found some other issues which I would like to share with you. I will note them here, but let me know if you would rather have them as separate issues.

As before, everything that is not working is documented in the repository https://github.com/adamszadkowski/bigquery-emulator-issue

Handling of multiple read streams

Unfortunately, the Spark integration with BigQuery requires reading multiple streams.

val rows = sparkSession.read
  .format("bigquery")
  .load(s"$projectId.$datasetId.$tableId")
  .collectAsList()

The code above doesn't work. There is a workaround of setting the parallelism to 1, but it requires a change in production code.

val rows = sparkSession.read
  .format("bigquery")
  .option("parallelism", 1) // required by bigquery-emulator
  .load(s"$projectId.$datasetId.$tableId")
  .collectAsList()

Even if it is technically possible to change this value, in practice it is very hard to make that change in every possible place. Additionally, it might cause other issues.

Support for partitioned tables

It looks like bigquery-emulator does not add the _PARTITIONDATE and _PARTITIONTIME pseudo-columns to partitioned tables.
When a table is created like this:

service.create(TableInfo.of(
  TableId.of(projectId, datasetId, tableId),
  StandardTableDefinition.newBuilder()
    .setSchema(schema)
    .setTimePartitioning(TimePartitioning.of(DAY))
    .build()))

Spark tries to read the additional columns, which can be seen in the bigquery-emulator logs:

2022-12-14T11:08:47.941+0100	INFO	contentdata/repository.go:135		{"query": "SELECT `id`,`otherProp`,`_PARTITIONTIME`,`_PARTITIONDATE` FROM `mytablename` ", "values": []}

On the Spark side, an error is passed back from bigquery-emulator:

Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: UNKNOWN: failed to analyze: INVALID_ARGUMENT: Unrecognized name: _PARTITIONTIME [at 1:25]
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:535)
	... 14 more

Problems with streaming write

It looks like there should be a default stream for writing. Currently, this error is returned:

com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: failed to append rows: failed to get stream from projects/test/datasets/testingbq/tables/mytablename/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:119)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
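
For comparison with the path in the error above: the canonical Storage Write API resource name for the default stream includes a `streams/` segment. A small illustrative helper (hypothetical, not part of the emulator or the client libraries) to build such names:

```python
def write_stream_name(project: str, dataset: str, table: str,
                      stream: str = "_default") -> str:
    """Build a BigQuery Storage Write API stream resource name.

    The canonical format is:
    projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}
    and the default stream uses the reserved id "_default".
    """
    return (f"projects/{project}/datasets/{dataset}"
            f"/tables/{table}/streams/{stream}")
```

Note that the failing lookup in the log above resolves `.../tables/mytablename/_default` without the `streams/` segment, which may be relevant when comparing against this format.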

Creating the stream before writing with the code below doesn't work either:

val createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
	.setParent(TableName.of(projectId, datasetId, tableId).toString())
	.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
	.build()

val writeStream = client.createWriteStream(createWriteStreamRequest)

Executing the create-stream request for the first time after bigquery-emulator starts causes this error:

com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Project test is not found. Make sure it references valid GCP project that hasn't been deleted.; Project id: test
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)

Consecutive executions cause the test code to hang until a timeout, after which there is another error:

com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 1199.955284179s. [closed=[], open=[[buffered_nanos=233286500, buffered_nanos=7866891, remote_addr=localhost/127.0.0.1:9060]]]

	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:94)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)

After that, it is impossible to shut down bigquery-emulator gracefully. Ctrl + C in the console causes a hang:

^C[bigquery-emulator] receive interrupt. shutdown gracefully

Only kill -9 pid helps.

