Comments (11)
In this case, the number of shards being computed is an estimate of the number of shards in the underlying BigQuery dataset, beyond which BigQuery's sharded export doesn't provide more granular incremental exports. In other words, if there are 100 shards under the hood, the degenerate case is that we define 100 splits, at which point the sharded export behaves more or less like UnshardedExportToCloudStorage. In that case it might be more efficient to just use the unsharded export, since then the getSplits() call is what waits on the export, instead of RecordReaders occupying task slots without making any actual progress.
If we try to define more than 100 splits for 100 shards, then the splits beyond 100 are essentially guaranteed to contain 0 bytes of data, so even though they occupy task slots, they'll just hang without doing work (or may complete quickly if BigQuery generates an end-of-stream marker for those empty splits).
Another case to consider: say you have 1000 executors, but the data only has 100 shards, and those 100 shards are all very large. If we use the sharded export, then since we don't know beforehand how the data will be distributed into splits, the processing will be heavily unbalanced: 100 of those executors have real work to do while the other 900 just wait on empty splits.
However, in that case the unsharded export can potentially use all 1000 executors. The unsharded export generates 100 large files, but the underlying FileInputFormat should then be capable of splitting them into sub-shard splits, so that each of the 1000 executors processes just 1/10th of one of the 100 large files.
The key difference is that the sharded export has to define split streams before the export is done, so split boundaries can't be redefined once the byte sizes are actually known. In contrast, the unsharded export forces us to wait for the file sizes before computing splits, so we can then pack sub-file splits onto workers.
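To make the 1000-executor arithmetic concrete, here is a minimal sketch of how FileInputFormat-style sub-shard splitting divides files by block size. The class and method names are illustrative, not the connector's actual internals; the file and block sizes are assumed for the example.

```java
// Hedged sketch of sub-shard split arithmetic; not the connector's real code.
public class SplitMath {
    // Number of splits for one file, rounding up so trailing bytes get a split.
    static long numSplits(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long fileSize = 6_400L * 1024 * 1024;  // one hypothetical 6.4 GB export file
        long blockSize = 640L * 1024 * 1024;   // assumed fs.gs.block.size of 640 MB
        // 100 such files * 10 splits each = 1000 splits, one per executor.
        System.out.println(100 * numSplits(fileSize, blockSize)); // prints 1000
    }
}
```

Because the split count comes from the actual file sizes, this computation can only happen after the unsharded export finishes, which is exactly the trade-off described above.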
TL;DR: if you have more executors than the computed number of BigQuery shards, and you know the work is heavyweight enough that you really want sub-shard splitting, just use the unsharded export, and possibly lower fs.gs.block.size if you need the sub-shard splits to be smaller than the default block size of 64 MB.
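A minimal configuration sketch of that recommendation, using the sharded-export key from the thread and fs.gs.block.size; the 32 MB value is purely illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;

Configuration conf = new Configuration();
// Disable sharded export so getSplits() waits for the export to finish and
// can compute sub-file splits from the real file sizes.
conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false");
// Optional: shrink the reported GCS block size so sub-shard splits are
// smaller than the 64 MB default (32 MB here is an assumed example value).
conf.setLong("fs.gs.block.size", 32L * 1024 * 1024);
```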
from hadoop-connectors.
Cool. So I was trying unsharded export by setting the following:
conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false")
And am getting this exception now:
java.lang.IllegalArgumentException (Split should be instance of UnshardedInputSplit.)
Is there something else I need to set?
Hmm, any chance you have the full stack trace from that IllegalArgumentException?
Here you go. Thanks for looking!
Caused by: java.lang.IllegalArgumentException: Split should be instance of UnshardedInputSplit.
at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:158)
at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:145)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
... 3 more
I tried to export a 700 MB BigQuery table to GCS in the new Google Datalab, and it hung for hours (it never finished; it just died). I guess it is using the same code.
+1
I'm fairly certain Datalab does not use the BigQuery connector to move data from BigQuery to GCS, but instead simply calls the BigQuery export API (https://github.com/GoogleCloudPlatform/datalab/blob/master/sources/lib/api/gcp/bigquery/_query.py#L135).
That is interesting. They behave exactly the same way, though: it hangs forever and not a single byte shows up at the destination.
My bad. I just checked again, and Datalab does export the table successfully. I kicked it off last night, and when I checked this morning the output was there. So it took 10+ hours to export 700 MB. Still very problematic...
For posterity, the "Caused by: java.lang.IllegalArgumentException: Split should be instance of UnshardedInputSplit." error will be fixed with: #17
How long does it take for the PR to be deployed?
Related Issues (20)
- BQ storage library blocked on update to grpc v1.56 HOT 1
- GoogleCloudStorageFileSystem#delete recursive does not page
- Memory issues while running Apache Spark streaming applications on Google Dataproc cluster | OutOfMemoryError Java heap space
- flume sink hdfs to gcs, all gcs write threads blocked
- how to transfer file from local to gcs bucket using dataproc hadoop in intellij
- GCS Connector fails with StackOverflowError during accessing hadoop credentials
- GhfsStorageStatistics cannot be cast ERROR HOT 9
- Support disabling automatic decompression of gzip files in GCS connector
- gcs-connector 3.0 not working with pyspark HOT 5
- gcs-connector:3.0.0 failing due to certificate when accessing to GCS from Github runner with WIF configuration HOT 7
- Feature request: automatic identity deduction a la google.auth.default()
- gcs-connector-3.0.0-shaded CVEs HOT 1
- How can I sink GCS connector metrics into GCP Cloud Monitor? HOT 2
- globStatus should prioritize server-side filtering over listing all files and performing local matches
- Conversion from InputStream -> ByteBuffer on gRPC writes creates many byte[] allocations. HOT 2
- Bug in `GoogleCloudStorageReadChannel` can cause an infinite loop
- hadoop3-2.2.22 and hadoop3-2.2.23 throws NoSuchMethodError at ServiceOptions.getService
- gcs-connector- CVE
- GCS connector throws rate limit errors
- Could not initialize class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration HOT 1