Comments (11)

dennishuo commented on August 27, 2024

In this case, the computed number of shards is an estimate of how many shards the underlying BigQuery dataset has; beyond that number, BigQuery's sharded export doesn't provide more granular incremental exports. What that means is that if there are 100 shards under the hood, the degenerate case is that we define 100 splits, at which point the sharded export behaves more or less the same as the UnshardedExportToCloudStorage. At that point it might be more efficient to just use the Unsharded export, since then the getSplits() call is what hangs waiting on the export, instead of RecordReaders consuming task slots without making any actual progress.

If we try to define more than 100 splits for 100 shards, then we essentially guarantee that the splits beyond 100 will contain 0 bytes of data, so even if they occupy a task slot they'll just hang without doing work (or may complete quickly if BigQuery emits an end-of-stream marker for those empty splits).

Another case to consider: say you have 1000 executors, but the data only has 100 shards, and those 100 shards are all very large. If we use the sharded export, then since we don't know beforehand how the data will be distributed into splits, the processing will be heavily unbalanced: 100 of those executors have real work to do and the other 900 just wait on empty splits.

However, in that case, if we use the Unsharded export instead, we can potentially use all 1000 executors; the Unsharded export generates 100 large files, and the underlying FileInputFormat should be capable of splitting those into sub-file splits, so that each of the 1000 executors processes just 1/10th of one of the 100 large files.
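
As a side note (just a rough sketch, not code from the connector): stock Hadoop FileInputFormat picks its split size roughly as max(minSplitSize, min(maxSplitSize, blockSize)), which is why the reported GCS block size controls how many sub-file splits you end up with. The file size below is purely illustrative.

    // Illustrative sketch of FileInputFormat's split sizing (mirrors
    // FileInputFormat.computeSplitSize); the 7 GB file size is made up.
    object SplitSizeSketch {
      private val Mb = 1024L * 1024L

      def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
        math.max(minSize, math.min(maxSize, blockSize))

      def main(args: Array[String]): Unit = {
        val fileSize  = 7 * 1024 * Mb   // one large exported file, ~7 GB
        val splitSize = computeSplitSize(blockSize = 64 * Mb, minSize = 1L, maxSize = Long.MaxValue)
        val numSplits = (fileSize + splitSize - 1) / splitSize
        println(s"splitSize=${splitSize / Mb} MB, splits for this file=$numSplits")
      }
    }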

The key is that with the Sharded export we have to define the split streams before the export is done, so we can't redefine split boundaries once we actually know the byte sizes. In contrast, the Unsharded export forces us to wait until the file sizes are known before computing splits, so we can then pack sub-file splits onto workers.

TL;DR: If you have more executors than the computed number of BigQuery shards, and you know the work is heavyweight enough that you really want sub-shard splitting, just use the Unsharded export and possibly adjust fs.gs.block.size if you need the sub-shard splits to be smaller than the default block size of 64MB.
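
A minimal, untested sketch of what that might look like from Spark (the ENABLE_SHARDED_EXPORT_KEY and fs.gs.block.size settings are the ones discussed above; the project/bucket/table setup and the GsonBigQueryInputFormat read follow the connector's usual Spark example, so the exact key names may need adjusting for your version):

    import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, GsonBigQueryInputFormat}
    import com.google.gson.JsonObject
    import org.apache.hadoop.io.LongWritable
    import org.apache.spark.{SparkConf, SparkContext}

    object UnshardedExportExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("bq-unsharded-read"))
        val conf = sc.hadoopConfiguration

        // Placeholder project/bucket/table values -- replace with your own.
        conf.set(BigQueryConfiguration.PROJECT_ID_KEY, "my-project")
        conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, "my-temp-bucket")
        BigQueryConfiguration.configureBigQueryInput(conf, "my-project:my_dataset.my_table")

        // Use the Unsharded export so FileInputFormat can split the exported files.
        conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false")

        // Optional: shrink the reported GCS block size (64MB default, per above)
        // if you want more, smaller sub-file splits.
        conf.setLong("fs.gs.block.size", 32L * 1024 * 1024)

        // Standard read path: each record is a (LongWritable, JsonObject) pair.
        val rows = sc.newAPIHadoopRDD(
          conf,
          classOf[GsonBigQueryInputFormat],
          classOf[LongWritable],
          classOf[JsonObject])

        println(s"rows: ${rows.count()}")
        sc.stop()
      }
    }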

nevillelyh commented on August 27, 2024

Cool. So I was trying the unsharded export by setting the following:
conf.set(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, "false")

And I'm getting this exception now:
java.lang.IllegalArgumentException (Split should be instance of UnshardedInputSplit.)

Is there something else I need to set?

dennishuo commented on August 27, 2024

Hmm, any chance you have the full stack trace from that IllegalArgumentException?

nevillelyh commented on August 27, 2024

Here you go. Thanks for looking!

Caused by: java.lang.IllegalArgumentException: Split should be instance of UnshardedInputSplit.
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:158)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.createRecordReader(AbstractBigQueryInputFormat.java:145)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    ... 3 more

ranqizhu commented on August 27, 2024

I tried to export a 700MB BigQuery table to GCS in the new Google Datalab, and it hung for hours (never finished; it just died). I guess it is using the same code.

ravwojdyla commented on August 27, 2024

+1

AngusDavis commented on August 27, 2024

I'm fairly certain Datalab does not use the BigQuery connector to move data from BigQuery to GCS, but instead simply calls the BigQuery Export API (https://github.com/GoogleCloudPlatform/datalab/blob/master/sources/lib/api/gcp/bigquery/_query.py#L135).

ranqizhu commented on August 27, 2024

That is interesting. They behave exactly the same way though: hang forever, and not a single byte shows up at the destination.

ranqizhu commented on August 27, 2024

My bad. Just checked again, and Datalab does export the table successfully. I kicked it off last night, and when I checked this morning it was there, so it took 10+ hours to export 700MB. Still very problematic...

dennishuo commented on August 27, 2024

For posterity, the "Caused by: java.lang.IllegalArgumentException: Split should be instance of UnshardedInputSplit." error will be fixed with: #17

ranqizhu commented on August 27, 2024

How long does it take for the PR to be deployed?
