icgc-argo / donor-submission-aggregator
Event-driven donor aggregation service for the ICGC-ARGO Submission System
License: GNU Affero General Public License v3.0
Rule #1: the total number of alignment runs should not be greater than the total number of samples, using sequencing_alignment analyses as an example (if the dashboard is in fact reporting the correct number).
Rule #2: the total number of Sanger VC runs should be less than or equal to the number of Tumour samples. As a rule right now, it should be roughly 1:1.
Rule #3: if Sanger runs exist, then Alignments should not be 0.
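The three rules can be sketched as simple sanity checks. This is a hypothetical helper (names and shape are illustrative, not the aggregator's API); the counts would come from the dashboard and the RDPC analyses queries:

```typescript
// Illustrative only: counts per donor, as reported by the dashboard/RDPC.
interface DonorCounts {
  totalSamples: number;
  tumourSamples: number;
  alignmentRuns: number;
  sangerVcRuns: number;
}

function violatedRules(c: DonorCounts): string[] {
  const violations: string[] = [];
  // Rule #1: alignment runs must not exceed total samples.
  if (c.alignmentRuns > c.totalSamples) violations.push("rule1");
  // Rule #2: Sanger VC runs must not exceed tumour samples.
  if (c.sangerVcRuns > c.tumourSamples) violations.push("rule2");
  // Rule #3: if Sanger runs exist, alignments must not be 0.
  if (c.sangerVcRuns > 0 && c.alignmentRuns === 0) violations.push("rule3");
  return violations;
}
```

A QA script along these lines could flag the anomalies tracked in the spreadsheet below automatically.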
A sheet of running issues that have been observed are here: https://docs.google.com/spreadsheets/d/1zdf2Xy5L4ggWASBzPbsCXSQ0bIG5McH4O7UhClu1mXk/edit#gid=0
Prod RDPC data has been indexed into QA to test this. The following query has been used for QA, looking at the QA Platform but against the production RDPC API:
While the dashboard is counting runs, for the QA @rosibaj has been looking at analyses, as this is the information that should be consistent with the numbers reported on the dashboard.
Prod API: https://api.rdpc.cancercollaboratory.org/graphql
Query:
query test_num_in_dash {
  analyses(filter: {donorId: "DO35424", analysisState: PUBLISHED, analysisType: "sequencing_alignment"}) {
    analysisId
    #analysisState
    #analysisType
    #analysisVersion
    experiment
    donors {
      donorId
      submitterDonorId
      specimens {
        specimenId
        submitterSpecimenId
        tumourNormalDesignation
        samples {
          sampleId
          submitterSampleId
          matchedNormalSubmitterSampleId
        }
      }
    }
    files {
      dataType
    }
    inputForRuns {
      runId
    }
    workflow {
      runId
      workflowName
      run {
        runId
        state
        inputAnalyses {
          analysisId
          analysisType
        }
      }
    }
  }
}
Before deployment to production: currently, if anything fails, we ignore the error. For this to be production ready, we need to build some fault tolerance (retries) into the flow. A couple of places:
A change has been made to the rdpc-gateway API which involves an API structure change: icgc-argo/rdpc-gateway#58 (comment). In order to be compatible with the latest API, the donor aggregator query needs to be updated.
The donor dashboard aggregator service populates the index based on the spec'd data needs: https://wiki.oicr.on.ca/display/icgcargotech/Program+Dashboard+Page+Specs#ProgramDashboardPageSpecs-2.4.2MolecularDataChart

Line | Source of Data
---|---
Raw Reads | This is a count of the number of donors with the minimum number of raw reads submitted. The minimum accepted number means the donor has at least 1 Tumour/Normal pair. Count the number of donors in the time interval with analyses that fit these criteria: at least 1 analysis of (analysis_type = sequencing_experiment AND tumour_normal_designation = Tumour) AND at least 1 analysis of (analysis_type = sequencing_experiment AND tumour_normal_designation = Normal)

The raw-reads portion is populated with the correct data as per the logic specified above.

Devops and all that jazz:
Add logic to the aggregator to support Mutect2. The GATK Mutect2 workflow is the one to filter on for this set of data.

Expected Behavior
The aggregator got stuck consuming incorrectly formatted messages (i.e. missing studyId or programId). When consuming an invalid message, it throws an error and does nothing, so the event is never marked as processed, causing the aggregator to stall.
In the event that an invalid message is passed to the aggregator:
log invalid messages out to Slack, indicating the topic, partition, and offset of the event
pass invalid messages to the DLQ. Example implementation: https://github.com/icgc-argo/files-service/blob/master/src/kafka.ts
have the aggregator move on to the next message

FOLLOWUP FOR NEXT TICKET:
pass invalid messages to the DLQ. Example implementation: https://github.com/icgc-argo/files-service/blob/master/src/kafka.ts
The file src/indexProgram/transformToEsDonor.ts
contains the transformation logic from Mongo to Elasticsearch documents. Currently this just contains some placeholder values.
Tests will need to be updated
Follow-up to #113
src/index.ts
is the main entry point to the indexer. It is currently set up as a script for manual trigger. This will need to be turned into a web service with kafka integration.
Requirements:
Currently we have a pretty extreme exponential retry policy that looks like this:
{
factor: 2,
retries: 100,
minTimeout: 1000,
maxTimeout: Infinity,
}
This means that if indexing fails for a given program, it will keep retrying that one program practically forever. Then, if it ever exhausts all the retries, it does nothing except completely ignore that program forever and move on.
Problems that arise from this:
Solutions:
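One option is to bound the policy: cap both the retry count and the maximum delay. The values below are illustrative, not tuned recommendations, and the delay formula follows the usual node-retry-style exponential backoff used by the options object above:

```typescript
// Illustrative bounded policy: stop retrying after a handful of attempts
// and cap the backoff instead of letting it grow toward Infinity.
const boundedPolicy = { factor: 2, retries: 8, minTimeout: 1_000, maxTimeout: 60_000 };

// Delay (ms) before retry attempt n (1-based), node-retry style:
// minTimeout * factor^(n-1), clamped to maxTimeout.
function backoffDelay(n: number, p = boundedPolicy): number {
  return Math.min(p.minTimeout * Math.pow(p.factor, n - 1), p.maxTimeout);
}
```

After the final attempt, the program could be parked and surfaced (e.g. flagged for a later SYNC re-run) instead of being silently ignored.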
Take the query from the RDPC and transform the data into the needed shape.
The donor aggregator is taking more than 5 minutes to start. To reduce the long start-up time, we need to adjust these Kafka consumer settings:
sessionTimeout
rebalanceTimeout
heartbeatInterval
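With kafkajs these are passed when creating the consumer. The values below are placeholders for illustration (the groupId is assumed, and the numbers are not tuned recommendations); the constraint that matters is that heartbeats arrive several times within each session timeout window:

```typescript
// Placeholder values for illustration only.
const consumerConfig = {
  groupId: "donor-submission-aggregator", // assumed name
  sessionTimeout: 30_000,     // broker waits this long for a heartbeat
  rebalanceTimeout: 60_000,   // max time to re-join during a rebalance
  heartbeatInterval: 3_000,   // how often the consumer heartbeats
};

// Sanity check: Kafka requires heartbeatInterval well below sessionTimeout
// (rule of thumb: no more than one third of it).
function isSaneConsumerConfig(c: typeof consumerConfig): boolean {
  return c.heartbeatInterval <= c.sessionTimeout / 3;
}
```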
prod-submission-song with TEST-INTL program

2021-03-31T17:31:15.110Z info: starts processing RDPC event for program TEST-INTL
2021-03-31T17:31:15.118Z info: Existing index settings match default settings, obtaining a new index name from rollcall, clone=true.
2021-03-31T17:31:16.409Z info: obtained new index name: donor_centric_program_testintl_re_8
2021-03-31T17:31:16.804Z info: Enabled WRITE to index : donor_centric_program_testintl_re_8
2021-03-31T17:31:16.804Z info: Processing program: TEST-INTL from https://api.rdpc.cancercollaboratory.org/graphql.
2021-03-31T17:31:16.804Z info: fetching ego public key...
2021-03-31T17:31:17.355Z warn: No document to index for program TEST-INTL
2021-03-31T17:31:17.355Z info: releasing index donor_centric_program_testintl_re_8 to alias donor_submission_summary
logs when publishing an analysis:
info: starts processing RDPC event for program TEST-INTL
2021-03-31T17:33:03.267Z info: Existing index settings match default settings, obtaining a new index name from rollcall, clone=true.
2021-03-31T17:33:04.687Z info: obtained new index name: donor_centric_program_testintl_re_9
2021-03-31T17:33:05.133Z info: Enabled WRITE to index : donor_centric_program_testintl_re_9
2021-03-31T17:33:05.134Z info: Processing program: TEST-INTL from https://api.rdpc.cancercollaboratory.org/graphql.
2021-03-31T17:33:05.134Z info: fetching ego public key...
2021-03-31T17:33:06.240Z info: streaming analyses with Specimens for donor DO250552
2021-03-31T17:33:06.240Z info: fetching ego public key...
2021-03-31T17:33:06.269Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-03-31T17:33:06.307Z info: Streaming 5 of sequencing experiment analyses with specimens...
2021-03-31T17:33:06.307Z info: fetching ego public key...
2021-03-31T17:33:06.331Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-03-31T17:33:06.706Z info: streaming analyses for donor DO250552
2021-03-31T17:33:06.707Z info: fetching ego public key...
2021-03-31T17:33:06.733Z info: Starting to query sequencing_experiment analyses for alignment workflow runs
2021-03-31T17:33:06.823Z info: Streaming 5 of sequencing_experiment analyses...
2021-03-31T17:33:06.823Z info: fetching ego public key...
2021-03-31T17:33:06.845Z info: Starting to query sequencing_experiment analyses for alignment workflow runs
2021-03-31T17:33:06.878Z info: streaming analyses for donor DO250552
2021-03-31T17:33:06.879Z info: fetching ego public key...
2021-03-31T17:33:06.900Z info: Starting to query sequencing_alignment analyses for sanger variant calling workflow runs
2021-03-31T17:33:06.931Z info: streaming analyses for donor DO250552
2021-03-31T17:33:06.931Z info: fetching ego public key...
2021-03-31T17:33:06.955Z info: Starting to query sequencing_alignment analyses for mutect2 workflow runs
2021-03-31T17:33:06.994Z info: Begin bulk indexing donors of program TEST-INTL...
2021-03-31T17:33:07.692Z info: Successfully indexed all donors of program TEST-INTL to index: donor_centric_program_testintl_re_9
2021-03-31T17:33:07.693Z info: releasing index donor_centric_program_testintl_re_9 to alias donor_submission_summary
2021-03-31T17:33:08.631Z info: TEST-INTL duration: 110761
UNPUBLISH and SUPPRESS SHOULD edit the document.
Changes being made in Rollcall here for vault support: overture-stack/rollcall#25
Make sure Rollcall is pointed to the NEW Elasticsearch as part of this configuration
The aggregator connects to the rdpc-api, which now requires AUTH headers to allow for data extraction. We need to add support for authenticated requests to rdpc using app to app tokens. The aggregator needs to:
send auth headers to rdpc-api on all calls for authorization
be registered as an application in ego
be able to generate an app token from ego that is sent in the headers
register aggregator as an application in ego; store secrets in vault
write some code / use a library to refresh the auth token regularly:
hold onto the JWT in memory; use it normally;
each time we reach out to the rdpc-api, do a preflight check on the JWT's expiry as self-validation before making the call; if it has not expired, use it; if it has expired,
issue a new application token and use it in the auth headers to the rdpc-gateway-api
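The preflight check can be a purely local decode of the JWT's exp claim, with no round trip to ego. A sketch, assuming a standard JWT with a base64url-encoded JSON payload:

```typescript
// Decode the JWT payload locally and compare its `exp` claim (seconds since
// epoch) against the clock; refresh from ego only when it is (nearly) expired.
function isJwtExpired(jwt: string, skewSeconds = 60): boolean {
  const payloadPart = jwt.split(".")[1];
  if (!payloadPart) return true; // malformed token: treat as expired
  const payload = JSON.parse(Buffer.from(payloadPart, "base64").toString("utf8"));
  if (typeof payload.exp !== "number") return true;
  // The skew forces an early refresh so we never send a token that expires mid-request.
  return payload.exp <= Date.now() / 1000 + skewSeconds;
}
```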
The donor dashboard aggregator service populates the index based on the spec'd data needs: https://wiki.oicr.on.ca/display/icgcargotech/Program+Dashboard+Page+Specs#ProgramDashboardPageSpecs-2.4.2MolecularDataChart

Line | Source of Data
---|---
Alignment | This is a count of the number of donors with the minimum number of alignments that have completed processing. The minimum accepted number means the donor has at least 1 Tumour/Normal pair worth of alignments. Count the number of donors in the time interval with analyses that fit these criteria: at least 1 analysis of (analysis_type = sequencing_alignment AND tumour_normal_designation = Tumour) AND at least 1 analysis of (analysis_type = sequencing_alignment AND tumour_normal_designation = Normal)

The donor-dashboard-aggregator mapping has a new section/fields that deliver the time data for the spec'd lines of the chart.
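The Alignment criteria can be encoded as a predicate over a donor's analyses. A sketch only; the field names follow the RDPC query shapes in these issues and are assumptions:

```typescript
// Minimal shape for this check; real analyses carry much more.
interface AnalysisRow {
  analysisType: string;
  tumourNormalDesignation: "Tumour" | "Normal";
}

// A donor counts toward the Alignment line when it has at least one Tumour
// and at least one Normal sequencing_alignment analysis (a T/N pair).
function countsForAlignmentLine(analyses: AnalysisRow[]): boolean {
  const aligned = analyses.filter(a => a.analysisType === "sequencing_alignment");
  return (
    aligned.some(a => a.tumourNormalDesignation === "Tumour") &&
    aligned.some(a => a.tumourNormalDesignation === "Normal")
  );
}
```

The chart value for an interval is then just the count of donors for which this predicate holds.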
molecular data chart
I was looking at the dashboard for another reason, but noticed some data anomalies.
Steps to reproduce the behavior:
Sort the Sanger VC column descending and look at donor DO234422. On the dashboard you can see they have 1 in-progress Sanger and 1 in-progress Mutect2:

query test_num_in_dash {
analyses(
filter: {
donorId: "DO234422"
analysisState: PUBLISHED
analysisType: "variant_calling"
#analysisId: "8ebe3a61-e06c-4616-be3a-61e06ca616ce"
}
) {
info{
totalHits
}
content {
analysisId
analysisState
analysisType
#analysisVersion
updatedAt
firstPublishedAt
studyId
experiment
donors {
# donorId
# submitterDonorId
specimens {
# specimenId
# submitterSpecimenId
tumourNormalDesignation
# samples {
# sampleId
# submitterSampleId
# matchedNormalSubmitterSampleId
# }
}
}
repositories{
code
}
files {
dataType
}
inputForRuns {
runId
state
}
workflow{
runId
run{
runId
state
startTime
completeTime
}
inputs
}
}
}
}
You can see there that the run which generated these is listed as COMPLETE on Sanger, for run ID wes-1e647943c32e4f6a92a2b0d62631ec22.
Looking at the epoch time in the results, I can see it was completed a few days ago:
Completed Sunday March 9, 2021:
This should show a completed run.
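The completeTime/firstPublishedAt values in the results are epoch milliseconds, sometimes serialized as strings. A throwaway helper (not part of the aggregator) for checking completion dates:

```typescript
// Epoch values from the API are milliseconds, sometimes as strings.
function epochToIso(epochMs: string | number): string {
  return new Date(Number(epochMs)).toISOString();
}
```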
Blocked by: icgc-argo/rdpc-gateway#44
Time to wrap up and write down the complex logic!
We have a swagger for the donor-submission-aggregator
Add an endpoint that takes a program-id and queues a task to update that program.
For on-demand update of a program (instead of triggered by an event)
Endpoint in a swagger that can be tested with a Program id in QA.
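A sketch of the queueing side (hypothetical names throughout): the HTTP layer would be a route in the existing swagger-documented service that calls enqueue with the program-id and returns immediately, while a worker drains the queue:

```typescript
// Hypothetical in-memory task queue for on-demand program re-indexing.
class ProgramTaskQueue {
  private pending: string[] = [];

  // Returns false when the program is already queued (dedupe), so repeated
  // requests for the same program don't trigger redundant re-indexing.
  enqueue(programId: string): boolean {
    if (this.pending.includes(programId)) return false;
    this.pending.push(programId);
    return true;
  }

  next(): string | undefined {
    return this.pending.shift();
  }
}
```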
investigate RDPC queries needed for the dashboard
write a query to talk to the RDPC gateway
--- consider the query load out to the rdpcs while designing the queries.
--- there are not too many requests
--- batch requests in smaller chunks
transform the data from the RDPC into the correct shape needed for the platform
populate the data in the index mapping (https://github.com/icgc-argo/donor-submission-aggregator/blob/develop/src/elasticsearch/donorIndexMapping.json)
The aggregator index was initialized in camelCase. For consistency across the platform, we want all mappings/indices to be the same.
Convert the donor submission summary indices to snake case
Currently there is no read step in the indexing. For future indexing steps, we need the indexing flow to be this:
Update aggregator and have it working the same as it currently does, but with the new format.
Currently all the indices created are going to one shard. We might want to increase this for better workload distribution.
Set the default to 3.
Shards allow distribution of workload across cluster nodes. Multiple shards will prevent big programs from putting stress on one ES node.
Rollcall has a feature to accept index settings at index creation; we should pass ours there. @jaserud has more info for rollcall.
Integrate with new Elasticsearch using secret in rollcall
The secrets have all been configured in Vault, just need to pull from there.
The problem: recently we updated the default ES index settings in this ticket: #54. This resulted in Rollcall being unable to clone existing indices because of the mismatch in ES settings, as ES only allows an index clone when the old and new index have the same settings.
Old index: number_of_shards = 1, number_of_replicas = 1
Expected results:
all donor-aggregator indices in qa and dev should be updated to new settings:
"settings": {
"index.number_of_shards": 3,
"index.number_of_replicas": 2
}
Possible Solution:
Use the PUT /_settings endpoint, or set cloneFromReleasedIndex = false on the CreateResolvableIndexRequest, which disables cloning.

donor dashboard aggregator
service to populate the index based on the spec'd data needs: https://wiki.oicr.on.ca/display/icgcargotech/Program+Dashboard+Page+Specs#ProgramDashboardPageSpecs-2.4.2MolecularDataChart

Line | Source of Data
---|---
Sanger VC | This is a count of the number of donors with the minimum number of Sanger variant calling runs that have completed processing. Count the number of donors in the interval with at least one analysis that fits these criteria: analysis_type = variant_calling AND (workflow_name = Sanger WGS Variant Calling OR workflow_name = Sanger WXS Variant Calling)
Mutect2 | This is a count of the number of donors with the minimum number of Mutect2 variant calling runs that have completed processing. Count the number of donors in the interval with at least one analysis that fits these criteria: analysis_type = variant_calling AND workflow_name = GATK Mutect2 Variant Calling
The donor-dashboard-aggregator mapping has a new section/fields that deliver the time data for the spec'd lines of the chart.
The RDPC issues events for different items.
Discuss which RDPC events may need to be subscribed to in order to get the information that we need. We may need to subscribe to a new topic (@lepsalex will know which topics and can discuss this with us).
Consider the case of multiple RDPCs - can events self identify the RDPC that the event originated from?
Verify if we can tell the donor/program from the RDPC event as input to the donor aggregator
Get song publish event
Reach out to rdpc api
initiate calculation of RDPC metrics for the dashboard
publish dashboard index
For this POC, just show one number calculating on the dashboard:
This is the count of donors with a RUNNING workflow.
The bug here is that it asks rollcall for a cloned index before deciding what to do: https://github.com/icgc-argo/donor-submission-aggregator/blob/7c09127a21f867e19074[…]5f539e7cde79fd17fe8/src/programQueueProcessor/eventProcessor.ts. It should instead ask for the index based on the type of the event, which for SYNC should be new.
On re-index, the index should actually be re-done. Not done from clone. The steps above will result in the removed donor no longer being in the index.
Take the query from the RDPC and transform the data into the needed shape.
This column shows how many of the sequencing reads that have been registered are ACTUALLY submitted. For that donor, count:
Wiki Specs describing content: https://wiki.oicr.on.ca/display/icgcargotech/Program+Dashboard+Page+Specs#ProgramDashboardPageSpecs-SummaryTableDescription
Given a program_name,
We are subscribing to analysis update events, which come with an analysis ID, yet we are looping through all analyses. We can reduce the amount of processing done by adding an ANALYSIS filter.
Update by ANALYSIS rather than by the whole program.
We need to add incremental updates to the donor-submission-aggregator
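Conceptually, the incremental path narrows the work from program-wide to just the donors attached to the analysis in the event. A sketch; shapes are assumptions based on the event payloads quoted in these issues:

```typescript
// Given the analysis from an update event, re-index only its donors instead
// of looping over every analysis in the program. Shapes are illustrative.
interface AnalysisEvent {
  analysisId: string;
  studyId: string;
  donors: Array<{ donorId: string }>;
}

function donorsToReindex(event: AnalysisEvent): string[] {
  // Dedupe in case the same donor appears more than once on the analysis.
  return [...new Set(event.donors.map(d => d.donorId))];
}
```

The RDPC query would then filter on these donor IDs (plus the analysis ID) rather than fetching the whole program.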
A bug was found when indexing test-pr in qa:
2021-06-23T20:01:53.112Z info: Begin processing event: SYNC - TEST-PR
2021-06-23T20:01:54.826Z info: Obtaining new index, first for program.
2021-06-23T20:01:59.990Z info: Obtained new index name: donor_centric_program_testpr_re_10963
2021-06-23T20:02:02.209Z info: Enabled index writing for: donor_centric_program_testpr_re_10963
2021-06-23T20:02:02.868Z info: streaming 2 donor(s) from chunk #0 of program TEST-PR duration: 458
2021-06-23T20:02:02.874Z info: Processing program: TEST-PR from https://api.rdpc-qa.cancercollaboratory.org/graphql.
2021-06-23T20:02:02.879Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-06-23T20:02:03.799Z info: Streaming 14 of sequencing_experiment analyses with specimens and samples...
2021-06-23T20:02:03.801Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-06-23T20:02:03.881Z info: Streaming 50 of variant calling analyses for sanger/mutect first published dates...
2021-06-23T20:02:03.883Z warn: Failed to index program TEST-PR on attempt #1: TypeError: Cannot read property 'toLocaleLowerCase' of null
2021-06-23T20:02:06.666Z info: Index was removed: donor_centric_program_testpr_re_10963
2021-06-23T20:02:07.674Z info: Obtaining new index, first for program.
2021-06-23T20:02:11.068Z info: Obtained new index name: donor_centric_program_testpr_re_10963
2021-06-23T20:02:12.608Z info: Enabled index writing for: donor_centric_program_testpr_re_10963
2021-06-23T20:02:13.105Z info: streaming 2 donor(s) from chunk #0 of program TEST-PR duration: 487
2021-06-23T20:02:13.114Z info: Processing program: TEST-PR from https://api.rdpc-qa.cancercollaboratory.org/graphql.
2021-06-23T20:02:13.114Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-06-23T20:02:13.162Z info: Streaming 14 of sequencing_experiment analyses with specimens and samples...
2021-06-23T20:02:13.163Z info: Fetching sequencing experiment analyses with specimens from rdpc.....
2021-06-23T20:02:13.251Z info: Streaming 50 of variant calling analyses for sanger/mutect first published dates...
2021-06-23T20:02:13.252Z warn: Failed to index program TEST-PR on attempt #2: TypeError: Cannot read property 'toLocaleLowerCase' of null
2021-06-23T20:02:13.945Z info: Index was removed: donor_centric_program_testpr_re_10963
2021-06-23T20:02:15.965Z info: Obtaining new index, first for program.
The cause was that workflowName was null in rdpc:
{
"analysisId": "08d245c8-caf0-4264-9245-c8caf07264a0",
"analysisType": "variant_calling",
"firstPublishedAt": "1606929512353",
"workflow": {
"workflowName": null
},
"donors": [
{
"donorId": "DO250183"
}
]
},
This broke the aggregator, as it was expecting a value:
const workflowName = analysis.workflow.workflowName.toLocaleLowerCase();
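A minimal fix is to tolerate the null before lowercasing, via optional chaining. This is a suggested sketch, not necessarily the shipped fix; in the real code a missing name should probably also be logged so the analysis can be investigated:

```typescript
// Sketch: treat a missing workflowName as "" instead of crashing the indexer.
function normalizeWorkflowName(workflow: { workflowName: string | null } | null): string {
  return workflow?.workflowName?.toLocaleLowerCase() ?? "";
}
```

Downstream filters (sanger/mutect matching) then simply skip analyses whose normalized name is empty.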
As we expand the aggregator to handle multiple topics, we need to maintain a proper processing queue for events that relate to the same programs.
basically implement the Kafka event aggregation design here: https://drive.google.com/file/d/1dAJL7yre96sxA0WWMLklAujp-xCeJ_-O/view?usp=sharing
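The core property of that design — events for the same program run strictly in submission order, while different programs don't block each other — can be sketched with per-program FIFO queues. This is a synchronous sketch for brevity; the real flow is asynchronous Kafka consumption:

```typescript
// Per-program FIFO: tasks for one program run in submission order; tasks for
// different programs use independent queues.
class ProgramEventSerializer {
  private queues = new Map<string, Array<() => void>>();
  private draining = new Set<string>();

  submit(programId: string, task: () => void): void {
    const queue = this.queues.get(programId) ?? [];
    queue.push(task);
    this.queues.set(programId, queue);
    // Tasks submitted while a drain is in flight are picked up by its loop.
    if (!this.draining.has(programId)) this.drain(programId);
  }

  private drain(programId: string): void {
    this.draining.add(programId);
    const queue = this.queues.get(programId)!;
    while (queue.length > 0) queue.shift()!();
    this.draining.delete(programId);
  }
}
```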
src/rollCall/index.ts
contains the rollcall proxy, this needs to be updated to expose an actual rollcall client.
Please include integration tests with a rollcall image.
The issue is observed in dev and qa: when a CREATE event is received, the donor aggregator fails to index the analysis, as it was expecting either a publish, unpublish, or suppress event.
Message that broke indexing:
{
"topic": "song_analysis",
"key": null,
"value": {
"analysisId": "dfb1d6c3-c21a-47f4-b1d6-c3c21a47f483",
"studyId": "ROSI-RU",
"state": "UNPUBLISHED",
"action": "CREATE",
"songServerId": "song.collab",
"analysis": {
"analysisId": "dfb1d6c3-c21a-47f4-b1d6-c3c21a47f483",
"studyId": "ROSI-RU",
"analysisState": "UNPUBLISHED",
"createdAt": "2021-06-25T14:21:20.239744",
"updatedAt": "2021-06-25T14:21:20.239774",
"firstPublishedAt": null,
"publishedAt": null,
"analysisStateHistory": [],
"samples": [
{
"sampleId": "SA622678",
"specimenId": "SP222655",
"submitterSampleId": "sample-6.1",
"matchedNormalSubmitterSampleId": null,
"sampleType": "Amplified DNA",
"specimen": {
"specimenId": "SP222655",
"donorId": "DO262424",
"submitterSpecimenId": "specimen-6.1",
"tumourNormalDesignation": "Normal",
"specimenTissueSource": "Blood derived",
"specimenType": "Normal"
},
"donor": {
"donorId": "DO262424",
"studyId": "ROSI-RU",
"gender": "Male",
"submitterDonorId": "Donor-6"
}
}
],
"files": [
{
"info": {
"analysis_tools": [
"BWA-MEM",
"biobambam2:bammarkduplicates2"
],
"data_category": "Sequencing Reads"
},
"objectId": "78f92452-6abf-59d0-b673-f5d692891b21",
"studyId": "ROSI-RU",
"analysisId": "dfb1d6c3-c21a-47f4-b1d6-c3c21a47f483",
"fileName": "ROSI-RU.DO262424.SA622678.wxs.20210625.aln.cram",
"fileSize": 1971126,
"fileType": "CRAM",
"fileMd5sum": "0056c5d7f00e5c3f3466c6982b7eb8da",
"fileAccess": "controlled",
"dataType": "Aligned Reads"
},
{
"info": {
"analysis_tools": [
"BWA-MEM",
"biobambam2:bammarkduplicates2"
],
"data_category": "Sequencing Reads"
},
"objectId": "fd16cee0-11e1-538a-abb3-4432d56d140a",
"studyId": "ROSI-RU",
"analysisId": "dfb1d6c3-c21a-47f4-b1d6-c3c21a47f483",
"fileName": "ROSI-RU.DO262424.SA622678.wxs.20210625.aln.cram.crai",
"fileSize": 509,
"fileType": "CRAI",
"fileMd5sum": "6eccbecad3f97563117a6d6a4d769a82",
"fileAccess": "controlled",
"dataType": "Aligned Reads Index"
}
],
"analysisType": {
"name": "sequencing_alignment",
"version": 11
},
"experiment": {
"experimental_strategy": "WXS",
"platform": "ILLUMINA",
"platform_model": "HiSeq 2000",
"sequencing_center": "EXT",
"sequencing_date": "2014-12-12",
"submitter_sequencing_experiment_id": "TEST_EXP"
},
"read_group_count": 67,
"workflow": {
"genome_build": "GRCh38_hla_decoy_ebv",
"inputs": [
{
"analysis_type": "sequencing_experiment",
"input_analysis_id": "c52c6e97-2b13-451c-ac6e-972b13751c86"
}
],
"run_id": "wes-9669b00a389d472c98562720b839c195",
"session_id": "d8573c63-a72f-402c-8476-466ac0cfac4b",
"workflow_name": "DNA Seq Alignment",
"workflow_version": "1.5.1"
}
}
},
"partition": 0,
"offset": 11279
},
Expected behaviour: the aggregator should fetch the latest analysis and index the donor whenever an event is received.
Two things to pull:
Needs a "useVault" option for each data store.