
gcp-variant-transforms's People

Contributors

allieychen, arostamianfar, bashir2, bgood, hashkanna, jessime, kbtian, lawrenae, mbookman, mhsaul, moschetti, nmousavi, rcowin-gcp, samanvp, sbhave77, slagelwa, snarfed, tneymanov, tsa87


gcp-variant-transforms's Issues

Implement a resource estimator tool

It is currently not trivial to estimate the resources needed to run the pipeline (particularly the amount of disk space). This is especially important when processing large (10TB+) datasets, as cost becomes an important factor.

The idea is to write a tool that preprocesses the data (similar to the header merger pipeline) and outputs the minimum resource requirements for running the pipeline.

Note that estimating the disk size is not as trivial as summing up the size of all files being processed, as there is a per-record overhead that depends on the number of samples, info, and filter fields.
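
A minimal sketch of such an estimate, assuming a simple per-record overhead model (the function name and all constants are illustrative, not measured values from the pipeline):

    def estimate_disk_gb(total_input_bytes, num_records, num_samples,
                         num_info_fields, num_filter_fields,
                         bytes_per_call=8, bytes_per_field=16):
        """Rough lower bound on disk needed to materialize the transformed records."""
        per_record_overhead = (num_samples * bytes_per_call +
                               (num_info_fields + num_filter_fields) * bytes_per_field)
        estimated_bytes = total_input_bytes + num_records * per_record_overhead
        return estimated_bytes / float(1 << 30)

    # Example: ~1TB of input, 3 billion records, 5 samples, 20 INFO and 3 FILTER fields.
    print(estimate_disk_gb(1 << 40, 3 * 10**9, 5, 20, 3))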

Add a flag for setting null_numeric_value_replacement

BigQuery does not allow null values in repeated fields (the entire record can be null, but each value within the record must be non-null). For instance, if a VCF INFO field is 1,.,2, we cannot load 1,null,2 into BigQuery and need to use a numeric replacement for the null value. This is currently hardcoded to -sys.maxint in bigquery_vcf_schema.py. We should make this a flag so that users can change it depending on the context.

Note: this should be done after the initial refactoring (Issue #59).
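
A hedged sketch of the flag wiring (the flag name mirrors this issue's title; the actual argument plumbing in vcf_to_bq may differ once the refactoring lands):

    import argparse
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--null_numeric_value_replacement',
        type=int,
        default=-sys.maxsize,  # stands in for the -sys.maxint currently hardcoded
        help='Value used in place of nulls inside repeated numeric fields.')
    args = parser.parse_args(['--null_numeric_value_replacement', '-99'])
    print(args.null_numeric_value_replacement)  # -99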

Refactor bigquery_vcf_schema.py to use a class

We currently use basic functions for getting the schema and converting variants to BigQuery rows. This has made the code less readable, especially when making subtle changes (e.g. Issue #48). We should first refactor the file to use a class. This would also make it much easier to check the schema while the rows are being generated, as the schema would be a member variable of that class.
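
A rough shape for the proposed class (names and method boundaries are illustrative only):

    class BigQuerySchemaConverter(object):
        """Builds the BigQuery schema once and reuses it while generating rows."""

        def __init__(self, header_fields, split_alternate_allele_info_fields=True):
            self._header_fields = header_fields
            self._split_alternate_allele_info_fields = split_alternate_allele_info_fields
            self._schema = None

        def generate_schema(self):
            if self._schema is None:
                self._schema = self._build_schema_from_headers()
            return self._schema

        def get_rows(self, variant):
            schema = self.generate_schema()  # member schema is available while building rows
            # ... convert `variant` into one or more rows, validating against `schema` ...
            raise NotImplementedError

        def _build_schema_from_headers(self):
            raise NotImplementedError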

Add an option to "flatten" the BigQuery table

There are some use cases where a "flattened" BigQuery table is easier to process (i.e. does not contain repeated records). We can provide such an option as part of the pipeline and output a separate row for each record.
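
An illustrative flattening of a nested row into one output row per repeated record (the field names and the prefixing scheme are assumptions for the example):

    def flatten_row(row, repeated_field='calls'):
        """Yields one flat row per element of the repeated field."""
        base = {k: v for k, v in row.items() if k != repeated_field}
        for nested in row.get(repeated_field, []):
            flat = dict(base)
            flat.update({'%s_%s' % (repeated_field, k): v for k, v in nested.items()})
            yield flat

    row = {'reference_name': '1', 'start_position': 100,
           'calls': [{'name': 'S1', 'genotype': [0, 1]},
                     {'name': 'S2', 'genotype': [1, 1]}]}
    for flat in flatten_row(row):
        print(flat)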

Traceback error in output of ./deploy_and_run_tests.sh

The last few lines of output:

...
valid-4-2 ... ok
valid-4-0-gz ... ok
valid-4-1-gz ... ok
valid-4-2-gz ... ok
valid-4-1 ... ok
valid-4-0 ... ok
valid-4-0-bz2 ... ok
./deploy_and_run_tests.sh succeeded!
Removing integration test environment /tmp/tmp.qVgcbOTlZZ
Deleting the test image from the registry
Traceback (most recent call last):
File "/usr/bin/../lib/google-cloud-sdk/lib/gcloud.py", line 88, in
main()
File "/usr/bin/../lib/google-cloud-sdk/lib/gcloud.py", line 64, in main
import traceback
ImportError: No module named traceback

Add native SV (structural variant) support when loading variants to BigQuery

Even though the VCF spec natively supports SVs, it's common to have them in other TSV/CSV formats. We should add native support for loading such data to BigQuery through Variant Transforms.

Proposal: add an option to map column names to a Variant object (e.g. "CopyNumber (int) -> Variant.info.CN", "start (int) -> Variant.start_position", etc).
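
A hedged sketch of parsing the proposed mapping strings; the "ColumnName (type) -> Variant.field" syntax comes from the proposal above, while the parsing details are assumptions:

    import re

    _MAPPING_RE = re.compile(r'^(\w+)\s*\((\w+)\)\s*->\s*(\S+)$')
    _TYPE_CASTS = {'int': int, 'float': float, 'str': str}

    def parse_column_mapping(spec):
        """Returns (column_name, cast_fn, target_path) for a mapping string."""
        match = _MAPPING_RE.match(spec)
        if not match:
            raise ValueError('Invalid column mapping: %s' % spec)
        column, type_name, target = match.groups()
        return column, _TYPE_CASTS[type_name], target

    print(parse_column_mapping('CopyNumber (int) -> Variant.info.CN'))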

Related:
https://groups.google.com/d/msg/google-genomics-discuss/AeR34tJpMds/PIpvE3roBgAJ

Add a "decompress and load" option

Even though the pipeline supports files in gzip and bzip2 formats, such files cannot be sharded. There are also other compression formats such as bgzip (http://www.htslib.org/doc/tabix.html) that are not supported.

It would be ideal to provide a "decompress and load" option as part of the pipeline. We can use the Pipelines API to do this in a scalable manner.
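
A minimal single-file sketch using Beam's FileSystems abstraction (gzip shown; bgzip would additionally need a reader that understands its block structure, and a scalable version would run one such task per file via the Pipelines API):

    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.io.filesystems import FileSystems

    def decompress_to_gcs(src_path, dst_path):
        """Copies a gzip-compressed file to an uncompressed copy, block by block."""
        src = FileSystems.open(src_path, compression_type=CompressionTypes.GZIP)
        dst = FileSystems.create(dst_path, compression_type=CompressionTypes.UNCOMPRESSED)
        try:
            for chunk in iter(lambda: src.read(1 << 20), b''):
                dst.write(chunk)
        finally:
            src.close()
            dst.close()

    # decompress_to_gcs('gs://my-bucket/sample.vcf.gz', 'gs://my-bucket/sample.vcf')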

Desirable enhancements to the continuous integration.

Background: Issue #62.

  • It would be nice to force an integration test run by Travis when a pull request is sent. One open question is how to give Travis credentials to access Google Cloud projects. This tutorial might help.

  • Upon a commit, if the continuous integration build/test fails, no notification is currently sent. One should look at the "Build history" portion of the Container Registry dashboard for project gcp-variant-transforms-test to check if CI for that commit succeeded or not. There is a Pub/Sub channel created for the CI and it can be used to set up such notifications (details here).

Native support for loading multiple datasets with different permissions

Currently, we require a BigQuery dataset to be present prior to loading the VCF files. Therefore, we do not control the permissions on the BigQuery dataset as they are set by the user outside of the pipeline (note: BigQuery permissions are set on the dataset level and not on the table level).

If the VCF files being imported have different read permissions, then there is a risk of exposing data to unauthorized users if they are merged together. Thus, the user needs to ensure that both the BigQuery dataset and Cloud Storage files have the same permissions.

It would be easier if we created the BigQuery dataset(s) as part of the pipeline and dynamically set the same permissions as the Cloud Storage files being imported. Of course, an "admin" user would need to run the pipeline, but this ensures that VCF files with different read permissions do not end up together in one BigQuery dataset.

This is a more involved task and requires more thought on what the user experience would look like. It's also not trivial to determine how to split up the files to ensure those with the same permissions end up in the same BigQuery dataset.

Add better argument validation

We currently have some argument validation (e.g. checking for a valid BigQuery dataset), but we should add more. In particular, we should catch any exceptions when matching the input_pattern and provide a user-friendly error (an invalid or inaccessible input_pattern currently displays a cryptic HttpError). We can do the same for temp_location and staging_location.
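
A sketch of friendlier input_pattern validation (the exact exception type raised for an inaccessible bucket is treated as an assumption; the cryptic HttpError mentioned above is what surfaces today):

    from apache_beam.io.filesystems import FileSystems

    def validate_input_pattern(input_pattern):
        """Raises a readable error instead of letting a raw HttpError surface."""
        try:
            match_result = FileSystems.match([input_pattern])[0]
        except Exception as e:  # e.g. an HttpError for an inaccessible bucket
            raise ValueError(
                'Invalid or inaccessible --input_pattern %s: %s' % (input_pattern, e))
        if not match_result.metadata_list:
            raise ValueError('--input_pattern %s matched no files.' % input_pattern)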

JSON ValueError when importing gnomAD variants

I am trying to import the gnomAD variants into BigQuery, but I consistently get an error that makes the pipeline fail on chr1. I was able to successfully import some other chromosomes (chr21 and chr10).

The error I'm getting is ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow'].

I tried the --allow_malformed_records True flag, but that made no difference.
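
For context, the failing step rejects non-finite floats during JSON encoding; the snippet below reproduces the error message and shows one possible sanitization (the replacement value is an arbitrary illustration, not what the pipeline does):

    import json
    import math

    try:
        json.dumps({'AF': float('nan')}, allow_nan=False)
    except ValueError as e:
        print(e)  # Out of range float values are not JSON compliant

    def sanitize(value, replacement=None):
        """Replaces NaN/Inf/-Inf with a JSON-safe value before encoding."""
        if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
            return replacement
        return value

    print(sanitize(float('-inf')))  # None
    print(sanitize(0.5))            # 0.5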

The input file is from https://storage.googleapis.com/gnomad-public/release/2.0.2/vcf/genomes/gnomad.genomes.r2.0.2.sites.chr1.vcf.bgz and I decompressed it before running the pipeline.

My pipeline configuration is

name: gnomad-genomes-to-bigquery-pipeline
docker:
  imageName: gcr.io/gcp-variant-transforms/gcp-variant-transforms
  cmd: |
    ./opt/gcp_variant_transforms/bin/vcf_to_bq \
      --project XXX \
      --input_pattern gs://XXX/gnomad/vcf_decompressed/genomes/gnomad.genomes.r2.0.2.sites.chr1.vcf \
      --allow_malformed_records True \
      --output_table dg-platform:GnomAD.gnomad_genomes_chr1 \
      --staging_location gs://XXX/staging \
      --temp_location gs://XXX/temp \
      --job_name gnomad-genomes-to-bigquery-pipeline-chr1 \
      --runner DataflowRunner

The full log is

2018/01/10 04:34:27 I: Switching to status: pulling-image
2018/01/10 04:34:27 I: Calling SetOperationStatus(pulling-image)
2018/01/10 04:34:27 I: SetOperationStatus(pulling-image) succeeded
2018/01/10 04:34:27 I: Pulling image "gcr.io/gcp-variant-transforms/gcp-variant-transforms"
2018/01/10 04:35:20 I: Pulled image "gcr.io/gcp-variant-transforms/gcp-variant-transforms" successfully.
2018/01/10 04:35:20 I: Done copying files.
2018/01/10 04:35:20 I: Switching to status: running-docker
2018/01/10 04:35:20 I: Calling SetOperationStatus(running-docker)
2018/01/10 04:35:20 I: SetOperationStatus(running-docker) succeeded
2018/01/10 04:35:20 I: Setting these data volumes on the docker container: [-v /tmp/ggp-305484772:/tmp/ggp-305484772]
2018/01/10 04:35:20 I: Running command: docker run -v /tmp/ggp-305484772:/tmp/ggp-305484772 gcr.io/gcp-variant-transforms/gcp-variant-transforms /tmp/ggp-305484772
2018/01/10 05:11:04 E: command failed: No handlers could be found for logger "oauth2client.contrib.multistore_file"
/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:122: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0635919570923 seconds
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0466470718384 seconds
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0660800933838 seconds
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0607531070709 seconds
/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:134: UserWarning: Using fallback coder for typehint: Any.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)
INFO:root:Executing command: ['/opt/gcp_variant_transforms/venv/bin/python', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpFiWctv']
warning: check: missing required meta-data: url

warning: check: missing meta-data: if 'author' supplied, 'author_email' must be supplied too

INFO:root:Starting GCS upload to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/workflow.tar.gz...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Completed GCS upload to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/workflow.tar.gz
INFO:root:Staging the SDK tarball from PyPI to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/dataflow_python_sdk.tar
INFO:root:Executing command: ['/opt/gcp_variant_transforms/venv/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmpFiWctv', 'apache-beam==2.2.0', '--no-binary', ':all:', '--no-deps']
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
INFO:root:file copy from /tmp/tmpFiWctv/apache-beam-2.2.0.zip to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/dataflow_python_sdk.tar.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job
 createTime: u'2018-01-10T04:35:37.539665Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-01-09_20_35_36-6246558984112825034'
 location: u'us-central1'
 name: u'gnomad-genomes-to-bigquery-pipeline-chr1'
 projectId: u'XXX'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-01-09_20_35_36-6246558984112825034]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-01-09_20_35_36-6246558984112825034?project=XXX
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_PENDING
INFO:root:2018-01-10T04:35:36.979Z: JOB_MESSAGE_DETAILED: (56b03c4ce491e46b): Autoscaling is enabled for job 2018-01-09_20_35_36-6246558984112825034. The number of workers will be between 1 and 15.
INFO:root:2018-01-10T04:35:37.009Z: JOB_MESSAGE_DETAILED: (56b03c4ce491ee58): Autoscaling was automatically enabled for job 2018-01-09_20_35_36-6246558984112825034.
INFO:root:2018-01-10T04:35:39.256Z: JOB_MESSAGE_DETAILED: (52771809aae76d0e): Checking required Cloud APIs are enabled.
INFO:root:2018-01-10T04:35:40.098Z: JOB_MESSAGE_DETAILED: (52771809aae766b0): Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2018-01-10T04:35:40.122Z: JOB_MESSAGE_DETAILED: (52771809aae769e5): Expanding GroupByKey operations into optimizable parts.
INFO:root:2018-01-10T04:35:40.148Z: JOB_MESSAGE_DETAILED: (52771809aae766b3): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-01-10T04:35:40.171Z: JOB_MESSAGE_DEBUG: (52771809aae76381): Annotating graph with Autotuner information.
INFO:root:2018-01-10T04:35:40.199Z: JOB_MESSAGE_DETAILED: (52771809aae76d1d): Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-01-10T04:35:40.226Z: JOB_MESSAGE_DETAILED: (52771809aae769eb): Fusing consumer FilterVariants/ApplyFilters into ReadFromVcf/Read
INFO:root:2018-01-10T04:35:40.248Z: JOB_MESSAGE_DETAILED: (52771809aae766b9): Fusing consumer VariantToBigQuery/ConvertToBigQueryTableRow into FilterVariants/ApplyFilters
INFO:root:2018-01-10T04:35:40.281Z: JOB_MESSAGE_DETAILED: (52771809aae76387): Fusing consumer VariantToBigQuery/WriteToBigQuery/NativeWrite into VariantToBigQuery/ConvertToBigQueryTableRow
INFO:root:2018-01-10T04:35:40.304Z: JOB_MESSAGE_DEBUG: (52771809aae76055): Workflow config is missing a default resource spec.
INFO:root:2018-01-10T04:35:40.331Z: JOB_MESSAGE_DEBUG: (52771809aae76d23): Adding StepResource setup and teardown to workflow graph.
INFO:root:2018-01-10T04:35:40.363Z: JOB_MESSAGE_DEBUG: (52771809aae769f1): Adding workflow start and stop steps.
INFO:root:2018-01-10T04:35:40.390Z: JOB_MESSAGE_DEBUG: (52771809aae766bf): Assigning stage ids.
INFO:root:2018-01-10T04:35:40.511Z: JOB_MESSAGE_DEBUG: (280200dbfaf6cac0): Executing wait step start3
INFO:root:2018-01-10T04:35:40.565Z: JOB_MESSAGE_BASIC: (12e360a044595f8): Executing operation ReadFromVcf/Read+FilterVariants/ApplyFilters+VariantToBigQuery/ConvertToBigQueryTableRow+VariantToBigQuery/WriteToBigQuery/NativeWrite
INFO:root:2018-01-10T04:35:40.599Z: JOB_MESSAGE_DEBUG: (35a4b300ff329644): Starting worker pool setup.
INFO:root:2018-01-10T04:35:40.628Z: JOB_MESSAGE_BASIC: (35a4b300ff3297da): Starting 10 workers in us-central1-f...
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_RUNNING
INFO:root:2018-01-10T04:35:47.357Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fdd0): Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:35:57.734Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54ffab): Autoscaling: Raised the number of workers to 3 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:35:57.764Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fe01): Resized worker pool to 3, though goal was 10.  This could be a quota issue.
INFO:root:2018-01-10T04:36:02.987Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f759): Autoscaling: Raised the number of workers to 9 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:36:03.018Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f5af): Resized worker pool to 9, though goal was 10.  This could be a quota issue.
INFO:root:2018-01-10T04:36:14.570Z: JOB_MESSAGE_DETAILED: (2c7b5f62246857c2): Workers have started successfully.
INFO:root:2018-01-10T04:36:18.590Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f436): Autoscaling: Raised the number of workers to 10 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:39:14.929Z: JOB_MESSAGE_BASIC: (280200dbfaf6cb22): Autoscaling: Resizing worker pool from 10 to 15.
INFO:root:2018-01-10T04:39:20.293Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fed4): Autoscaling: Raised the number of workers to 14 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:39:20.320Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fd2a): Resized worker pool to 14, though goal was 15.  This could be a quota issue.
INFO:root:2018-01-10T04:39:25.575Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f5ad): Autoscaling: Raised the number of workers to 15 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:57:04.250Z: JOB_MESSAGE_ERROR: (8f917a73bb67a9d9): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

INFO:root:2018-01-10T04:59:20.080Z: JOB_MESSAGE_ERROR: (b0f417b262714c09): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

INFO:root:2018-01-10T05:01:35.890Z: JOB_MESSAGE_ERROR: (a4b435c70fc68065): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

INFO:root:2018-01-10T05:08:18.598Z: JOB_MESSAGE_ERROR: (a4b435c70fc689e8): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

INFO:root:2018-01-10T05:09:41.397Z: JOB_MESSAGE_ERROR: (b0f417b262714adf): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

INFO:root:2018-01-10T05:09:41.410Z: JOB_MESSAGE_BASIC: (42fdb1ab4fd8cff2): Executing BigQuery import job "dataflow_job_8333640118018302870". You can check its status with the bq tool: "bq show -j --project_id=XXX dataflow_job_8333640118018302870".
INFO:root:2018-01-10T05:09:41.435Z: JOB_MESSAGE_WARNING: (42fdb1ab4fd8cb68): Unable to delete temp files: "gs://XXX/temp/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/8333640118018303863/dax-tmp-2018-01-09_20_35_36-6246558984112825034-S01-0-207f78e951a6f2a/@DAX.json."
INFO:root:2018-01-10T05:09:41.509Z: JOB_MESSAGE_DEBUG: (12e360a04459cb7): Executing failure step failure2
INFO:root:2018-01-10T05:09:41.533Z: JOB_MESSAGE_ERROR: (12e360a04459df1): Workflow failed. Causes: (12e360a04459a43): S01:ReadFromVcf/Read+FilterVariants/ApplyFilters+VariantToBigQuery/ConvertToBigQueryTableRow+VariantToBigQuery/WriteToBigQuery/NativeWrite failed., (1f26fcd604aa07c5): A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
  gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
  gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
  gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
  gnomad-genomes-to-bigquer-01092035-ea7e-harness-q4nv
INFO:root:2018-01-10T05:09:41.670Z: JOB_MESSAGE_DETAILED: (52771809aae7605e): Cleaning up.
INFO:root:2018-01-10T05:09:41.774Z: JOB_MESSAGE_DEBUG: (52771809aae769fa): Starting worker pool teardown.
INFO:root:2018-01-10T05:09:41.797Z: JOB_MESSAGE_BASIC: (52771809aae766c8): Stopping worker pool...
INFO:root:2018-01-10T05:10:55.180Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f475): Autoscaling: Resized worker pool from 15 to 0.
INFO:root:2018-01-10T05:10:55.208Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f2cb): Autoscaling: Would further reduce the number of workers but reached the minimum number allowed for the job.
INFO:root:2018-01-10T05:10:55.263Z: JOB_MESSAGE_DEBUG: (52771809aae766ce): Tearing down pending resources...
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_FAILED
Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/opt/gcp_variant_transforms/src/gcp_variant_transforms/vcf_to_bq.py", line 223, in <module>
    run()
  File "/opt/gcp_variant_transforms/src/gcp_variant_transforms/vcf_to_bq.py", line 218, in run
    append=known_args.append))
  File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 346, in __exit__
    self.run().wait_until_finish()
  File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 966, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(b0f417b262714adf): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
    self.writer.Write(o.value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
    super(TextFileWriter, self).Write(value)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
    self.file.write(self.sink.coder.encode(value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
    raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']

 (exit status 1)

Implement gVCF merging logic (SNPs only)

Currently, the only supported merge strategy is MOVE_TO_CALLS, which only merges samples that have the exact same reference_name:start_position:end_position:reference_bases:alternate_bases.

For gVCF files, we would like to add 0/0 to samples that don't have a variant at a given position. For instance, if "sample1" has a non-variant block covering positions 1-100 and "sample2" has a 1/0 variant at position 55, we should output a single row at position 55 covering both samples, with 0/0 for "sample1" and 1/0 for "sample2".
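
A toy illustration of the desired behaviour for the example above (the real logic lives in merge_with_non_variants_strategy.py and handles far more cases):

    # (sample, start, end): a gVCF non-variant block for sample1.
    non_variant_blocks = [('sample1', 1, 100)]
    variants = [{'start': 55, 'calls': [('sample2', '1/0')]}]

    for variant in variants:
        for sample, start, end in non_variant_blocks:
            overlaps = start <= variant['start'] < end
            already_called = any(name == sample for name, _ in variant['calls'])
            if overlaps and not already_called:
                variant['calls'].append((sample, '0/0'))

    print(variants)  # [{'start': 55, 'calls': [('sample2', '1/0'), ('sample1', '0/0')]}]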

We currently have merging logic for gVCF files (merge_with_non_variants_strategy.py). It can handle non-variants, but needs to be cleaned up and tested further. It also depends on a fix in the intervaltree library (see chaimleib/intervaltree#63).

This task is only for supporting merging SNPs with non-variants.

Optimize merging logic to handle massive number of records (300B+)

The pipeline is currently very slow when merging a massive number of records (300B+).
Optimization strategies:

  • Experiment with Cloud Dataflow Shuffle: it allegedly gives a 5x performance improvement over the standard method.
  • Experiment with sharding records by chromosome: we currently throw everything into one giant PCollection and let Dataflow sort it out. We may be able to optimize the pipeline by first sharding by chromosome (a sketch follows at the end of this issue).

Optimizing this step will become even more important when we add more advanced merging strategies (e.g. merging with non-variants).
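
A sketch of the shard-by-chromosome idea using beam.Partition (the partition count, the key extraction from the Variant object, and the downstream merge transform are illustrative assumptions):

    import apache_beam as beam

    CHROMOSOMES = ['%d' % i for i in range(1, 23)] + ['X', 'Y']

    def by_chromosome(variant, num_partitions):
        """Routes a variant to its chromosome's shard; the last shard is a catch-all."""
        reference = variant.reference_name.replace('chr', '')
        return (CHROMOSOMES.index(reference) if reference in CHROMOSOMES
                else num_partitions - 1)

    # Inside the pipeline (sketch):
    #   shards = variants | beam.Partition(by_chromosome, len(CHROMOSOMES) + 1)
    #   merged = [shard | 'Merge%d' % i >> merge_variants_transform()
    #             for i, shard in enumerate(shards)]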

Catch header schema mismatch errors earlier

Currently, if there is a header schema mismatch (e.g. a FORMAT field is missing from the header), the pipeline proceeds all the way to the very end and only fails when it tries to write to BigQuery. It also does not give a representative error message (it complains that a field is missing in the BQ JSON schema, see Issue #61). We should catch such errors sooner and give a proper error message. This depends on Issue #59, as we first need to refactor that part of the code.

Ideally we may be able to 'detect and correct' such errors, but this is more difficult currently as the BQ schema cannot be dynamically created/changed.

Refactor PipelineOptions

Create new class(es) descending from PipelineOptions that handle argument parsing and validation for all vcf_to_bq arguments.
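
One possible shape for the refactoring, using Beam's standard PipelineOptions subclassing hook (the flags shown are a small illustrative subset):

    from apache_beam.options.pipeline_options import PipelineOptions

    class VcfToBqOptions(PipelineOptions):
        """Groups the vcf_to_bq-specific flags in one place."""

        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument('--input_pattern', required=True,
                                help='Input pattern for the VCF files to process.')
            parser.add_argument('--output_table', required=True,
                                help='BigQuery table to write the output to.')

    # Usage (sketch): known_args = pipeline_options.view_as(VcfToBqOptions)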

Add native support for table partitions by chromosome

For large (1TB+) datasets, we should provide an option to output multiple BigQuery tables, one per chromosome. This makes querying the data cheaper for certain types of queries (e.g. querying variants in a particular gene). Users can use BigQuery wildcards to query over all of the data.

Things to consider:

  • We currently cannot dynamically change the number of outputs in Beam, so we need to ask the user about the nature and number of partitions prior to running the pipeline. We can provide common ones (e.g. homo sapiens).
  • Need to analyze the performance impact (if any) of having 24+ tables vs. 1 table. This should be documented if there is a significant performance hit (e.g. users may want to create a unified table for queries that span all of the genome).
  • Doing a simple partition by "reference name" won't work, as some datasets may have a large number (1000s) of reference names. We need a "catch all" partition; a sketch of one possible routing scheme follows this list.
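
A sketch of catch-all table routing (the table-name suffixing and the homo sapiens reference list are assumptions for illustration):

    COMMON_REFERENCES = ['chr%d' % i for i in range(1, 23)] + ['chrX', 'chrY', 'chrM']

    def output_table_for(base_table, reference_name):
        """Routes common chromosomes to their own table, everything else to one catch-all."""
        suffix = reference_name if reference_name in COMMON_REFERENCES else 'other'
        return '%s_%s' % (base_table, suffix)

    print(output_table_for('mydataset.variants', 'chr1'))      # mydataset.variants_chr1
    print(output_table_for('mydataset.variants', 'GL000201'))  # mydataset.variants_other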

Related:

  • BigQuery pricing model (https://cloud.google.com/bigquery/pricing): Pricing is based on the columns accessed and not the number of records from each column. In other words, adding "where reference_name = chr1" does not affect pricing.

Fix and enable continuous run of the integration tests.

A version of the integration tests is already implemented (see run_tests.py) and worked at some point in the past, but it currently does not run. We need to fix it, improve the tests, and add some form of validation (currently the generated table is not validated). Finally, we should set up some form of continuous run for this integration test (it is not a small test).

Create large integration tests.

The current integration tests are small and do not cover enough variation in VCF inputs. To automate our release process, we need to add larger cases and do both deeper validation and performance analysis to make sure we are not introducing any regressions. One potential dataset to use is 1000 Genomes (see Issue #61 too).

Enhance filtering logic

We currently have basic filtering options.

More filtering options are useful in some scenarios especially when the data size is large. For instance, only loading variants that have filter=PASS may considerably reduce the data size and cost (especially if there is no use case for analyzing non-passing variants).

Automatically modify the BigQuery schema when appending data to an existing table

We currently provide an "--append" option to append data to an existing table. However, it requires the schema to be exactly the same. We could use the BigQuery API to update the schema prior to import (in cases where the schemas are compatible) or throw an error when they are incompatible (e.g. the same field is defined with a different type).
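
A hedged sketch using the google-cloud-bigquery client library (the compatibility check is only outlined; the pipeline's actual append path may differ):

    from google.cloud import bigquery

    def extend_table_schema(project, dataset_id, table_id, new_fields):
        """Adds missing fields to an existing table; errors on type conflicts."""
        client = bigquery.Client(project=project)
        table = client.get_table(client.dataset(dataset_id).table(table_id))
        existing = {field.name: field for field in table.schema}
        for field in new_fields:
            if (field.name in existing and
                    existing[field.name].field_type != field.field_type):
                raise ValueError('Incompatible type for field %s' % field.name)
        table.schema = list(table.schema) + [
            f for f in new_fields if f.name not in existing]
        client.update_table(table, ['schema'])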

"variant merging" documentation

It would be great to have some more information about variant merging -- I sometimes have as many as 12 VCF files for a single tumor/normal pair and I'd like to understand how variants are merged. I also noted that the instructions on the main Google documentation page suggest using --variant_merge_strategy MOVE_TO_CALLS, while the documentation here on GitHub doesn't mention that. When I tried to import a set of 23 VCF files representing two tumor/normal pairs, I got the following error:

    ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f64a98d5450>, due to an exception.
     Traceback (most recent call last):
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 312, in call
        side_input_values)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 347, in attempt_call
        evaluator.process_element(value)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 551, in process_element
        self.runner.process(element)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 390, in process
        self._reraise_augmented(exn)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 388, in process
        self.do_fn_invoker.invoke_process(windowed_value)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 281, in invoke_process
        self._invoke_per_window(windowed_value)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 307, in _invoke_per_window
        windowed_value, self.process_method(*args_for_process))
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 63, in process
        return self.wrapper(self.dofn.process, args, kwargs)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 81, in wrapper
        result = method(*args, **kwargs)
      File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/helper_transforms.py", line 60, in process
        vi)
      File "gcp_variant_transforms/transforms/merge_headers.py", line 175, in add_input
        return self.merge_accumulators([source, to_merge])
      File "gcp_variant_transforms/transforms/merge_headers.py", line 180, in merge_accumulators
        self._header_merger.merge(merged_headers, to_merge)
      File "gcp_variant_transforms/transforms/merge_headers.py", line 127, in merge
        self._merge_header_fields(first.formats, second.formats)
      File "gcp_variant_transforms/transforms/merge_headers.py", line 160, in _merge_header_fields
        first_value, second_value, str(e)))
    ValueError: Incompatible number or types in header fields:OrderedDict([('id', 'GQ'), ('num', 1), ('type', 'Integer'), ('desc', 'Genotype Quality')]), OrderedDict([('id', 'GQ'), ('num', None), ('type', 'Integer'), ('desc', 'Conditional Phred-scaled genotype quality')])
    . Error: Incompatible numbers cannot be resolved: 1, None [while running 'MergeHeaders/MergeHeaders/CombinePerKey/LiftedCombinePerKey/ParDo(PartialGroupByKeyCombiningValues)']

I tried removing the --variant_merge_strategy MOVE_TO_CALLS option but I got the same error.

Resolve conflicts in VCF header files

Examples of resolvable conflicts:

Type:

  • Float vs Integer

Number:

  • dot vs 2, 3, ...
  • R, G vs 2, 3, ...
  • A vs dot (only if the split_alternate_allele_info_fields flag is off)

....
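A minimal sketch of the kind of resolution rules listed above is shown below; the helper names are hypothetical and only cover a subset of the cases (unknown numbers are represented as None, matching how '.' appears in the merger error above).

    # A minimal sketch of header conflict resolution; helper names are
    # hypothetical and only cover a subset of the cases listed above.
    def resolve_type(first, second):
      """Integer can safely be widened to Float; identical types pass through."""
      if first == second:
        return first
      if {first, second} == {'Integer', 'Float'}:
        return 'Float'
      raise ValueError('Unresolvable types: %s vs %s' % (first, second))

    def resolve_number(first, second):
      """'.' (unknown, represented as None) absorbs any fixed count."""
      if first == second:
        return first
      if first is None or second is None:
        return None
      raise ValueError('Unresolvable numbers: %s vs %s' % (first, second))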

Resolve dependency issues in setup.py.

We currently get dependency conflicts after installing via setup.py because pip cannot resolve version dependencies properly. In other words, instead of picking the intersection of version specifiers across dependencies, pip appears to install packages one by one independently (see PR #63 for an example). After installation, pipdeptree shows these issues, for example:

$ pipdeptree -p apache-beam

We eventually need to fix this either by explicitly adding non-direct dependencies (which PR #63 does, but it is ugly) or by finding a tool that can resolve all version restrictions together.

Support multi-reference data analysis

The current merging logic assumes that all VCF files have the same reference. This task is to convert all files to a common reference before merging. This feature can actually be used independently of the merging logic as well (i.e. as a fast way of changing the reference bases in a large number of VCF files).

Things to consider:

  • Reference header is not always present (or accessible) in the VCF file.
  • May need to only support common references initially (hg18 and hg19).
  • The GA4GH is also working on a reference retrieval API, which may be relevant to our design.

More details need to be outlined in a design doc :)

Protobuf incompatibility issue in Beam 2.2.0

There's a protobuf incompatibility issue in the Beam 2.2.0 pip package (see https://issues.apache.org/jira/browse/BEAM-3357). This will be fixed in Beam 2.3.0, but until then we have introduced some workarounds in our code. In particular, we've added an explicit dependency on 'grpcio>=1.0,<=1.7.3' in setup.py. While this fixes the setup/test issues, the same problem shows up again when running the Dataflow pipeline; the workaround there is to run 'pip install --upgrade apache_beam[gcp]' again.

This issue is to just keep track of these workarounds and remove them once Beam 2.3.0 is released.

Support merging from different variant callers

While debugging a user issue (see thread), we came across non-standard fields from VarScan2 (e.g. the "AD" field is split into "AD" and "RD" fields, each having Number=1 instead of a single field with Number=R as specified in the VCF spec).

While the cleanest solution is to change VarScan2 to output the AD field according to the VCF spec, that approach may not be practical and would not work for existing VCF files.

Idea: provide a "transformation plugin" for converting output from non-standard variant callers (e.g. in the VarScan2 case, it would merge AD and RD fields into a single field with Number=R). We can write a few common plugins ourselves, but it should be easy enough for users to write their own plugins as well (either in code or through a config file), since we may not be able to cover all corner cases.
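A minimal sketch of what such a plugin could look like for the VarScan2 case is below; the function and the call-data layout are illustrative, assuming RD holds the reference depth and AD the alternate depth, each with Number=1.

    # A minimal sketch of a VarScan2 transformation plugin; names and the exact
    # call-data layout are illustrative, not part of the current codebase.
    def merge_varscan_depths(call_info):
      """Folds RD (ref depth) and AD (alt depth) into a spec-compliant AD list."""
      ref_depth = call_info.pop('RD', None)
      alt_depth = call_info.pop('AD', None)
      if ref_depth is not None and alt_depth is not None:
        # Number=R: one value for the reference allele followed by one per alt.
        call_info['AD'] = [ref_depth, alt_depth]
      return call_info

    print(merge_varscan_depths({'GT': '0/1', 'RD': 12, 'AD': 5}))
    # {'GT': '0/1', 'AD': [12, 5]}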

Use a hash for reference/alternate_bases when making the merge key

We currently just concatenate reference/alternate_bases together as the merge key (see here).

This may break Dataflow if reference/alternate_bases are large (keys have a 1MB limit). We should create a hash of these fields and use that instead, which ensures that our keys do not get arbitrarily large.

As a result, the key would be reference_name:start_position:end_position:<hash of reference_bases:alternate_bases>. Given that reference/start/end are included in the key outside the hash, the risk of collision is extremely low. However, we can still add an assert here just to be sure.
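A minimal sketch of the proposed key construction (the function name is illustrative, not the actual implementation):

    import hashlib

    # A minimal sketch of the proposed merge key; only the base fields go through
    # the hash so the key stays small regardless of allele length.
    def generate_merge_key(reference_name, start, end, reference_bases,
                           alternate_bases):
      bases = '%s:%s' % (reference_bases, ':'.join(alternate_bases))
      bases_digest = hashlib.sha1(bases.encode('utf-8')).hexdigest()
      return '%s:%s:%s:%s' % (reference_name, start, end, bases_digest)

    print(generate_merge_key('chr1', 100, 101, 'A', ['T', 'C']))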

Add an option to 'detect and correct' VCF header mismatch errors

It is surprisingly common for VCF files to have missing header fields (see Issue #61 for a 1000 genomes file). The pipeline currently fails as it is unable to map the missing field(s) to the BigQuery column(s). Note that we need to generate the schema prior to running the pipeline, which is why we need to rely on complete and valid headers.

The current workaround is to manually specify the missing header fields through --representative_header_file, but this is painful and not scalable.

An alternative approach is to do two passes over the data: the first parses all fields in all records just to find the missing headers, and the second actually processes the data. We would use the output of the first pass to generate the BigQuery schema. Of course, this adds additional computation (roughly 30% more), so we should provide it as an optional feature for "robust" imports. As an optimization, we could add sampling logic if the data is expected to be uniform (e.g. sample 30% of the data and assume that everything else follows the same schema).
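As a rough sketch of the kind of inference the first pass could perform for an undeclared INFO field (the function and inference rules are hypothetical):

    # A minimal sketch of inferring a missing INFO header definition from
    # observed values; names and the inference rules are hypothetical.
    def infer_info_header(field_id, observed_values):
      """Guesses type and number from lists of raw string values per record."""
      def infer_type(value):
        try:
          int(value)
          return 'Integer'
        except ValueError:
          try:
            float(value)
            return 'Float'
          except ValueError:
            return 'String'

      types = {infer_type(v) for values in observed_values for v in values}
      counts = {len(values) for values in observed_values}
      field_type = ('String' if 'String' in types else
                    'Float' if 'Float' in types else 'Integer')
      field_num = counts.pop() if len(counts) == 1 else None  # None means '.'
      return {'id': field_id, 'type': field_type, 'num': field_num}

    print(infer_info_header('DP', [['14'], ['7'], ['23']]))
    # {'id': 'DP', 'type': 'Integer', 'num': 1}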

Add native annotation support

This task is to add native annotation support (e.g. gene ID, variant harmfulness, clinvar, dbSNP, etc) as part of the VCF->BigQuery pipeline.

Initially, we can have a pre-processing step that adds VEP or SnpEff annotations to the VCF file prior to import. The annotations would be added to a new INFO field that can then be parsed to load each annotation as a separate column in BigQuery. The annotation field presumably follows this schema.
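As an illustration, parsing a VEP-style pipe-delimited annotation value could look roughly like the sketch below; the format names come from the ##INFO description in the annotated VCF header and the ones used here are illustrative only.

    # A minimal sketch of parsing a VEP-style CSQ INFO value into per-annotation
    # dicts; the format names are taken from the VCF header in practice.
    def parse_annotation_field(csq_value, format_names):
      annotations = []
      for annotation in csq_value.split(','):
        annotations.append(dict(zip(format_names, annotation.split('|'))))
      return annotations

    names = ['Allele', 'Consequence', 'IMPACT', 'SYMBOL']
    print(parse_annotation_field('T|missense_variant|MODERATE|BRCA1', names))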

Having a more generic INFO parser will also be beneficial for other pre-annotated VCF files such as gnomAD.

We can integrate Verily's annotation pipeline to load annotations via VEP.

Note: adding annotations as part of the pipeline without pre-processing is outside the scope of this task. We can consider that option if the pre-processing step is not feasible for certain use cases.

Add pip package

We should add a pip package to make it easier to use the pipeline from other Python libraries. This depends on Issue #77 (adding a proper release process and versioning).

Add BigQuery->VCF pipeline

There are certain use cases where users like to transform variants from BigQuery to VCF (e.g. to use tools that expect a VCF file). This task is to write such a pipeline.

As an extension, once this is completed, we can provide VCF->VCF and BQ->BQ transforms as well (e.g. just use our merging logic to merge a large number of VCF files and output a single merged VCF file).
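A rough sketch of what the core of a BigQuery->VCF pipeline could look like is below, assuming a simple flattened variants table; the query, column names, and output location are illustrative only, and header generation is omitted.

    import apache_beam as beam

    # A minimal sketch of a BigQuery->VCF pipeline; schema details and the output
    # location are illustrative, and the VCF header is not generated here.
    def to_vcf_line(row):
      return '\t'.join([
          row['reference_name'],
          str(row['start_position'] + 1),  # VCF positions are 1-based.
          '.',
          row['reference_bases'],
          ','.join(row.get('alternate_bases') or []) or '.',
          '.', '.', '.'])

    with beam.Pipeline() as p:
      _ = (p
           | 'ReadVariants' >> beam.io.Read(beam.io.BigQuerySource(
               query='SELECT * FROM my_dataset.variants'))
           | 'FormatVcfLines' >> beam.Map(to_vcf_line)
           | 'WriteVcf' >> beam.io.WriteToText('gs://my-bucket/output',
                                               file_name_suffix='.vcf'))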
