googlegenomics / gcp-variant-transforms
GCP Variant Transforms
License: Apache License 2.0
It is currently not trivial to estimate the resources needed to run the pipeline (particularly amount of disk space). This is especially important when processing large (10TB+) datasets as cost becomes an important factor.
The idea is to write a tool that preprocesses the data (similar to the header merger pipeline) and outputs the minimum resource requirements for running the pipeline.
Note that estimating the disk size is not as trivial as summing up the size of all files being processed as there is an overhead for each record depending on the number of samples, info, and filter fields.
BigQuery does not allow null values in repeated fields (the entire record can be null, but values within the record must each have a value). For instance, if a VCF INFO field is 1,.,2, we cannot load 1,null,2 to BigQuery and need to use a numeric replacement for the null value. This is currently hardcoded to -sys.maxint in bigquery_vcf_schema.py. We should make this a flag so that users can change it depending on the context.
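A minimal sketch of what a configurable replacement could look like. The function names and the default constant are hypothetical and are not taken from bigquery_vcf_schema.py; sys.maxsize stands in for the Python 2 sys.maxint mentioned above.

```python
import sys

# Hypothetical default. The issue says the current value is -sys.maxint
# (Python 2); sys.maxsize is used here as the closest Python 3 analogue.
DEFAULT_NULL_REPLACEMENT = -sys.maxsize


def replace_nulls(values, replacement=DEFAULT_NULL_REPLACEMENT):
    """Return a copy of a repeated field with every None replaced."""
    return [replacement if v is None else v for v in values]


def parse_info_values(raw, replacement=DEFAULT_NULL_REPLACEMENT):
    """Parse a comma-separated integer INFO value such as '1,.,2'.

    A '.' token is the VCF missing-value marker and becomes the replacement.
    """
    parsed = [None if token == '.' else int(token) for token in raw.split(',')]
    return replace_nulls(parsed, replacement)
```

With a flag wired to the replacement argument, a user could pass, say, -1 for a field where negative values never occur: parse_info_values('1,.,2', replacement=-1) gives [1, -1, 2].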
Note: this should be done after the initial refactoring is done (Issue #59)
We currently use basic functions for getting the schema and converting variants to BQ rows. This has made the code less readable, especially when making subtle changes (e.g. Issue #48). We should first refactor the file to use a class. This also makes checking the schema while the rows are being generated much easier, as the schema would be a member variable of that class.
There are some use cases where a "flattened" BigQuery table is easier to process (i.e. does not contain repeated records). We can provide such an option as part of the pipeline and output a separate row for each record.
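A rough sketch of the flattening idea, assuming a row is a plain dictionary and the repeated record is a list of dictionaries. The field names used in the usage note (call, name, genotype) are illustrative assumptions, not the pipeline's confirmed schema.

```python
import copy


def flatten_row(row, repeated_field):
    """Yield one output row per element of row[repeated_field].

    Keys of each nested record are promoted to top-level columns named
    '<repeated_field>_<key>'. A row with no nested records yields a single
    row with the repeated field set to None.
    """
    records = row.get(repeated_field) or [None]
    for record in records:
        # Copy every column except the repeated one into the new row.
        flat = {k: copy.deepcopy(v) for k, v in row.items()
                if k != repeated_field}
        if isinstance(record, dict):
            for key, value in record.items():
                flat['{}_{}'.format(repeated_field, key)] = value
        else:
            flat[repeated_field] = record
        yield flat
```

For example, a row with reference_name 'chr1' and two entries under call produces two rows, each carrying reference_name plus call_name and call_genotype. Note that this only removes the repeated record; a repeated scalar inside it (such as a genotype list) is left as-is.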
Last few lines of output:
...
valid-4-2 ... ok
valid-4-0-gz ... ok
valid-4-1-gz ... ok
valid-4-2-gz ... ok
valid-4-1 ... ok
valid-4-0 ... ok
valid-4-0-bz2 ... ok
./deploy_and_run_tests.sh succeeded!
Removing integration test environment /tmp/tmp.qVgcbOTlZZ
Deleting the test image from the registry
Traceback (most recent call last):
File "/usr/bin/../lib/google-cloud-sdk/lib/gcloud.py", line 88, in <module>
main()
File "/usr/bin/../lib/google-cloud-sdk/lib/gcloud.py", line 64, in main
import traceback
ImportError: No module named traceback
Even though the VCF spec natively supports SVs, it's common to have them in other TSV/CSV formats. We should add native support for loading such data to BigQuery through Variant Transforms.
Proposal: add an option to map column names to a Variant object (e.g. "CopyNumber (int) -> Variant.info.CN", "start (int) -> Variant.start_position", etc).
Related:
https://groups.google.com/d/msg/google-genomics-discuss/AeR34tJpMds/PIpvE3roBgAJ
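The column-mapping proposal could be sketched roughly as follows. The variant is represented as a plain dictionary here rather than the real Variant class, and the column names and the COLUMN_MAP structure are hypothetical.

```python
import csv
import io

# Hypothetical mapping: column name -> (target attribute, converter).
# Targets starting with 'info.' are stored in the variant's info dictionary.
COLUMN_MAP = {
    'chrom': ('reference_name', str),
    'start': ('start_position', int),
    'end': ('end_position', int),
    'CopyNumber': ('info.CN', int),
}


def row_to_variant(row, column_map=COLUMN_MAP):
    """Convert one TSV/CSV row (a dict of strings) to a variant-like dict."""
    variant = {'info': {}}
    for column, (target, convert) in column_map.items():
        if column not in row or row[column] == '':
            continue  # Column absent or empty: leave the attribute unset.
        value = convert(row[column])
        if target.startswith('info.'):
            variant['info'][target[len('info.'):]] = value
        else:
            variant[target] = value
    return variant


def read_tsv(text, column_map=COLUMN_MAP):
    """Parse tab-separated text with a header line into variant-like dicts."""
    reader = csv.DictReader(io.StringIO(text), delimiter='\t')
    return [row_to_variant(row, column_map) for row in reader]
```

Keeping the mapping as data rather than code means it could be supplied by the user (for instance from a flag or a small config file) without changing the pipeline.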
Even though the pipeline supports files in gzip and bzip2 formats, such files cannot be sharded. There are also other compression formats such as bgzip (http://www.htslib.org/doc/tabix.html) that are not supported.
It would be ideal to provide a "decompress and load" option as part of the pipeline. We can use the Pipelines API to do this in a scalable manner.
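As a local illustration of the decompression step only: bgzip output is a series of concatenated gzip members, so Python's standard gzip module can read it. The sketch below works on local paths; running it at scale through the Pipelines API and reading from Cloud Storage are not shown, and the function name is hypothetical.

```python
import gzip
import shutil


def decompress_gzip(src_path, dst_path, chunk_size=1024 * 1024):
    """Stream-decompress a gzip or bgzip file without loading it in memory.

    gzip.open reads all concatenated gzip members in sequence, which is
    what makes bgzip-compressed files readable here.
    """
    with gzip.open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
        shutil.copyfileobj(src, dst, chunk_size)
```

The decompressed output can then be sharded like any plain-text VCF file.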
Background: Issue #62.
It would be nice to have Travis run the integration tests whenever a pull request is sent. One issue is the credentials Travis needs to access Google Cloud projects. This tutorial might help.
Upon a commit, if the continuous integration build/test fails, no notification is currently sent. One has to look at the "Build history" portion of the Container Registry dashboard for project gcp-variant-transforms-test to check whether CI for that commit succeeded. There is a Pub/Sub channel created for the CI and it can be used to set up such notifications (details here).
Currently, we require a BigQuery dataset to be present prior to loading the VCF files. Therefore, we do not control the permissions on the BigQuery dataset as they are set by the user outside of the pipeline (note: BigQuery permissions are set on the dataset level and not on the table level).
If the VCF files being imported have different read permissions, then there is a risk of exposing data to unauthorized users if they are merged together. Thus, the user needs to ensure that both the BigQuery dataset and Cloud Storage files have the same permissions.
It would be easier if we create BigQuery dataset(s) as part of the pipeline and dynamically set the same permission as the Cloud Storage files being imported. Of course, an "admin" user needs to run the pipeline, but this ensures that VCF files with different read permissions do not end up together in one BigQuery dataset.
This is a more involved task and requires more thought on what the user experience would look like. It's also not trivial to determine how to split up the files to ensure those with the same permissions end up in the same BigQuery dataset.
We currently do some argument validation (e.g. valid BigQuery dataset), but we should add more. In particular, we should catch any exceptions raised when matching the input_pattern and provide a user-friendly error (e.g. an invalid or inaccessible input_pattern currently displays a cryptic HttpError). We can do the same for temp_location and staging_location.
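One possible shape for such a check, as a sketch only. The matcher is passed in as a callable so the example stays self-contained; in the pipeline it would be whatever function currently resolves the pattern. The function name and error wording are hypothetical.

```python
def validate_input_pattern(pattern, match_fn):
    """Resolve an input pattern and fail early with a readable message.

    match_fn is any callable that takes the pattern and returns a list of
    matching paths (an assumption made for illustration).
    """
    try:
        matches = match_fn(pattern)
    except Exception as e:  # Broad on purpose: low-level errors vary by backend.
        raise ValueError(
            'Invalid or inaccessible input_pattern {!r}: {}'.format(pattern, e))
    if not matches:
        raise ValueError(
            'input_pattern {!r} did not match any files.'.format(pattern))
    return matches
```

For a quick local check, glob.glob can serve as match_fn. The same wrapper pattern would apply to temp_location and staging_location with a suitable existence or write check in place of the matcher.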
We should merge headers in a scalable way without requiring the user to provide --representative_header_file.
Ensure both the GitHub repository and the Docker images have a proper release process (including running integration tests) and are versioned appropriately.
I am trying to import the gnomAD variants into BigQuery, but I consistently get an error that makes the pipeline fail on chr1. I was able to successfully import some other chromosomes (chr21 and chr10).
The error I'm getting is ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow'].
I tried the --allow_malformed_records True flag, but that made no difference.
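The message suggests that some float value in a row was NaN or infinite when the row was encoded as JSON. Python's json module raises ValueError for such values when allow_nan=False. Below is a minimal sketch of one possible workaround, replacing non-finite floats before encoding; the sanitize helper is hypothetical and is not part of the pipeline.

```python
import json
import math


def sanitize(value, replacement=None):
    """Recursively replace NaN and +/-Infinity floats with a replacement."""
    if isinstance(value, float) and not math.isfinite(value):
        return replacement
    if isinstance(value, dict):
        return {k: sanitize(v, replacement) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [sanitize(v, replacement) for v in value]
    return value


row = {'AF': [0.5, float('nan')], 'DP': 12}
encoded = json.dumps(sanitize(row), allow_nan=False)
```

Since BigQuery does not accept null inside repeated fields, a numeric replacement would be needed for list-valued fields in practice.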
The input file is from https://storage.googleapis.com/gnomad-public/release/2.0.2/vcf/genomes/gnomad.genomes.r2.0.2.sites.chr1.vcf.bgz and I decompressed it before running the pipeline.
My pipeline configuration is
name: gnomad-genomes-to-bigquery-pipeline
docker:
imageName: gcr.io/gcp-variant-transforms/gcp-variant-transforms
cmd: |
./opt/gcp_variant_transforms/bin/vcf_to_bq \
--project XXX \
--input_pattern gs://XXX/gnomad/vcf_decompressed/genomes/gnomad.genomes.r2.0.2.sites.chr1.vcf \
--allow_malformed_records True \
--output_table dg-platform:GnomAD.gnomad_genomes_chr1 \
--staging_location gs://XXX/staging \
--temp_location gs://XXX/temp \
--job_name gnomad-genomes-to-bigquery-pipeline-chr1 \
--runner DataflowRunner
The full log is
2018/01/10 04:34:27 I: Switching to status: pulling-image
2018/01/10 04:34:27 I: Calling SetOperationStatus(pulling-image)
2018/01/10 04:34:27 I: SetOperationStatus(pulling-image) succeeded
2018/01/10 04:34:27 I: Pulling image "gcr.io/gcp-variant-transforms/gcp-variant-transforms"
2018/01/10 04:35:20 I: Pulled image "gcr.io/gcp-variant-transforms/gcp-variant-transforms" successfully.
2018/01/10 04:35:20 I: Done copying files.
2018/01/10 04:35:20 I: Switching to status: running-docker
2018/01/10 04:35:20 I: Calling SetOperationStatus(running-docker)
2018/01/10 04:35:20 I: SetOperationStatus(running-docker) succeeded
2018/01/10 04:35:20 I: Setting these data volumes on the docker container: [-v /tmp/ggp-305484772:/tmp/ggp-305484772]
2018/01/10 04:35:20 I: Running command: docker run -v /tmp/ggp-305484772:/tmp/ggp-305484772 gcr.io/gcp-variant-transforms/gcp-variant-transforms /tmp/ggp-305484772
2018/01/10 05:11:04 E: command failed: No handlers could be found for logger "oauth2client.contrib.multistore_file"
/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:122: DeprecationWarning: object() takes no parameters
super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0635919570923 seconds
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0466470718384 seconds
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0660800933838 seconds
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.0607531070709 seconds
/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:134: UserWarning: Using fallback coder for typehint: Any.
warnings.warn('Using fallback coder for typehint: %r.' % typehint)
INFO:root:Executing command: ['/opt/gcp_variant_transforms/venv/bin/python', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpFiWctv']
warning: check: missing required meta-data: url
warning: check: missing meta-data: if 'author' supplied, 'author_email' must be supplied too
INFO:root:Starting GCS upload to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/workflow.tar.gz...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Completed GCS upload to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/workflow.tar.gz
INFO:root:Staging the SDK tarball from PyPI to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/dataflow_python_sdk.tar
INFO:root:Executing command: ['/opt/gcp_variant_transforms/venv/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmpFiWctv', 'apache-beam==2.2.0', '--no-binary', ':all:', '--no-deps']
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
INFO:root:file copy from /tmp/tmpFiWctv/apache-beam-2.2.0.zip to gs://XXX/staging/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/dataflow_python_sdk.tar.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job
createTime: u'2018-01-10T04:35:37.539665Z'
currentStateTime: u'1970-01-01T00:00:00Z'
id: u'2018-01-09_20_35_36-6246558984112825034'
location: u'us-central1'
name: u'gnomad-genomes-to-bigquery-pipeline-chr1'
projectId: u'XXX'
stageStates: []
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-01-09_20_35_36-6246558984112825034]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-01-09_20_35_36-6246558984112825034?project=XXX
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_PENDING
INFO:root:2018-01-10T04:35:36.979Z: JOB_MESSAGE_DETAILED: (56b03c4ce491e46b): Autoscaling is enabled for job 2018-01-09_20_35_36-6246558984112825034. The number of workers will be between 1 and 15.
INFO:root:2018-01-10T04:35:37.009Z: JOB_MESSAGE_DETAILED: (56b03c4ce491ee58): Autoscaling was automatically enabled for job 2018-01-09_20_35_36-6246558984112825034.
INFO:root:2018-01-10T04:35:39.256Z: JOB_MESSAGE_DETAILED: (52771809aae76d0e): Checking required Cloud APIs are enabled.
INFO:root:2018-01-10T04:35:40.098Z: JOB_MESSAGE_DETAILED: (52771809aae766b0): Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2018-01-10T04:35:40.122Z: JOB_MESSAGE_DETAILED: (52771809aae769e5): Expanding GroupByKey operations into optimizable parts.
INFO:root:2018-01-10T04:35:40.148Z: JOB_MESSAGE_DETAILED: (52771809aae766b3): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-01-10T04:35:40.171Z: JOB_MESSAGE_DEBUG: (52771809aae76381): Annotating graph with Autotuner information.
INFO:root:2018-01-10T04:35:40.199Z: JOB_MESSAGE_DETAILED: (52771809aae76d1d): Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-01-10T04:35:40.226Z: JOB_MESSAGE_DETAILED: (52771809aae769eb): Fusing consumer FilterVariants/ApplyFilters into ReadFromVcf/Read
INFO:root:2018-01-10T04:35:40.248Z: JOB_MESSAGE_DETAILED: (52771809aae766b9): Fusing consumer VariantToBigQuery/ConvertToBigQueryTableRow into FilterVariants/ApplyFilters
INFO:root:2018-01-10T04:35:40.281Z: JOB_MESSAGE_DETAILED: (52771809aae76387): Fusing consumer VariantToBigQuery/WriteToBigQuery/NativeWrite into VariantToBigQuery/ConvertToBigQueryTableRow
INFO:root:2018-01-10T04:35:40.304Z: JOB_MESSAGE_DEBUG: (52771809aae76055): Workflow config is missing a default resource spec.
INFO:root:2018-01-10T04:35:40.331Z: JOB_MESSAGE_DEBUG: (52771809aae76d23): Adding StepResource setup and teardown to workflow graph.
INFO:root:2018-01-10T04:35:40.363Z: JOB_MESSAGE_DEBUG: (52771809aae769f1): Adding workflow start and stop steps.
INFO:root:2018-01-10T04:35:40.390Z: JOB_MESSAGE_DEBUG: (52771809aae766bf): Assigning stage ids.
INFO:root:2018-01-10T04:35:40.511Z: JOB_MESSAGE_DEBUG: (280200dbfaf6cac0): Executing wait step start3
INFO:root:2018-01-10T04:35:40.565Z: JOB_MESSAGE_BASIC: (12e360a044595f8): Executing operation ReadFromVcf/Read+FilterVariants/ApplyFilters+VariantToBigQuery/ConvertToBigQueryTableRow+VariantToBigQuery/WriteToBigQuery/NativeWrite
INFO:root:2018-01-10T04:35:40.599Z: JOB_MESSAGE_DEBUG: (35a4b300ff329644): Starting worker pool setup.
INFO:root:2018-01-10T04:35:40.628Z: JOB_MESSAGE_BASIC: (35a4b300ff3297da): Starting 10 workers in us-central1-f...
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_RUNNING
INFO:root:2018-01-10T04:35:47.357Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fdd0): Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:35:57.734Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54ffab): Autoscaling: Raised the number of workers to 3 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:35:57.764Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fe01): Resized worker pool to 3, though goal was 10. This could be a quota issue.
INFO:root:2018-01-10T04:36:02.987Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f759): Autoscaling: Raised the number of workers to 9 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:36:03.018Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f5af): Resized worker pool to 9, though goal was 10. This could be a quota issue.
INFO:root:2018-01-10T04:36:14.570Z: JOB_MESSAGE_DETAILED: (2c7b5f62246857c2): Workers have started successfully.
INFO:root:2018-01-10T04:36:18.590Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f436): Autoscaling: Raised the number of workers to 10 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:39:14.929Z: JOB_MESSAGE_BASIC: (280200dbfaf6cb22): Autoscaling: Resizing worker pool from 10 to 15.
INFO:root:2018-01-10T04:39:20.293Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fed4): Autoscaling: Raised the number of workers to 14 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:39:20.320Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54fd2a): Resized worker pool to 14, though goal was 15. This could be a quota issue.
INFO:root:2018-01-10T04:39:25.575Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f5ad): Autoscaling: Raised the number of workers to 15 based on the rate of progress in the currently running step(s).
INFO:root:2018-01-10T04:57:04.250Z: JOB_MESSAGE_ERROR: (8f917a73bb67a9d9): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
INFO:root:2018-01-10T04:59:20.080Z: JOB_MESSAGE_ERROR: (b0f417b262714c09): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
INFO:root:2018-01-10T05:01:35.890Z: JOB_MESSAGE_ERROR: (a4b435c70fc68065): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
INFO:root:2018-01-10T05:08:18.598Z: JOB_MESSAGE_ERROR: (a4b435c70fc689e8): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
INFO:root:2018-01-10T05:09:41.397Z: JOB_MESSAGE_ERROR: (b0f417b262714adf): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
INFO:root:2018-01-10T05:09:41.410Z: JOB_MESSAGE_BASIC: (42fdb1ab4fd8cff2): Executing BigQuery import job "dataflow_job_8333640118018302870". You can check its status with the bq tool: "bq show -j --project_id=XXX dataflow_job_8333640118018302870".
INFO:root:2018-01-10T05:09:41.435Z: JOB_MESSAGE_WARNING: (42fdb1ab4fd8cb68): Unable to delete temp files: "gs://XXX/temp/gnomad-genomes-to-bigquery-pipeline-chr1.1515558922.910737/8333640118018303863/dax-tmp-2018-01-09_20_35_36-6246558984112825034-S01-0-207f78e951a6f2a/@DAX.json."
INFO:root:2018-01-10T05:09:41.509Z: JOB_MESSAGE_DEBUG: (12e360a04459cb7): Executing failure step failure2
INFO:root:2018-01-10T05:09:41.533Z: JOB_MESSAGE_ERROR: (12e360a04459df1): Workflow failed. Causes: (12e360a04459a43): S01:ReadFromVcf/Read+FilterVariants/ApplyFilters+VariantToBigQuery/ConvertToBigQueryTableRow+VariantToBigQuery/WriteToBigQuery/NativeWrite failed., (1f26fcd604aa07c5): A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
gnomad-genomes-to-bigquer-01092035-ea7e-harness-dw84,
gnomad-genomes-to-bigquer-01092035-ea7e-harness-q4nv
INFO:root:2018-01-10T05:09:41.670Z: JOB_MESSAGE_DETAILED: (52771809aae7605e): Cleaning up.
INFO:root:2018-01-10T05:09:41.774Z: JOB_MESSAGE_DEBUG: (52771809aae769fa): Starting worker pool teardown.
INFO:root:2018-01-10T05:09:41.797Z: JOB_MESSAGE_BASIC: (52771809aae766c8): Stopping worker pool...
INFO:root:2018-01-10T05:10:55.180Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f475): Autoscaling: Resized worker pool from 15 to 0.
INFO:root:2018-01-10T05:10:55.208Z: JOB_MESSAGE_DETAILED: (20f2c6bf2c54f2cb): Autoscaling: Would further reduce the number of workers but reached the minimum number allowed for the job.
INFO:root:2018-01-10T05:10:55.263Z: JOB_MESSAGE_DEBUG: (52771809aae766ce): Tearing down pending resources...
INFO:root:Job 2018-01-09_20_35_36-6246558984112825034 is in state JOB_STATE_FAILED
Traceback (most recent call last):
File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/opt/gcp_variant_transforms/src/gcp_variant_transforms/vcf_to_bq.py", line 223, in <module>
run()
File "/opt/gcp_variant_transforms/src/gcp_variant_transforms/vcf_to_bq.py", line 218, in run
append=known_args.append))
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 346, in __exit__
self.run().wait_until_finish()
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 966, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(b0f417b262714adf): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/native_operations.py", line 98, in dataflow_worker.native_operations.NativeWriteOperation.process
with self.scoped_process_state:
File "dataflow_worker/native_operations.py", line 104, in dataflow_worker.native_operations.NativeWriteOperation.process
self.writer.Write(o.value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 577, in Write
super(TextFileWriter, self).Write(value)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 462, in Write
self.file.write(self.sink.coder.encode(value))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 162, in encode
raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
ValueError: Out of range float values are not JSON compliant. NAN, INF and -INF values are not JSON compliant. [while running 'VariantToBigQuery/ConvertToBigQueryTableRow']
(exit status 1)
Currently, the only supported merge strategy is MOVE_TO_CALLS, which only merges samples that have the exact reference_name:start_position:end_position:reference_bases:alternate_bases.
For gVCF files, we would like to add 0/0 to samples that don't have a variant. For instance, "sample1" has a non-variant from position 1-100 and "sample2" has a variant 1/0 at position 55. We should output a single row at position 55 for both samples that has 0/0 for "sample1" and 1/0 for "sample2".
We currently have a merging logic for gVCF files (merge_with_non_variants_strategy.py). It can handle non-variants, but needs to be cleaned up and tested further. It also depends on a fix in the intervaltree library (see chaimleib/intervaltree#63).
This task is only for supporting merging SNPs with non-variants.
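The "sample1"/"sample2" example above can be sketched in a few lines. This is only an illustration of the intended output, not the logic in merge_with_non_variants_strategy.py: the function name, the dict-based inputs, and the inclusive (start, end) blocks are all assumptions made for the sketch.

```python
def merge_with_non_variants(variant_calls, non_variant_blocks, position):
    """Builds the per-sample genotypes for a single variant position.

    variant_calls: dict of sample name -> genotype string at `position`.
    non_variant_blocks: dict of sample name -> list of inclusive
        (start, end) non-variant blocks (hypothetical representation).
    """
    row = dict(variant_calls)
    for sample, blocks in non_variant_blocks.items():
        if sample in row:
            continue  # The sample already has a real call at this position.
        if any(start <= position <= end for start, end in blocks):
            row[sample] = '0/0'  # Covered by a non-variant block.
    return row


row = merge_with_non_variants(
    variant_calls={'sample2': '1/0'},
    non_variant_blocks={'sample1': [(1, 100)]},
    position=55)
print(row)
```

A linear scan over blocks is fine for a sketch; an interval tree is the natural structure once the number of blocks per sample grows.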
The pipeline is currently very slow when merging a massive number of records (300B+).
Optimization strategies:
Optimizing this step will be more important especially when we add more advanced merging strategies (e.g. merge with non-variants).
There are both region [1] and zone arguments (and in two places: one in the Pipelines API and one in the Dataflow API). We should document how to use these features.
[1] https://cloud.google.com/dataflow/docs/concepts/regional-endpoints#supported_regional_endpoints
Currently, if there is a header schema mismatch (e.g. a FORMAT field is missing from the header), the pipeline proceeds all the way to the very end and only fails when it tries to write to BigQuery. It also does not give a representative error message (it complains that a field is missing in the BQ JSON schema, see Issue #61). We should catch such errors sooner and give a proper error. This depends on Issue #59, as we need to first refactor that part of the code.
Ideally we may be able to 'detect and correct' such errors, but this is more difficult currently as the BQ schema cannot be dynamically created/changed.
Create new class(es) descending from PipelineOptions that handle argument parsing and validation for all vcf_to_bq arguments.
For large (1TB+) datasets, we should provide an option to output multiple BigQuery tables for each chromosome. This makes querying the data cheaper for certain types of queries (e.g. querying variants in a particular gene). Users can use BigQuery wildcards to query over all of the data.
Things to consider:
Related:
There is currently a version of integration tests implemented (see run_tests.py) and at some point in the past it has worked. But currently it does not run. We need to fix it, improve the tests and add some sort of validation (currently it does not validate the generated table). Finally we should implement some sort of continuous run for this integration test (it is not a small test).
The current integration tests we have are small and do not cover enough variations in VCF inputs. To automate our release process, we need to add larger cases and do both deeper validation and performance analysis to make sure we are not introducing any regression. One potential dataset to use is 1000 Genomes (see Issue #61 too).
Possibilities:
1) field FT is missing in the dataset
2) field FT is present in the dataset, but there is a bug in merge headers logic
Beam 2.3.0 has fixed some issues (e.g. https://github.com/googlegenomics/gcp-variant-transforms/blob/master/setup.py#L21). We can now use the modified TextSource in vcfio.py that's part of the Beam SDK.
We should clean up our code and also ensure it works fine with 2.3.0.
We currently have basic filtering options.
More filtering options are useful in some scenarios especially when the data size is large. For instance, only loading variants that have filter=PASS may considerably reduce the data size and cost (especially if there is no use case for analyzing non-passing variants).
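As a rough illustration of the filter=PASS case, the sketch below keeps only passing variants. It assumes each variant is a plain dict with a "filters" list, which is a simplification for the example and not the pipeline's actual variant type.

```python
def filter_passing(variants):
    """Yields only variants whose FILTER column is exactly PASS."""
    for variant in variants:
        if variant.get('filters') == ['PASS']:
            yield variant


variants = [
    {'start': 10, 'filters': ['PASS']},
    {'start': 20, 'filters': ['q10']},
    {'start': 30, 'filters': []},
]
passing = list(filter_passing(variants))
print([v['start'] for v in passing])
```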
We currently provide a "--append" option to append data to an existing table. However, it requires the schema to be exactly the same. We can use the BigQuery API to update the schema prior to import (in cases where the schema is compatible) or throw an error when the schema is incompatible (e.g. the same field is defined with a different type).
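The compatibility check described above could look roughly like this. It is a sketch under simplifying assumptions: schemas are flat dicts of field name to BigQuery type string, nested records and field modes are ignored, and the function name is hypothetical. The actual schema update would still go through the BigQuery API.

```python
def merge_schemas(existing, incoming):
    """Returns the union of two flat schemas, or raises on a type conflict.

    existing, incoming: dict of field name -> BigQuery type string.
    """
    merged = dict(existing)
    for name, field_type in incoming.items():
        if name in merged and merged[name] != field_type:
            raise ValueError(
                'Field %s is defined as %s in the table but %s in the new data'
                % (name, merged[name], field_type))
        merged[name] = field_type
    return merged


merged = merge_schemas({'DP': 'INTEGER'}, {'DP': 'INTEGER', 'AF': 'FLOAT'})
print(sorted(merged.items()))
```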
Essentially requires passing a flag to change line [1] to be beam.io.BigQueryDisposition.WRITE_APPEND instead of beam.io.BigQueryDisposition.WRITE_TRUNCATE
In issue #49, we resolve the number conflict, dot vs A, only when flag split_alternate_allele_info_fields is off. However, it would be nice to give an option to the user to force dot to A conversion even when the flag is on.
It would be great to have some more information about variant merging -- I have sometimes as many as 12 VCF files for a single tumor/normal pair and I'd like to understand how variants are merged. I also noted that the main Google documentation page instructions suggest using --variant_merge_strategy MOVE_TO_CALLS while the documentation here on GitHub doesn't mention that. When I tried to import a set of 23 VCF files representing two tumor/normal pairs, I got the following error:
ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f64a98d5450>, due to an exception.
Traceback (most recent call last):
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 312, in call
side_input_values)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 347, in attempt_call
evaluator.process_element(value)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 551, in process_element
self.runner.process(element)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 390, in process
self._reraise_augmented(exn)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 388, in process
self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 281, in invoke_process
self._invoke_per_window(windowed_value)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 307, in _invoke_per_window
windowed_value, self.process_method(*args_for_process))
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 63, in process
return self.wrapper(self.dofn.process, args, kwargs)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 81, in wrapper
result = method(*args, **kwargs)
File "/opt/gcp_variant_transforms/venv/local/lib/python2.7/site-packages/apache_beam/runners/direct/helper_transforms.py", line 60, in process
vi)
File "gcp_variant_transforms/transforms/merge_headers.py", line 175, in add_input
return self.merge_accumulators([source, to_merge])
File "gcp_variant_transforms/transforms/merge_headers.py", line 180, in merge_accumulators
self._header_merger.merge(merged_headers, to_merge)
File "gcp_variant_transforms/transforms/merge_headers.py", line 127, in merge
self._merge_header_fields(first.formats, second.formats)
File "gcp_variant_transforms/transforms/merge_headers.py", line 160, in _merge_header_fields
first_value, second_value, str(e)))
ValueError: Incompatible number or types in header fields:OrderedDict([('id', 'GQ'), ('num', 1), ('type', 'Integer'), ('desc', 'Genotype Quality')]), OrderedDict([('id', 'GQ'), ('num', None), ('type', 'Integer'), ('desc', 'Conditional Phred-scaled genotype quality')])
. Error: Incompatible numbers cannot be resolved: 1, None [while running 'MergeHeaders/MergeHeaders/CombinePerKey/LiftedCombinePerKey/ParDo(PartialGroupByKeyCombiningValues)']
I tried removing the --variant_merge_strategy MOVE_TO_CALLS option but I got the same error.
Example of resolvable conflicts:
Type:
Number:
....
We have a flag for resolving header conflicts, consider doing it by default and possibly removing the flag.
Flag:
--force_merge_header_conflicts to resolve
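To make "resolvable" concrete, here is a minimal sketch of one possible resolution policy. The rules in it are assumptions for illustration only (Integer vs. Float widens to Float, and any Number mismatch falls back to None, i.e. "." in VCF); they are not confirmed to match what --force_merge_header_conflicts does. The dict keys mirror the ones printed in the error message above.

```python
def resolve_header_conflict(first, second):
    """Merges two header definitions of the same field ID.

    Assumed policy (hypothetical): Integer/Float -> Float, and
    mismatching numbers -> None (unknown count, '.' in VCF).
    """
    resolved = dict(first)
    if first['type'] != second['type']:
        if {first['type'], second['type']} == {'Integer', 'Float'}:
            resolved['type'] = 'Float'
        else:
            raise ValueError('Incompatible types: %s, %s'
                             % (first['type'], second['type']))
    if first['num'] != second['num']:
        resolved['num'] = None
    return resolved


gq_a = {'id': 'GQ', 'num': 1, 'type': 'Integer', 'desc': 'Genotype Quality'}
gq_b = {'id': 'GQ', 'num': None, 'type': 'Integer',
        'desc': 'Conditional Phred-scaled genotype quality'}
resolved = resolve_header_conflict(gq_a, gq_b)
print(resolved['num'], resolved['type'])
```

With this policy the GQ conflict from the report above (1 vs. None) would merge instead of failing, at the cost of loading GQ as a repeated field.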
Issue #80 tracks the task of adding gVCF merging logic for SNPs. This task is to implement a more advanced version to support indels.
Currently we get some dependency conflicts after installing setup.py because pip cannot resolve version dependencies properly. In other words, instead of picking the intersection of version specifiers between different dependencies, it seems pip installs packages one by one independently (see PR #63 for an example). After installation, pipdeptree shows these issues, for example:
$ pipdeptree -p apache-beam
We need to eventually fix this problem either by adding other non-direct dependencies explicitly (which PR #63 does but is ugly) or finding a tool that can resolve all version restrictions together.
Apparently, Cython is not used by default when just installing through pip. See this thread: https://lists.apache.org/thread.html/90eb2605b42f5f8ce9d729fdac43af7a914a6e70f4f1cf8ae16e8ce4@%3Cdev.beam.apache.org%3E
I verified that cython is not installed in our docker image (through pip freeze). We should install it prior to installing Beam and determine if there is a performance boost as a result.
Background:
#45 fixed many Pylint related issues and enabled it as a presubmit check.
#57 fixed various kinds of warnings exposed by IntelliJ Python plugins, including some PEP 8 issues, but has not added any of these static analyses as a presubmit check.
Ask:
We need to evaluate tools like pep8 and Pycharm inspections and potentially enable them as presubmit checks.
The current merging logic assumes that all VCF files have the same reference. This task is to convert all files to a common reference before merging. This feature can actually be used independent of the merging logic as well (i.e. a fast way of changing all reference bases in a large number of VCF files).
Things to consider:
More details need to be outlined in a design doc :)
This is due to a bug in (re-)setting pipeline args
Now that we have the continuous integration testing working, we should add at least one integration test to cover each option: https://github.com/googlegenomics/gcp-variant-transforms/blob/master/gcp_variant_transforms/options/variant_transform_options.py
We currently use python type hints in docstrings (e.g. arg_name (int): some argument). For class types, we use ``Class`` or sometimes :class:`Class`. This is to make sphinx understand the types and is a convention used in Beam as well.
We should first set up a python doc generator using Sphinx (see Beam's implementation) and fix all broken type references.
To natively support processing a large number of files:
There's a protobuf incompatibility issue in the Beam 2.2.0 pip package. See https://issues.apache.org/jira/browse/BEAM-3357. This will be fixed in Beam 2.3.0, but until then we have introduced some workarounds in our code. Particularly, we've added explicit dependency of 'grpcio>=1.0,<=1.7.3' in setup.py. While this fixes the setup/test issues, it will actually complain again when running the Dataflow pipeline. The workaround for that is to run 'pip install --upgrade apache_beam[gcp]' again.
This issue is to just keep track of these workarounds and remove them once Beam 2.3.0 is released.
While debugging a user issue (see thread), we came across non-standard fields from VarScan2 (e.g. the "AD" field is split into "AD" and "RD" fields, each having Number=1 instead of a single field with Number=R as specified in the VCF spec).
While the ideal solution is to change VarScan2 to output the AD field according to the VCF spec, this may not be feasible and would not help with existing VCF files.
Idea: provide a "transformation plugin" for converting output from non-standard variant callers (e.g. in the VarScan2 case, it would merge AD and RD fields into a single field with Number=R). We can write a few common plugins ourselves, but it should be easy enough for users to write their own plugins as well (either in code or through a config file), since we may not be able to cover all corner cases.
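For the VarScan2 case, such a plugin might be as small as the sketch below. It assumes a call is a plain dict of FORMAT values, that RD holds the reference-supporting depth and AD the variant-supporting depth, and that the merged field follows the Number=R order (reference first). The function name is hypothetical.

```python
def merge_varscan_ad_rd(call_info):
    """Folds VarScan2-style RD and AD into a single Number=R AD field."""
    info = dict(call_info)
    if 'RD' in info and 'AD' in info:
        # Number=R order: reference depth first, then alternate depth.
        info['AD'] = [info.pop('RD'), info['AD']]
    return info


converted = merge_varscan_ad_rd({'GT': '0/1', 'RD': 10, 'AD': 5})
print(converted)
```

Calls that lack either field are returned unchanged, so the transform is safe to apply to files from other callers.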
We still need to add documentation for the merging logic and handling invalid data.
We currently just concatenate reference/alternate_bases together as the merge key (see here).
This may break Dataflow if reference/alternate_bases are long (larger than the 1MB limit for the key). We should create a hash of these fields and use that instead. This ensures that our keys do not get arbitrarily large.
As a result, the key would be reference_name:start_position:end_position:<hash of reference_bases:alternate_bases>. Given that we include reference/start/end as part of the key (excluding the hash), the risk of collision is extremely low. However, we can still add an assert here just to be sure.
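A minimal sketch of the hashed key, assuming SHA-256 as the hash and ":" as the separator (both are choices made for the example, and the function name is hypothetical):

```python
import hashlib


def make_merge_key(reference_name, start, end, reference_bases,
                   alternate_bases):
    """Returns a merge key whose length does not depend on the bases."""
    bases = ':'.join([reference_bases] + list(alternate_bases))
    digest = hashlib.sha256(bases.encode('utf-8')).hexdigest()
    return ':'.join([reference_name, str(start), str(end), digest])


short_key = make_merge_key('chr1', 100, 101, 'A', ['T'])
long_key = make_merge_key('chr1', 100, 101, 'A' * 2000000, ['T'])
print(len(short_key) == len(long_key))
```

The position fields stay readable in the key, which keeps it useful for debugging, while the bases contribute a fixed 64 hex characters.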
It is surprisingly common for VCF files to have missing header fields (see Issue #61 for a 1000 genomes file). The pipeline currently fails as it is unable to map the missing field(s) to the BigQuery column(s). Note that we need to generate the schema prior to running the pipeline, which is why we need to rely on complete and valid headers.
The current workaround is to manually specify the missing header fields through --representative_header_file, but this is painful and not scalable.
An alternative approach is to do two passes on the data: the first one parses all of the fields in all records just to find the missing headers and the 2nd one actually processes the data. We'd use the data from the first pass to generate the BigQuery schema. Of course, this adds additional computation (roughly 30% more), so we should provide this as an optional feature for "robust" imports. As an optimization, we could provide a sampling logic if the data is expected to be uniform (e.g. sample 30% of the data and assume that everything else follows that schema).
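The first pass could be sketched as follows. This is only an outline under stated assumptions: records are already parsed into dicts of INFO key to Python value, the type inference is deliberately naive, and both function names are hypothetical. A real pass would also need to infer Number and handle FORMAT fields.

```python
def infer_vcf_type(value):
    """Maps a parsed Python value to a VCF header type (naive)."""
    if isinstance(value, bool):
        return 'Flag'
    if isinstance(value, int):
        return 'Integer'
    if isinstance(value, float):
        return 'Float'
    return 'String'


def find_missing_info_fields(declared_ids, records):
    """Returns {field_id: inferred type} for fields absent from the header."""
    missing = {}
    for info in records:
        for field_id, value in info.items():
            if field_id in declared_ids:
                continue
            inferred = infer_vcf_type(value)
            previous = missing.get(field_id)
            if previous is None or previous == inferred:
                missing[field_id] = inferred
            elif {previous, inferred} == {'Integer', 'Float'}:
                missing[field_id] = 'Float'  # Widen mixed numeric values.
            else:
                missing[field_id] = 'String'  # Fall back to the safest type.
    return missing


records = [{'DP': 10, 'XX': 1}, {'XX': 0.5, 'YY': 'a'}]
missing = find_missing_info_fields({'DP'}, records)
print(sorted(missing.items()))
```

The sampling optimization mentioned above would amount to passing only a subset of records to the same function.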
This task is to add native annotation support (e.g. gene ID, variant harmfulness, clinvar, dbSNP, etc) as part of the VCF->BigQuery pipeline.
Initially, we can have a pre-processing step that adds VEP or SnpEff annotations to the VCF file prior to import. The annotations would be added to a new INFO field that can then be parsed to load each annotation as a separate column to BigQuery. The annotation field presumably follows this schema.
Having a more generic INFO parser will also be beneficial for other pre-annotated VCF files such as gnomAD.
We can integrate Verily's annotation pipeline to load annotations via VEP.
Note: adding annotations as part of the pipeline without pre-processing is outside the scope of this task. We can consider that option if the pre-processing step is not feasible for certain use cases.
We currently have both HeaderFields and VcfHeader.
HeaderFields is an older object that was written prior to having VcfHeader. We should remove HeaderFields in favor of VcfHeader.
We should add a pip package for easier use of the pipeline in other python libraries. This depends on Issue #77 for adding proper release process and versioning.
There are multiple ways of representing the same variant in a VCF file and some formats provide redundant info. We can provide common normalization transforms (e.g. left/right trimming, removing unnecessary bases from indels, etc) as part of the pipeline. This will be essential for properly merging variants across files as well.
Related: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4481842/
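As an illustration of the trimming part only, the sketch below removes shared trailing and then leading bases from a single reference/alternate pair while keeping at least one base on each side. It does not perform left-alignment against a reference genome, and the function name and signature are assumptions for the example.

```python
def trim_variant(start, reference_bases, alternate_bases):
    """Trims bases shared by both alleles; returns (start, ref, alt)."""
    ref, alt = reference_bases, alternate_bases
    # Right trim: drop identical trailing bases.
    while len(ref) > 1 and len(alt) > 1 and ref[-1] == alt[-1]:
        ref, alt = ref[:-1], alt[:-1]
    # Left trim: drop identical leading bases and shift the start position.
    while len(ref) > 1 and len(alt) > 1 and ref[0] == alt[0]:
        ref, alt = ref[1:], alt[1:]
        start += 1
    return start, ref, alt


print(trim_variant(100, 'GCAT', 'GTAT'))
print(trim_variant(100, 'CAA', 'CA'))
```

Here GCAT/GTAT at position 100 reduces to the SNP C/T at position 101, and the deletion CAA/CA reduces to CA/C at position 100. Multi-allelic records would need the same trimming applied across all alternates at once.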
There are certain use cases where users like to transform variants from BigQuery to VCF (e.g. to use tools that expect a VCF file). This task is to write such a pipeline.
As an extension, once this is completed, we can provide VCF->VCF and BQ->BQ transforms as well (e.g. just use our merging logic to merge a large number of VCF files and output a single merged VCF file).