cerner / bunsen
Explore, transform, and analyze FHIR data with Apache Spark
Home Page: https://engineering.cerner.com/bunsen
License: Apache License 2.0
I've tried to use the R4 functions from PySpark, but it cannot find the R4 Java classes in the jar. Checking the project code, it appears the R4 module is not included. With version 0.4.6, the R4 functions are present and working.
Will there be support for R4 in the near future? Why is R4 support disabled in the latest versions?
#46 recently added support for Contained resources. The listing of contained resources could be supplied by a variadic parameter in Java. However, when consuming the API from other projects, calls to of that provided no Contained resources could not be properly resolved. This issue seemed to affect the ValueSets and ConceptMaps classes in particular: the classes would fail to initialize over the private Encoder variable for the resource type. The issue did not present itself during unit/integration testing in Bunsen proper.
The fix would be to add an additional method that unambiguously supports a single Resource argument, without any Contained arguments.
When a StructureDefinition defines an "extension" element whose max cardinality is set to "*", the corresponding Avro class is not generated with a "List" type.
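The expected mapping can be sketched as follows (a hypothetical helper, not Bunsen's actual schema generator):

```python
def avro_type_for(element_type, max_cardinality):
    """Sketch: an element whose max cardinality is "*" should map to an
    Avro array of the element type, rather than a single value as the
    generated class currently does."""
    if max_cardinality == "*":
        return {"type": "array", "items": element_type}
    return element_type
```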
Bunsen 0.5.0 release
A lot of systems continue to use DSTU2, so we should add it as an option to Bunsen.
The latest_version method in ConceptMaps and ValueSets is broken, with missing references to self.
We should fix the function body and also add tests over the method.
The add_mappings function is missing a java_package argument in its call to _add_mappings_to_map.
Additionally, the function will fail in any case, since all ConceptMaps must have a unique URI and version, and attempting to add new mappings to an existing ConceptMap without updating the version violates this requirement.
We should fix the body of the function and additionally add a new_version parameter that gives a new version to the updated map.
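The proposed fix can be sketched with a minimal stand-in (the real add_mappings delegates to _add_mappings_to_map; the names and structure here are illustrative only):

```python
class ConceptMaps:
    """Minimal stand-in illustrating the (uri, version) uniqueness rule."""

    def __init__(self):
        self._maps = {}  # (url, version) -> list of mappings

    def with_new_map(self, url, version, mappings):
        # Mirrors the real constraint: a (URI, version) pair must be unique.
        if (url, version) in self._maps:
            raise ValueError("Cannot add concept maps having duplicate url and version")
        self._maps[(url, version)] = list(mappings)
        return self

    def add_mappings(self, url, version, new_mappings, new_version):
        # The fix: copy the existing map's mappings under a *new* version
        # instead of mutating (url, version), which would violate uniqueness.
        combined = self._maps[(url, version)] + list(new_mappings)
        return self.with_new_map(url, new_version, combined)
```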
The add_values method on ValueSets currently fails when adding additional values to an existing ValueSet instance by ValueSetUri.
A reproducing case:
from bunsen.stu3.codes import create_value_sets
version = '0.1'
url_1 = 'urn:test:valueset:testvalueset1'
initial_values_1 = [('urn:test:system1', 'urn:code:a'),
('urn:test:system1', 'urn:code:b'),
('urn:test:system2', 'urn:code:1')]
url_2 = 'urn:test:valueset:testvalueset2'
initial_values_2 = [('urn:test:system1', 'urn:code:a')]
initial = create_value_sets(spark) \
.with_new_value_set(url_1, version, values=initial_values_1) \
.with_new_value_set(url_2, version, values=initial_values_2)
appended_values = [('urn:test:system1', 'urn:code:c'),
('urn:test:system2', 'urn:code:2')]
appended = initial.add_values(url_1, version, appended_values)
fails with
Py4JJavaError: An error occurred while calling o128.withValueSets.
: java.lang.IllegalArgumentException: Cannot add value sets having duplicate valueSetUri and valueSetVersion
at com.cerner.bunsen.stu3.codes.ValueSets.withValueSets(ValueSets.java:116)
This issue can be resolved by aligning the add_values method with its counterpart in ConceptMaps, add_mappings, which takes a new_version as an additional parameter, keeping the ValueSets distinct and immutable along ValueSetUri and version.
The following exception is thrown while converting a HAPI object that has a contained resource to an Avro SpecificRecord object:
Caused by: java.lang.ClassCastException: com.cerner.bunsen.stu3.avro.Provenance cannot be cast to org.apache.avro.generic.GenericData$Record
at com.cerner.bunsen.avro.converters.DefinitionToAvroVisitor$HapiContainedToAvroConverter.createContained(DefinitionToAvroVisitor.java:258)
at com.cerner.bunsen.definitions.HapiContainedConverter.fromHapi(HapiContainedConverter.java:113)
at com.cerner.bunsen.definitions.HapiCompositeConverter.fromHapi(HapiCompositeConverter.java:190)
at com.cerner.bunsen.avro.AvroConverter.resourceToAvro(AvroConverter.java:173)
at ...
The push_valuesets function is defined as taking the name of an ontologies database for the purpose of discovering the valuesets [1]. This may be heavy-handed in instances where data scientists want to work with experimental instances of ValueSets, as it would require them to save tables having new versions (which might not make sense for "working" instances of ValueSets).
We should open the Python API to allow the user to provide an instance of ValueSets directly. This change could be made passively by still defaulting the database parameter to "ontologies", while performing an instance check against the ValueSets class, or loading the valuesets from the source database if the argument is a string.
This issue may make sense to include in the work of #25.
[1] -- https://github.com/cerner/bunsen/blob/master/python/bunsen/stu3/valuesets.py#L57
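The proposed dispatch could be sketched like this (a hypothetical helper; the ValueSets stand-in and loader are illustrative, not Bunsen's actual API):

```python
class ValueSets:
    """Stand-in for bunsen's ValueSets class (assumption for illustration)."""
    def __init__(self, name):
        self.name = name


def _load_value_sets(spark, database):
    # Hypothetical loader standing in for the existing database-based lookup.
    return ValueSets(database)


def resolve_value_sets(spark, value_sets="ontologies"):
    # Passive API opening: a string is treated as a database name (the
    # current behavior), while a ValueSets instance is used directly.
    if isinstance(value_sets, ValueSets):
        return value_sets
    if isinstance(value_sets, str):
        return _load_value_sets(spark, value_sets)
    raise TypeError("value_sets must be a database name or a ValueSets instance")
```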
When a structure definition contains a modifierExtension element, it is missing from the generated Avro schema. This modifierExtension is being used to specify modifier codes for the "Procedure" resource.
A.C:
Bunsen 0.5.4 release
For FHIR codes that are null, HAPI encodes a value of ? for the value. This results in errors during conversion back to FHIR, since ? is not an acceptable value to set as a String.
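A minimal illustration of the handling needed (hypothetical; the real fix would live in Bunsen's string conversion path):

```python
def decode_code(value):
    # HAPI may emit "?" for a null code; treat it as absent rather than
    # attempting to set an invalid String during conversion back to FHIR.
    return None if value == "?" else value
```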
Our implementation of the "in_valueset" UDF needs to be broadened to support Spark Arrays of CodeableConcept, rather than a single (Row of) CodeableConcept. This can be done by broadening the InValueSetUdf [1] to accept an Object rather than just a Row, and then performing an instanceof check on the argument.
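The runtime-type dispatch can be sketched in Python terms (the actual change would be in the Java InValueSetUdf, accepting Object and checking whether it is a Row or an array; CodeableConcepts are modeled as (system, code) tuples here for illustration):

```python
def in_valueset(codeable, reference_codes):
    """Sketch: accept either one CodeableConcept-like value or a list of
    them, dispatching on the runtime type of the argument."""
    if isinstance(codeable, list):
        # Array input: true if any element of the array is in the value set.
        return any(in_valueset(c, reference_codes) for c in codeable)
    # Single input: direct membership check.
    return codeable in reference_codes
```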
The Spark Row schema generated from DefinitionToSparkRow skips recursively nested elements.
E.g., in the element structure below, the "assigner" field is skipped in the generated Spark schema (refer to the attachment).
Patient
-> generalPractitioner (Reference Organization|Practitioner)
--> identifier
---> assigner (Reference Organization)
The Spark schema should be equivalent to the Avro schema.
The Avro schema for fields of "Element" type doesn't include all the specialized items defined in the corresponding profile. E.g., for the "Timing" [1] data type, the Avro schema doesn't include all the children of the "repeat" element, such as count, bounds, etc.; instead it is defined as the base Element type.
[1] https://www.hl7.org/fhir/datatypes.html#Timing
Right now Bunsen creates an encoder/schema for the entire set of resource elements. Realistically, though, FHIR servers return a limited set of vendor-specific elements, and these are commonly described in a FHIR profile. I think generating schemas from profiles would greatly simplify schemas in Spark and reduce confusion for users, since dataframes would contain only the elements actually supported by a vendor.
PySpark needs to be bumped from 2.3 to 2.4.
When writing out new ConceptMaps to an existing table, there can be an exception on column data-type mismatches:
An error occurred while calling o214.writeToDatabase.
: org.apache.spark.sql.AnalysisException: cannot resolve '`useContext`' due to data type mismatch: cannot cast array<struct<id:string,code:struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>,valueQuantity:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,valueRange:struct<id:string,low:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,high:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>>,valueCodeableConcept:struct<id:string,coding:array<struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>>,text:string>>> to array<struct<id:string,code:struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>,valueCodeableConcept:struct<id:string,coding:array<struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>>,text:string>,valueQuantity:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,valueRange:struct<id:string,low:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,high:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>>>>;;
'InsertIntoHadoopFsRelationCommand location/warehouse/ontologies.db/conceptmaps, false, [timestamp#5475], Parquet, Map(serialization.format -> 1, path -> location/warehouse/ontologies.db/conceptmaps), Append, CatalogTable(
Database: ontologies
Table: conceptmaps
Owner: hadoop
Created Time: Mon Aug 06 20:33:27 UTC 2018
Last Access: Thu Jan 01 00:00:00 UTC 1970
Created By: Spark 2.3.0
Type: MANAGED
Provider: parquet
Table Properties: [transient_lastDdlTime=1533587608]
Location: location/warehouse/ontologies.db/conceptmaps
Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Storage Properties: [serialization.format=1]
Partition Provider: Catalog
Partition Columns: [`timestamp`]
On analysis it appears the Hive table schema can drift from the dynamically created Spark schema; columns can be in different orders. We can compare gists of the schemas:
Schema for a Dataset<ConceptMap>
using Bunsen 0.4.5, using HAPI 3.3.0.
Schema for an existing ConceptMap table, built on a previous version of Bunsen. This schema differs from the first in column order of SourceUri/SourceReference, TargetUri/TargetReference, and useContext.valueQuantity fields (valueQuantity being in a different position is what is conveyed by the error message at the top).
Schema for a new ConceptMap table, built from the Dataset. This schema matches the first.
Even if we load the original table using Bunsen APIs
ontologies_maps = get_concept_maps(spark, "ontologies")
ontologies_maps.get_maps().printSchema()
as opposed to Spark APIs
spark.table("ontologies.conceptmaps").printSchema()
the result is still a mismatch to the Dataset<ConceptMap>
we'd intend to write.
I don't think this is related to issues we've seen with Spark in the past, where we have to explicitly SELECT
columns in a particular order to avoid data being written under the wrong column.
I think this is an issue related to the order in which a RuntimeElement returns information about its children in the EncoderBuilder. Digging into ConceptMap.useContext.value and comparing the Encoder schema across Bunsen versions, we see the same differences observed at the table/dataset schema level; digging more deeply into the EncoderBuilder runtime, we find that, depending on the HAPI version, we get different orders in the ChoiceType children for ConceptMap.useContext.value, and those orders match the differences we see in the Dataset and table schemas.
This amounts to tables for a given STU release being subject to non-passive changes (even though updates within HAPI for an STU release should be purely passive in regards to the resources).
The simplest thing to do is to just drop/archive the tables and rebuild them with the latest Bunsen version, but this requirement might be unexpected to users while consuming Bunsen over a HAPI version on the same FHIR STU release.
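One possible direction for making generated schemas stable across HAPI versions (a suggestion, not Bunsen's actual code) would be to order choice-type children deterministically instead of relying on the runtime's iteration order:

```python
def deterministic_fields(child_names):
    # Sketch only: sorting choice-type child names before building the
    # schema would make the generated column order independent of the
    # order HAPI's RuntimeElement happens to return children in.
    return sorted(child_names)
```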
A Docker image based on the pyspark-notebook would be a convenient way for users to experiment with Bunsen. Logging this to create one.
Bunsen contains several instances of Spark Expression (see StaticField, ObjectCast, etc.). These should be removed from the codebase once they are committed into SparkSQL proper, and the FHIREncoder should be refactored to use the newly included Expressions from Spark.
I am currently attempting to read in FHIR Bundles from a directory that contains JSON files and then extract certain resource types to Spark Datasets. While Datasets are being successfully created, Extensions that were part of resources in my FHIR bundle are being dropped altogether.
If I am looking at the correct places in code, it seems like lack of Extension support was a conscious decision:
I would like to be able to create Datasets for FHIR resources that still contain the Extensions from the original resources.
Using Bunsen 0.4.9
Run this Scala code (or the Java equivalent):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.cerner.bunsen.Bundles // import path may differ across Bunsen versions

object BunsenExample {

  def main(args: Array[String]): Unit = {
    failBundles()
  }

  def failBundles(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.sql.crossJoin.enabled", "true")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    val data = Bundles.forStu3().loadFromDirectory(spark, "/path/to/bundles/with/resource/extensions", 2).cache()
    val patients = Bundles.forStu3().extractEntry(spark, data, "Patient")

    patients.show()
    patients.printSchema()
  }
}
The patients dataset will not contain the extensions that were originally part of the Patient FHIR resources in the bundle; there does not appear to be a place for extensions in the schema of the Dataset. I verified that the Extensions are parsed successfully and are accessible through the BundleContainers returned if you run data.collect() and dive into the result.
Add support for Extensions to be included in Datasets when they are created by extracting resources from a collection of FHIR Bundles.
When using extract_entry to extract resources like ImagingStudy from a bundle, there is an error: "Unknown primitive type: org.hl7.fhir.dstu3.model.OidType".
0.4.9
In the [0.5.0-dev] branch, the Bundles Java class in the bunsen-spark module needs additional extractEntry overloads that accept a contained-resources list, in order to read FHIR bundles with contained resources.
The current hashCode implementations appear to follow the pattern here. For convenience, I'll copy the method here:
@Override
public int hashCode() {
return 17 * url.hashCode() * version.hashCode();
}
If the hash codes collide, I don't see how a simple multiplication would help matters, and they would certainly still collide if the members were actually equal. In principle, it might sometimes help when two hash codes land in the same bucket without being equal, depending on the bucketing strategy used, but it seems collisions would be worsened just as often for any bucketing strategy I can think of.
I think what you were actually going for is a variation on Java's strategy for hashing strings, whereby each member hash code is effectively multiplied by a different power of a prime number and the results summed. See the implementation for Java 7, noting that previous summands onto h are multiplied by 31 again in successive loop iterations. I can't recall the mathematical explanation of why sums of this form give a better distribution than alternatives, although I remember having studied it at one point; you could probably find it in an algorithms textbook. In any event, we can probably trust that Java's default here is reasonable.
The easiest way to get this behavior is likely to use the Objects.hash method provided in Java 7+. That ends up using the Arrays hash code, which uses the same algorithm as String.
I don't see a functional problem here, since we still obey the hashCode and equals contracts; the performance in hash-based data structures just won't be quite optimal.
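The difference between the two strategies can be demonstrated directly (shown in Python for brevity; the actual fix in Java would be Objects.hash(url, version)):

```python
def multiplicative_hash(url_hash, version_hash):
    # The current pattern: any pair of member hashes with the same product
    # collides, including every symmetric swap of the two members.
    return 17 * url_hash * version_hash


def polynomial_hash(*member_hashes):
    # Java's String/Arrays.hashCode strategy: h = 31*h + next, so each
    # member is effectively weighted by a distinct power of 31 and order
    # matters.
    h = 1
    for m in member_hashes:
        h = 31 * h + m
    return h
```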
There are some cases where it would be useful to follow the FHIR "Contained Resource" pattern, wherein rather than having a resource stand-alone, it is contained within another resource. A good example where this may be useful is for the Provenance resource, which could be contained within its referring resource rather than being independent.
While the HAPI APIs allow for adding a contained resource, Bunsen currently would not support Spark Datasets of such resources: since the encoder is created only from the static resource definition, "knowledge" of the contained-resources structure isn't available at encoder-creation time.
Getting the following NullPointerExceptions in 2 different environments from executors:
AWS EMR Spark cluster:
java.lang.NullPointerException
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:262)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:408)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:334)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:408)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformRoot(Stu3StructureDefinitions.java:602)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transform(Stu3StructureDefinitions.java:522)
at com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:97)
at com.cerner.bunsen.spark.Bundles$ToResourceRow.readObject(Bundles.java:454)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
... (repeated java.io.ObjectInputStream.readObject/defaultReadFields/readSerialData and scala.collection.immutable.List$SerializationProxy.readObject frames elided) ...
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
on-prem YARN Spark cluster:
java.lang.NullPointerException
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:274)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:420)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:387)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:420)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformRoot(Stu3StructureDefinitions.java:614)
at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transform(Stu3StructureDefinitions.java:534)
at com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:97)
at com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:54)
at com.cerner.bunsen.spark.converters.HasSerializableConverter.readObject(HasSerializableConverter.java:38)
at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
... (repeated java.io.ObjectInputStream frames elided) ...
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readArray(ObjectInputStream.java:2031) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1612) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
java.lang.reflect.Method.invoke(Method.java:498) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88) at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at
org.apache.spark.scheduler.Task.run(Task.scala:121) at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)
0.5.6
Note: the snippets below include <placeholders> that you would need to replace to reproduce the issue.
This issue has never occurred locally, despite many attempts to write tests that reproduce it. The issue can be intermittent in cluster mode, depending on configuration and environment, but it is consistently reproduced when running an AWS EMR Step (fixed 25 r5.4xlarge core instances) with the following spark-submit arguments:
spark-submit --verbose --deploy-mode cluster --master yarn --conf spark.hadoop.fs.s3.maxRetries=20 --conf spark.sql.shuffle.partitions=2003 --conf spark.executor.memory=30g --conf spark.driver.memory=24g --conf spark.executor.cores=3 --conf spark.driver.cores=3 --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=30 --conf spark.yarn.maxAppAttempts=1 --conf spark.python.worker.memory=4g --conf spark.driver.maxResultSize=8g --jars <my_bunsen_consuming_java>.jar --py-files <my_bunsen_consuming_python_java_wrapper>.py,<my_bunsen_consuming_python_java_wrapper>.zip <my_python_script_calling_my_bunsen_consuming_python_java_wrapper_api_to_read_>.py
and AWS EMR cluster configurations:
Classification | Property | Value | Source
---|---|---|---
hive-site | javax.jdo.option.ConnectionPassword | <my_connection_password> | Cluster configuration
hive-site | javax.jdo.option.ConnectionURL | jdbc:mysql://<my_rds_cluster>.us-west-2.rds.amazonaws.com:3306/<my_rds_name>?createDatabaseIfNotExist=true&verifyServerCertificate=false&useSSL=true&requireSSL=true | Cluster configuration
hive-site | javax.jdo.option.ConnectionDriverName | org.mariadb.jdbc.Driver | Cluster configuration
hive-site | javax.jdo.option.ConnectionUserName | <my_connection_user> | Cluster configuration
spark-defaults | spark.sql.warehouse.dir | s3://<my_warehouse_s3_bucket>/warehouse | Cluster configuration
Bunsen currently parses only XML resources, using an XML parser built from a FhirContext. We want to support reading JSON resources as well. We can do this by inferring which parser to use from the file extension. Resources that lack a file extension should cause a runtime exception with a descriptive message.
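A minimal sketch of the extension-based dispatch described above. The parser names here are stand-ins for parsers built from a HAPI FhirContext (e.g. its XML and JSON parsers); the function name `parser_for` is illustrative, not Bunsen's actual API.

```python
import os

# Map file extensions to parser kinds. "XML"/"JSON" stand in for parsers
# that a real implementation would build from a FhirContext.
_PARSERS = {".xml": "XML", ".json": "JSON"}

def parser_for(path):
    """Return the parser kind for a resource file, or raise if unknown."""
    ext = os.path.splitext(path)[1].lower()
    try:
        return _PARSERS[ext]
    except KeyError:
        # Missing or unsupported extension: fail loudly with a clear message.
        raise RuntimeError(
            "Cannot infer FHIR parser for '%s': missing or unsupported "
            "file extension" % path)

print(parser_for("bundles/patient.json"))  # JSON
```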
Saving resource dataframes to Hive tables results in an error. It appears to be a
permission error when Spark's saveAsTable writes to Hive storage.
Windows 10 with WSL2 (Ubuntu 18.04)
0.4.9 with Jupyter Pyspark/All-spark-notebooks
Does anybody have an idea what I can do about this? I rebuilt several times, used different Spark base images, and co-installed Hadoop to manually change permissions of the spark-warehouse (which failed), and I'm out of ideas now.
Thank you!
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
15 bundles,
16 'tutorial_small',
---> 17 resources)
/usr/local/bunsen/python/bunsen/stu3/bundles.py in write_to_database(sparkSession, javaRDD, databaseName, resourceNames)
105 bundles = _bundles(sparkSession._jvm)
106
--> 107 bundles.saveAsDatabase(sparkSession._jsparkSession, javaRDD, databaseName, namesArray)
108
109 def save_as_database(sparkSession, path, databaseName, *resourceNames, **kwargs):
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o88.saveAsDatabase.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:503)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:474)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:453)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
at com.cerner.bunsen.Bundles.saveAsDatabase(Bundles.java:324)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/work/Projects/FHIR/bunsen/spark-warehouse/tutorial_small.db/allergyintolerance/_temporary/0/_temporary/attempt_20191008144337_0000_m_000000_0/part-00000-afdb5614-2fe2-4ec7-a1d4-25d3d8447784-c000.snappy.parquet': Operation not permitted
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 36 more
Caused by: ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/work/Projects/FHIR/bunsen/spark-warehouse/tutorial_small.db/allergyintolerance/_temporary/0/_temporary/attempt_20191008144337_0000_m_000000_0/part-00000-afdb5614-2fe2-4ec7-a1d4-25d3d8447784-c000.snappy.parquet': Operation not permitted
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Avro schema generation fails for fields that have identical ChoiceType structures within a given Resource, e.g. Medication.ingredient.item[x] and Medication.package.content.item[x].
This causes a schema redefinition error.
It should be possible to generate Avro schemas (both GenericRecord and SpecificRecord) for resources whose fields share identical ChoiceType structures.
These GenericRecords/SpecificRecords should be serializable to and deserializable from FHIR objects, Spark Rows, and Avro objects.
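One common way to avoid Avro's redefinition error is to cache generated record schemas by fully qualified name, so a second field with the same ChoiceType structure reuses the first definition instead of redefining it. The sketch below is illustrative only; it is not Bunsen's actual schema generator, and the names are hypothetical.

```python
# Cache of record schemas keyed by fully qualified name. Reusing a cached
# schema avoids defining the same named Avro record twice.
_schema_cache = {}

def record_schema(fullname, fields):
    """Return a record schema dict, reusing any prior definition by name."""
    if fullname in _schema_cache:
        return _schema_cache[fullname]  # reuse, don't redefine
    schema = {"type": "record", "name": fullname, "fields": fields}
    _schema_cache[fullname] = schema
    return schema

# e.g. Medication.ingredient.item[x] and Medication.package.content.item[x]
# would both resolve to one shared "ChoiceItem" record definition:
a = record_schema("ChoiceItem", [{"name": "itemCodeableConcept", "type": "string"}])
b = record_schema("ChoiceItem", [{"name": "itemCodeableConcept", "type": "string"}])
assert a is b  # the second use shares the first definition
```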
FHIR R4 will be here soon, so we should support it. The goal is to have a single Bunsen build that will support both STU3 and R4 to keep the user experience as simple as possible.
Currently, when setting the IdType of an IAnyResource
element, the Converter for that String primitive type is not invoked; instead, the String value is set directly on the new HAPI object's values.
This precludes special converters that might handle String types differently from the default String setter.
AC:
Issues were seen when multiple threads each built their own BroadcastableValueSets with distinct Spark sessions.
BroadcastableValueSets has static Spark encoders, which are not guaranteed to be thread safe; this is believed to be the cause.
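One general remedy for unsafe shared statics is to give each thread its own instance, e.g. via thread-local storage. The sketch below demonstrates the pattern in Python; `get_encoder` and the encoder object are hypothetical stand-ins for the static Spark encoders, not Bunsen's actual code.

```python
import threading

# Thread-local holder: each thread lazily builds and reuses its own
# "encoder" instead of sharing one static instance across threads.
_local = threading.local()
encoders = []  # kept only so we can inspect the per-thread instances

def get_encoder(factory):
    """Return this thread's encoder, creating it on first use."""
    if not hasattr(_local, "encoder"):
        _local.encoder = factory()
    return _local.encoder

def worker():
    encoders.append(get_encoder(object))

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each thread got a distinct instance, so no cross-thread sharing occurred.
print(len({id(e) for e in encoders}))
```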
0.5.2
We found some API improvements that could simplify Bunsen usage when creating tutorial content. Specifically, they are:
These are small changes, so I'm wrapping them up into this issue.
ConceptMaps
and ValueSets
support loading resources from directories. It would be useful to expose that functionality through the Python API so that users can load resources from both local and distributed filesystems.
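A minimal sketch of the directory-loading behavior such a Python API might wrap: walk a directory tree and collect FHIR resource files by extension, whose paths a real implementation would then hand to ConceptMaps/ValueSets. The function name `collect_resources` is hypothetical.

```python
import os
import tempfile

def collect_resources(root, extensions=(".xml", ".json")):
    """Recursively collect resource file paths under root by extension."""
    found = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            if os.path.splitext(name)[1].lower() in extensions:
                found.append(os.path.join(dirpath, name))
    return sorted(found)

# Quick demonstration with a throwaway directory:
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "valueset.json"), "w").close()
    open(os.path.join(d, "notes.txt"), "w").close()  # ignored: wrong extension
    print([os.path.basename(p) for p in collect_resources(d)])
```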
A NullPointerException is raised while generating the Avro schema for any FHIR resource whose structure definition contains Extension elements of primitive types (boolean
and integer
). However, the string
type works as expected.
Bunsen 0.5.0 release
In some applications, caller code may have already broadcast an instance of BroadcastableValueSets
to the cluster, and may wish to provide that broadcast variable reference when registering an "in_valueset" UDF.
We should expose an overload of pushUdf
that takes a Broadcast<BroadcastableValueSets>
.
The Travis CI build is failing with the errors below.
Tests in error:
checkSystems(com.cerner.bunsen.spark.codes.systems.SnomedTest): Unsupported class file major version 55
testHasParent(com.cerner.bunsen.spark.codes.systems.SnomedTest): Unsupported class file major version 55
checkSystems(com.cerner.bunsen.spark.codes.systems.LoincTest): Unsupported class file major version 55
testHasParent(com.cerner.bunsen.spark.codes.systems.LoincTest): Unsupported class file major version 55
com.cerner.bunsen.spark.codes.broadcast.BroadcastableValueSetsTest: Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.
testMultiNestedExtension(com.cerner.bunsen.spark.SparkRowConverterTest): Unsupported class file major version 55
testMultiModifierExtensionsWithCodeableConceptField(com.cerner.bunsen.spark.SparkRowConverterTest): Unsupported class file major version 55
com.cerner.bunsen.spark.ValueSetUdfsTest: Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.
Travis CI is picking the latest "openjdk11" version, even though we set the jdk to "openjdk8" in the Travis CI config file. The logs show the errors below:
Installing openjdk8
travis_setup_java: command not found
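A hedged sketch of a Travis configuration that pins the JDK (the exact dist value is an assumption; the point is to request openjdk8 on an image whose tooling still provides travis_setup_java):

```yaml
# .travis.yml fragment (illustrative, not the project's actual config):
# pin the build to OpenJDK 8 so it does not silently pick up openjdk11
# and fail with "Unsupported class file major version 55".
language: java
dist: trusty   # assumption: an image whose tooling still ships travis_setup_java
jdk:
  - openjdk8
```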
The following minimal reproducing test case fails in which we broadcast ConceptMaps having a later version for the same URI:
ConceptMap conceptMapLatest = new ConceptMap();
conceptMapLatest.setUrl("uri:test:concept:map")
.setVersion("1")
.setSource(new UriType("uri:test:source:valueset"))
.setTarget(new UriType("uri:test:target:valueset"));
ConceptMapGroupComponent groupLatest = conceptMapLatest.addGroup()
.setSource("uri:test:source:system")
.setTarget("uri:test:target:system");
groupLatest.addElement().setCode("abc").addTarget().setCode("123");
groupLatest.addElement().setCode("def").addTarget().setCode("xyz");
// conceptMap is an earlier version of the same URI, created earlier in the test.
ConceptMaps maps = ConceptMaps.getEmpty(spark)
    .withConceptMaps(conceptMap, conceptMapLatest);
broadcastLatest = maps.broadcast(maps.getLatestVersions(true));
...
@Test
public void testBroadcastLatest() {
BroadcastableConceptMap broadcastableConceptMap = broadcastLatest.getValue()
.getBroadcastConceptMap("uri:test:concept:map");
BroadcastableConceptMap.CodeValue value = broadcastableConceptMap
.getTarget("uri:test:source:system", "abc").get(0);
Assert.assertEquals("uri:test:target:system", value.getSystem());
Assert.assertEquals("123", value.getValue());
value = broadcastableConceptMap.getTarget("uri:test:source:system", "def").get(0);
Assert.assertEquals("uri:test:target:system", value.getSystem());
Assert.assertEquals("xyz", value.getValue());
}
The broadcast method, which takes a set of URI-to-version mappings, needs to filter by the given versions (code appears to be in place to do this, but it is not used) [1].
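The filtering the broadcast method should perform can be sketched in plain Python (a hypothetical model, not Bunsen's Java implementation): given all known concept-map versions and the caller's URI-to-version selection, only the requested versions should survive into the broadcast.

```python
def filter_versions(available, requested):
    """Keep only the (uri, version) pairs the caller asked to broadcast.

    available: iterable of (uri, version) pairs known to ConceptMaps.
    requested: dict mapping uri -> version to broadcast.
    """
    return [(uri, version)
            for uri, version in available
            if requested.get(uri) == version]

# Two versions exist for the same URI; only the requested one survives.
available = [("uri:test:concept:map", "0"), ("uri:test:concept:map", "1")]
selected = filter_versions(available, {"uri:test:concept:map": "1"})
```

Without this filter, the broadcast can carry (and the lookup can resolve against) an unintended version of the map, which is the failure the test case above exposes.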
Not sure if this is an issue or simply a misuse of the package; however, the FHIR JSON that I need to read from is not in bundles, but rather individual resources.
I am unable to create a DataFrame suitable for use with to_bundle using the R4 Python API. I assume the workflow would be something like: read files containing single resources in S3 > to_bundle > from_json > extract_entity. Some code I have attempted:
print('Starting text schema')
text_data_frame = spark.read.text("s3://<bucket_name>/WLW-test-1234/Patient")
text_data_frame.printSchema()
text_data_frame.show(100, False)
print("Starting to_bundle")
fhir_bundle = to_bundle(spark, text_data_frame)
Is there some specific format that the DataFrame needs to be in to use the to_bundle function, or is it truly the case that unbundled resources are not supported? From the documentation: "dataset – a DataFrame of encoded FHIR Resources".
Thank you
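One workaround (an assumption on my part, not an officially supported path) is to wrap each standalone resource in a minimal collection Bundle before handing the JSON to Bunsen's bundle functions. The wrapping itself is plain JSON manipulation:

```python
import json

def wrap_in_bundle(resource_json):
    """Wrap a single FHIR resource's JSON in a minimal collection Bundle.

    Hypothetical helper: since the bundle-oriented API expects Bundle JSON,
    standalone resources can be enveloped before conversion.
    """
    resource = json.loads(resource_json)
    bundle = {
        "resourceType": "Bundle",
        "type": "collection",
        "entry": [{"resource": resource}],
    }
    return json.dumps(bundle)

patient = '{"resourceType": "Patient", "id": "example"}'
bundled = wrap_in_bundle(patient)
```

The resulting column of Bundle JSON strings could then be fed to the bundle-based workflow in place of the raw per-resource files.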
For FHIR primitive types whose maximum cardinality in the structure definition is greater than one (e.g., "given" in "HumanName"), the corresponding generated Avro schema doesn't declare the field as an array type.
Bunsen 0.5.0 release
Contained resources are currently not supported by the 0.5.0-dev paradigm of Bunsen.
There are two approaches to this problem:
1. Generate a schema for the contained field of DomainResource, which itself contains all possible DomainResources.
2. Allow users to explicitly declare the contained resource types they need, generating a schema for only those types.
While the first approach would generate schemas that could be compiled once and then be universally applicable, it would likely also generate unnecessarily complex schemas. For reference-supporting schema systems like Avro, this wouldn't pose a problem. But for schema systems that don't support referencing prior structures and instead require full, inline schema declarations, like Spark, schemas would become large and unwieldy.
The second approach is probably more advisable at this time.
Spark 2.3 is out and gaining traction, so we should support it. Ideally we should be able to produce a build that supports both Spark 2.2 and 2.3.
For one of our projects, we store bundles with various resources in a Hive table. That table has several other columns, used for various purposes, so the table looks like this:
create table fhir_extract (id bigint, col1 string, col2 string, fhir_bundle string);
fhir_bundle can contain various resources (Patient, Observation, Encounter, etc.), and resources can have both mandatory and optional elements.
I think at the moment Bunsen does not support loading resources from a Hive table column.
After using these APIs in a notebook, I found it simpler and more intuitive to store customized ontology data in a separate database that can be shared and referenced in code, as opposed to referencing Python objects containing them. Therefore I'd like to add the database name as an optional parameter to let users load and work with different databases. Here's an example of how push_valuesets would be used:
push_valuesets(spark,
               {'hypertension' : isa_snomed('59621000'),
                'heart_rate' : isa_loinc('8867-4')},
               database='my_custom_ontologies')
This also reduces the burden on users who just consume a custom ontology database, since pointing to a different database eliminates intermediate steps and the need to be aware of our classes.
Of course, I'd expect custom ontologies to be written to the default database in many workflows; this just makes it easy to experiment with a custom database without impacting others.
When loading external concept maps, it's sometimes convenient to compute the mappings themselves in their own dataset (rather than reducing them all into a single ConceptMap object). Therefore we should support importing a Dataset of the expanded values along with the underlying concept map.
Spark 2.3.2 is available now and we should uplift Bunsen to use this later, stable version.
Spark 2.4.0 is available now and we should uplift Bunsen to use this latest, stable version.
Hi, I get this error while creating a Dataset with a FHIR type resource. Requesting your help.
Spark 2.2
FHIR DSTU3
bunsen-core-0.1.0
PatientSummary is a custom FHIR Resource
Dataset<PatientSummary> patientSummaryDS = sparkSession.createDataset(patientSummaryRDD.collect(),
encoders.of(PatientSummary.class));
Error:
scala.MatchError: RuntimeElementDirectResource[DirectChildResource, IBaseResource] (of class ca.uhn.fhir.context.RuntimeElementDirectResource)
at com.cerner.bunsen.SchemaConverter.com$cerner$bunsen$SchemaConverter$$childToFields(SchemaConverter.scala:119)
at com.cerner.bunsen.SchemaConverter$$anonfun$1.apply(SchemaConverter.scala:161)
at com.cerner.bunsen.SchemaConverter$$anonfun$1.apply(SchemaConverter.scala:161)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at com.cerner.bunsen.SchemaConverter.compositeToStructType(SchemaConverter.scala:161)
at com.cerner.bunsen.EncoderBuilder$.of(EncoderBuilder.scala:43)
at com.cerner.bunsen.EncoderBuilder.of(EncoderBuilder.scala)
at com.cerner.bunsen.FhirEncoders.of(FhirEncoders.java:64)
A FHIR ValueSet is described as
... a set of codes drawn from one or more code systems.
It would be useful to be able to read and broadcast FHIR ValueSets in a manner similar to how BroadcastableValueSets supports LOINC and SNOMED codes. FHIR ValueSets are perhaps a simplified form of the relationship-structured LOINC and SNOMED, as FHIR ValueSets have no notion of an ancestor or descendant.
At the moment, our BroadcastableValueSets uses the ConceptMaps class underneath, where the source and target systems are the same. I think there's room for simplification as part of this issue: we could create a ValueSets class that captures both collections with relationships between their codes (e.g., LOINC, SNOMED) and collections that are flat (e.g., a FHIR ValueSet). ConceptMaps can then be disentangled from that implementation and exposed as BroadcastableConceptMaps for mappings between distinct source and target systems.
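A minimal sketch (purely illustrative, not the proposed class) of what a flat ValueSets collection needs to capture: membership of (system, code) pairs per named value set, with no ancestor/descendant structure at all.

```python
# Hypothetical in-memory model of flat value sets: each reference name maps
# to a set of (system, code) tuples, which is all an in_valueset check
# needs when there are no hierarchical relationships.
value_sets = {
    "hypertension": {("http://snomed.info/sct", "59621000")},
    "heart_rate": {("http://loinc.org", "8867-4")},
}

def in_valueset(value_set_name, system, code):
    """Return True if the (system, code) pair belongs to the named set."""
    return (system, code) in value_sets.get(value_set_name, set())
```

Relationship-structured systems like SNOMED would additionally expand descendants into these sets at build time, but the broadcast-side membership check stays this simple either way.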
Right now the getContainer method for Avro expects GenericData.Array as the container list, but this can clash with a Java ArrayList type.
We should make sure we're using GenericData.Array throughout for Avro lists, but also loosen the type restriction in getContainer to match the interface rather than the implementation.
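The loosened check can be sketched in Python terms (an analogy only, not the Java code): test against the abstract sequence interface rather than one concrete list class, just as getContainer should accept any java.util.List rather than requiring GenericData.Array specifically.

```python
from collections.abc import Sequence

def is_supported_container(value):
    """Accept any list-like container, not one concrete implementation.

    Checking against the Sequence interface is the analogue of accepting
    java.util.List; checking `type(value) is list` would be the analogue
    of demanding GenericData.Array and rejecting an ArrayList.
    """
    return isinstance(value, Sequence) and not isinstance(value, (str, bytes))
```

This keeps the implementation free to hand back whichever concrete container it prefers while callers only depend on the interface.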