
cerner / bunsen


Explore, transform, and analyze FHIR data with Apache Spark

Home Page: https://engineering.cerner.com/bunsen

License: Apache License 2.0

Java 87.69% Scala 1.38% Makefile 0.11% Python 10.72% Dockerfile 0.09%

bunsen's People

Contributors

amarvakul, bdrillard, dependabot[bot], johngrimes, kthakore, ntlanglois, paulhartwell, rbrush, sanumula, sc036868, ysmwei


bunsen's Issues

[Question] Why is R4 support disabled on latest version (0.5.6)?

Description of Issue

I've tried to use the R4 functions from PySpark, but it cannot find the Java classes for R4 in the jar. I've checked the project code, and it seems the R4 module is not included.
When I try version 0.4.6, the R4 functions are present and working.

Will R4 be supported in the near future? Why is R4 support disabled in the latest versions?

Clean Encoder.of API to Support Unambiguous Parameters

#46 recently added support for Contained resources. The listing of contained resources could be supplied by a variadic parameter in Java. However, when consuming the API from other projects, calls to of that supplied no Contained resources could not be resolved properly. This seemed to affect the ValueSets and ConceptMaps classes in particular: they would fail to initialize the private Encoder variable for their resource type.

This issue did not present itself during unit/integration testing in Bunsen proper.

The fix would be to add an additional method that unambiguously supports a single Resource argument, without any Contained arguments.
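A minimal sketch of the proposed shape, using stand-in names and a plain String return value purely for illustration rather than Bunsen's actual encoder signatures:

// Illustrative only: an explicit single-argument overload gives callers an
// unambiguous entry point when no contained resource types are supplied.
public class EncoderFactorySketch {

  // Existing style of API: a resource type plus zero or more contained resource types.
  public static <T> String of(Class<T> resource, Class<?>... contained) {
    return resource.getSimpleName() + " encoder with " + contained.length + " contained types";
  }

  // Proposed addition: a single-argument overload for the common no-contained case.
  public static <T> String of(Class<T> resource) {
    return of(resource, new Class<?>[0]);
  }

  public static void main(String[] args) {
    System.out.println(of(String.class));                // resolves to the single-argument overload
    System.out.println(of(String.class, Integer.class)); // resolves to the variadic overload
  }
}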

DSTU2 Support

A lot of systems continue to use DSTU2, so we should add it as an option to Bunsen.

latest_version Broken

The latest_version method in ConceptMaps and ValueSets is broken due to missing references to self.

We should fix the function body and also add tests over the method.

add_mappings Broken and Missing Parameter

The add_mappings function is missing a java_package argument in its call to _add_mappings_to_map.

Additionally, the function will fail in any case, since all ConceptMaps must have a unique URI and version, and attempting to add new mappings to an existing ConceptMap without updating the version violates this requirement.

We should fix the body of the function and additionally add a new_version parameter that will give a new version to the updated map.

ValueSets add_values method fails

The add_values method against ValueSets currently fails when adding additional values to an existing ValueSet instance by ValueSetUri.

A reproducing case:

from bunsen.stu3.codes import create_value_sets

version = '0.1'

url_1 = 'urn:test:valueset:testvalueset1'
initial_values_1 = [('urn:test:system1', 'urn:code:a'),
                    ('urn:test:system1', 'urn:code:b'),
                    ('urn:test:system2', 'urn:code:1')]

url_2 = 'urn:test:valueset:testvalueset2'
initial_values_2 = [('urn:test:system1', 'urn:code:a')]

initial = create_value_sets(spark) \
    .with_new_value_set(url_1, version, values=initial_values_1) \
    .with_new_value_set(url_2, version, values=initial_values_2)

appended_values = [('urn:test:system1', 'urn:code:c'),
                   ('urn:test:system2', 'urn:code:2')]

appended = initial.add_values(url_1, version, appended_values)

fails with

Py4JJavaError: An error occurred while calling o128.withValueSets.
: java.lang.IllegalArgumentException: Cannot add value sets having duplicate valueSetUri and valueSetVersion
	at com.cerner.bunsen.stu3.codes.ValueSets.withValueSets(ValueSets.java:116)

This issue can be resolved by aligning the add_values method with its counterpart in ConceptMaps, add_mappings, which takes a new_version as an additional parameter, keeping the ConceptMaps distinct and immutable by ConceptMapUri and version.

[0.5.0-Dev] type cast exception while converting from hapi object to avro specific record


Description of Issue

The following exception is thrown while converting a HAPI object that has a contained resource to an Avro SpecificRecord object:

Caused by: java.lang.ClassCastException: com.cerner.bunsen.stu3.avro.Provenance cannot be cast to org.apache.avro.generic.GenericData$Record
	at com.cerner.bunsen.avro.converters.DefinitionToAvroVisitor$HapiContainedToAvroConverter.createContained(DefinitionToAvroVisitor.java:258)
	at com.cerner.bunsen.definitions.HapiContainedConverter.fromHapi(HapiContainedConverter.java:113)
	at com.cerner.bunsen.definitions.HapiCompositeConverter.fromHapi(HapiCompositeConverter.java:190)
	at com.cerner.bunsen.avro.AvroConverter.resourceToAvro(AvroConverter.java:173)

push_valuesets should take an instance of ValueSets

The push_valuesets function is defined as taking the name of an ontologies database for the purpose of discovering the valuesets [1]. This may be heavy-handed in instances where data scientists want to work with experimental instances of ValueSets, as it would require them to save tables with new versions (which might not make sense for "working" instances of ValueSets).

We should open the Python API to allow the user to provide an instance of ValueSets directly. This change could be performed passively by still defaulting the database parameter to "ontologies" but performing an instance check against the ValueSets class, or loading the valuesets from the source database if the argument is a string.

This issue may make sense to include in the work of #25.

[1] -- https://github.com/cerner/bunsen/blob/master/python/bunsen/stu3/valuesets.py#L57

modifierExtension element is missing in the generated Avro schema

Description of Issue

When a structure definition contains a modifierExtension element, it is missing from the generated Avro schema. This modifierExtension is being used to specify modifier codes for the "Procedure" resource.
Acceptance criteria:

  • Avro schema/Java shall include modifierExtension
  • Corresponding Avro to Hapi converters and vice-versa shall support this special field modifierExtension
  • Corresponding Spark Row to Hapi converters and vice-versa shall support this special field modifierExtension

System Configuration

Project Version

Bunsen 0.5.4 release

Support Null Codes in String Conversions

For FHIR codes that are null, HAPI encodes the value as ?. This results in errors during conversion back to FHIR, as ? is not an acceptable value to set as a String.
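A minimal sketch of the kind of guard the converters could apply, assuming the placeholder really is the literal string "?" (the helper name and placement are illustrative, not Bunsen's actual converter code):

import org.hl7.fhir.instance.model.api.IPrimitiveType;

public class NullCodeGuard {

  // Treat HAPI's "?" placeholder as a null code so the value is never
  // round-tripped back into FHIR as a literal question mark.
  public static String codeToString(IPrimitiveType<?> code) {
    if (code == null || code.isEmpty()) {
      return null;
    }
    String value = code.getValueAsString();
    return "?".equals(value) ? null : value;
  }
}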

[0.5.0-dev] Spark Row schema skips recursively nested elements

Description of Issue

Spark Row schema generated from DefinitionToSparkRow skips recursively nested elements.

E.g.,
In the element structure below, the "assigner" field is skipped in the generated Spark schema. Refer to the attachment.

Patient
-> generalPractitioner (Reference Organization|Practitioner)
--> identifier
---> assigner (Reference Organization)

Expected Outcomes

The Spark schema shall be equivalent to the Avro schema.

patient-spark-row-schema.log

[0.5.0-Dev] Avro Schema generation for fields of "Element" type are not expanded


Description of Issue

The Avro schema for fields of "Element" type doesn't include all of the specialized items defined in the corresponding profile. For example, for the "Timing" [1] data type, the Avro schema doesn't include the children of the "repeat" element (such as count, bounds, etc.); instead it is defined as the base Element type.

[1] https://www.hl7.org/fhir/datatypes.html#Timing

Expected Outcomes

  • If a field of type "Element" is encountered, the corresponding schema should be defined as a specialized type that includes all the children.

Support FHIR profiles to include only used elements

Right now Bunsen creates the encoder/schema for the entire set of resource elements. Realistically, though, FHIR servers return a limited set of vendor-specific elements, and these are commonly described in a FHIR profile. Building schemas from profiles would greatly simplify the Spark schemas and reduce confusion for users, since dataframes would contain only the elements a vendor actually supports.

Writes Fail due to Column Mismatch

When writing out new ConceptMaps to an existing table, there can be an exception on column data-type mismatches:

An error occurred while calling o214.writeToDatabase.
: org.apache.spark.sql.AnalysisException: cannot resolve '`useContext`' due to data type mismatch: cannot cast array<struct<id:string,code:struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>,valueQuantity:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,valueRange:struct<id:string,low:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,high:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>>,valueCodeableConcept:struct<id:string,coding:array<struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>>,text:string>>> to array<struct<id:string,code:struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>,valueCodeableConcept:struct<id:string,coding:array<struct<id:string,system:string,version:string,code:string,display:string,userSelected:boolean>>,text:string>,valueQuantity:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,valueRange:struct<id:string,low:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>,high:struct<id:string,value:decimal(12,4),comparator:string,unit:string,system:string,code:string>>>>;;
'InsertIntoHadoopFsRelationCommand location/warehouse/ontologies.db/conceptmaps, false, [timestamp#5475], Parquet, Map(serialization.format -> 1, path -> location/warehouse/ontologies.db/conceptmaps), Append, CatalogTable(
Database: ontologies
Table: conceptmaps
Owner: hadoop
Created Time: Mon Aug 06 20:33:27 UTC 2018
Last Access: Thu Jan 01 00:00:00 UTC 1970
Created By: Spark 2.3.0
Type: MANAGED
Provider: parquet
Table Properties: [transient_lastDdlTime=1533587608]
Location: location/warehouse/ontologies.db/conceptmaps
Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Storage Properties: [serialization.format=1]
Partition Provider: Catalog
Partition Columns: [`timestamp`]

On analysis it appears the Hive table schema can drift from the dynamically created Spark schema; columns can be in different orders. We can compare gists of the schemas:

Schema for a Dataset<ConceptMap> built with Bunsen 0.4.5 and HAPI 3.3.0.

Schema for an existing ConceptMap table, built on a previous version of Bunsen. This schema differs from the first in column order of SourceUri/SourceReference, TargetUri/TargetReference, and useContext.valueQuantity fields (valueQuantity being in a different position is what is conveyed by the error message at the top).

Schema for a new ConceptMap table, built from the Dataset. This schema matches the first.

Even if we load the original table using Bunsen APIs

ontologies_maps = get_concept_maps(spark, "ontologies")

ontologies_maps.get_maps().printSchema()

as opposed to Spark APIs

spark.table("ontologies.conceptmaps").printSchema()

the result is still a mismatch to the Dataset<ConceptMap> we'd intend to write.

I don't think this is related to issues we've seen with Spark in the past, where we have to explicitly SELECT columns in a particular order to avoid data being written under the wrong column.

I think this is an issue related to the order in which a RuntimeElement returns information about its children in the EncoderBuilder. Digging into ConceptMap.useContext.value and comparing the Encoder schema across Bunsen versions, we see the same differences observed at the table/dataset schema level. Digging more deeply into the EncoderBuilder runtime, we find that depending on the HAPI version we get different orders for the ChoiceType children of ConceptMap.useContext.value, and those orders match the differences we see in the Dataset and table schemas.

This amounts to tables for a given STU release being subject to non-passive changes (even though updates within HAPI for an STU release should be purely passive with respect to the resources).

The simplest thing to do is to drop/archive the tables and rebuild them with the latest Bunsen version, but this requirement might be unexpected for users consuming Bunsen over a newer HAPI version on the same FHIR STU release.

Remove Objects Code

Bunsen contains several instances of Spark Expression (see StaticField, ObjectCast, etc.) which should be removed from the codebase once they are committed into Spark SQL proper, and the FHIREncoder should be refactored to use the newly included Expressions from Spark.

Support FHIR Extensions in Spark Datasets


Description of Issue

I am currently attempting to read in FHIR Bundles from a directory that contains JSON files and then extract certain resource types to Spark Datasets. While Datasets are being successfully created, Extensions that were part of resources in my FHIR bundle are being dropped altogether.

If I am looking at the correct places in code, it seems like lack of Extension support was a conscious decision:

// Contained resources and extensions not yet supported.

// Contained resources and extensions not yet supported.

I would like to be able to create Datasets for FHIR resources that still contain the Extensions from the original resources.

System Configuration

Project Version

Using Bunsen 0.4.9

Steps to Reproduce the Issue

Run this Scala code (or Java equivalent):

import com.cerner.bunsen.Bundles // assumed import, matching the com.cerner.bunsen.Bundles package used in 0.4.x
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object BunsenExample {
  def main(args: Array[String]): Unit = {
    failBundles()
  }

  def failBundles(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.sql.crossJoin.enabled", "true")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    
    val data = Bundles.forStu3().loadFromDirectory(spark, "/path/to/bundles/with/resource/extensions", 2).cache()

    val patients = Bundles.forStu3().extractEntry(spark, data, "Patient")
    patients.show()
    patients.printSchema()
  }
}

The patients dataset will not contain the extensions that were originally part of the Patient FHIR resources in the bundle. There does not appear to be a place for extensions to exist in the schema for the Dataset. I verified that the Extensions are being parsed successfully and are accessible through the BundleContainers returned if you run data.collect() and dive into the result.

Expected Outcomes

Add support for Extensions to be included in Datasets when they are created by extracting resources from a collection of FHIR Bundles.

Support oid data types in resources (Eg STU3 - ImagingStudy)


Description of Issue

When using extract_entry to extract resources like imagingstudy from a bundle, there is an error that says "Unknown primitive type: org.hl7.fhir.dstu3.model.OidType"

System Configuration

Project Version

0.4.9

Additional Details (optional)

Steps to Reproduce the Issue

  1. Use the load_from_directory API to load a bundle that contains an ImagingStudy resource with a uid attribute
  2. Attempt to extract the imaging study like so ... imagingstudies = extract_entry(spark, bundles, 'imagingstudy')
  3. An error gets thrown with the message "Unknown primitive type: org.hl7.fhir.dstu3.model.OidType"

Expected Outcomes

  • Successfully load and extract the ImagingStudy resource type in STU3
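For reference, a minimal sketch showing that OidType is a string-backed primitive in HAPI's STU3 model (it extends UriType), so supporting it should parallel the handling of the other URI-valued primitives; the class and values here are illustrative:

import org.hl7.fhir.dstu3.model.OidType;

public class OidTypeExample {

  public static void main(String[] args) {
    // OidType carries a plain string value, like the other URI-flavored primitives.
    OidType studyUid = new OidType("urn:oid:1.2.840.113619.2.55");
    System.out.println(studyUid.getValueAsString());
  }
}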

Use standard JVM library methods to mix hashcodes

The current hashcode implementations appear to follow the pattern shown below. For convenience I'll copy the method here:

  @Override
  public int hashCode() {

    return 17 * url.hashCode() * version.hashCode();
  }

If the hashcodes collide I don't see how a simple multiplication would help matters, and they would certainly still collide if they were actually equal. In principle in some cases it might help if two hashcodes were in the same bucket but not actually equal depending on the bucketing strategy used, but it seems like collisions would be worsened just as often for any bucketing strategy I can think of.

I think what you actually were going for here is probably a variation on Java's strategy for hashing strings, whereby each member hashcode is effectively multiplied by a different power of a prime number and these results summed. See the implementation for Java 7, noting that previous summands onto h are multiplied by 31 again in successive loops. I can't recall the mathematical explanation behind why sums of this form give a better distribution than alternatives, although I remember having studied it at one point - you could probably find it in an algorithms textbook. In any event though we can probably trust that Java's default for this is reasonable.

The easiest way to get this behavior is likely to use the hashCode method on Objects provided in Java 7+. That ends up using the Arrays hashcode which uses the same algorithm as in String.
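A minimal sketch of that change on a hypothetical url/version key class (the class name is illustrative, not a specific Bunsen class):

import java.util.Objects;

public final class UrlAndVersionKey {

  private final String url;
  private final String version;

  public UrlAndVersionKey(String url, String version) {
    this.url = url;
    this.version = version;
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof UrlAndVersionKey)) {
      return false;
    }
    UrlAndVersionKey that = (UrlAndVersionKey) obj;
    return Objects.equals(url, that.url) && Objects.equals(version, that.version);
  }

  @Override
  public int hashCode() {
    // Objects.hash delegates to Arrays.hashCode, which mixes each member with
    // successive powers of 31, the same scheme String.hashCode uses.
    return Objects.hash(url, version);
  }
}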

I don't see a functional problem here since we seem to still be obeying hashcode and equality contracts - the performance in hash-based data structures probably just won't be quite optimal.

Support for Contained Resources

There are some cases where it would be useful to follow the FHIR "Contained Resource" pattern, wherein rather than having a resource stand-alone, it is contained within another resource. A good example where this may be useful is for the Provenance resource, which could be contained within its referring resource rather than being independent.

While the HAPI APIs allow for adding a contained resource, Bunsen currently would not support Spark Datasets of such resources: since the encoder is created only from the static resource definition, "knowledge" of the contained-resources structure wouldn't be available at encoder-creation time.

SparkRowConverter.forResource seems to not be thread-safe based on intermittent NullPointerException in cluster mode

Description of Issue

Getting the following NullPointerExceptions in 2 different environments from executors:

  1. AWS EMR Spark cluster:

    java.lang.NullPointerException
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:262)
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:408)
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:334)
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:408)
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformRoot(Stu3StructureDefinitions.java:602)
    at com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transform(Stu3StructureDefinitions.java:522)
    at com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:97)
    at com.cerner.bunsen.spark.Bundles$ToResourceRow.readObject(Bundles.java:454)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

  2. on-prem YARN Spark cluster:

    java.lang.NullPointerException at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:274) at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:420) at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.elementToFields(Stu3StructureDefinitions.java:387) at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformChildren(Stu3StructureDefinitions.java:420) at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transformRoot(Stu3StructureDefinitions.java:614) at
    com.cerner.bunsen.definitions.stu3.Stu3StructureDefinitions.transform(Stu3StructureDefinitions.java:534) at
    com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:97) at
    com.cerner.bunsen.spark.SparkRowConverter.forResource(SparkRowConverter.java:54) at
    com.cerner.bunsen.spark.converters.HasSerializableConverter.readObject(HasSerializableConverter.java:38) at
    sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readArray(ObjectInputStream.java:2031) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1612) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at
    sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
    java.lang.reflect.Method.invoke(Method.java:498) at
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2234) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343) at
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267) at
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125) at
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) at
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at
    org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88) at
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at
    org.apache.spark.scheduler.Task.run(Task.scala:121) at
    org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
    java.lang.Thread.run(Thread.java:748)

System Configuration

Project Version

0.5.6

Additional Details (optional)

Steps to Reproduce the Issue

Note: the snippets below include <placeholders> that you would need to replace for recreation.
This issue has never occurred locally, despite my many attempts to write tests that reproduce it. The issue can be intermittent in cluster mode, depending on configuration and environment, but seems to be consistently reproduced when using an AWS EMR Step (fixed 25 core r5.4xlarge instances) with the following spark-submit arguments:

spark-submit --verbose --deploy-mode cluster --master yarn --conf spark.hadoop.fs.s3.maxRetries=20 --conf spark.sql.shuffle.partitions=2003 --conf spark.executor.memory=30g --conf spark.driver.memory=24g --conf spark.executor.cores=3 --conf spark.driver.cores=3 --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=30 --conf spark.yarn.maxAppAttempts=1 --conf spark.python.worker.memory=4g --conf spark.driver.maxResultSize=8g --jars <my_bunsen_consuming_java>.jar --py-files <my_bunsen_consuming_python_java_wrapper>.py,<my_bunsen_consuming_python_java_wrapper>.zip <my_python_script_calling_my_bunsen_consuming_python_java_wrapper_api_to_read_>.py

and AWS EMR cluster configurations:

Classification Property Value Source  
hive-site javax.jdo.option.ConnectionPassword <my_connection_password> Cluster configuration  
hive-site javax.jdo.option.ConnectionURL jdbc:mysql://<my_rds_cluster>.us-west-2.rds.amazonaws.com:3306/<my_rds_name>?createDatabaseIfNotExist=true&verifyServerCertificate=false&useSSL=true&requireSSL=true Cluster configuration  
hive-site javax.jdo.option.ConnectionDriverName org.mariadb.jdbc.Driver Cluster configuration  
hive-site javax.jdo.option.ConnectionUserName <my_connection_user> Cluster configuration  
spark-defaults spark.sql.warehouse.dir s3://<my_warehouse_s3_bucket>/warehouse Cluster configuration

Expected Outcomes

  • This NullPointerException should never occur. The fact that the same job sometimes runs to completion successfully indicates that the content itself is not the issue; those successful runs are the expected outcome every time.

Parse both XML and JSON Resources

Bunsen currently parses only XML resources, using an XML parser built from a FhirContext. We want to support reading JSON resources as well. We can do this by inferring which parser to use from the file extension. Resources that lack a recognized file extension can trigger a runtime exception with a descriptive message.
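A minimal sketch of the extension-based selection, using HAPI's parser factory methods with an illustrative helper (this is not Bunsen's actual implementation):

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import org.hl7.fhir.instance.model.api.IBaseResource;

public class ResourceParsingSketch {

  // Chooses an XML or JSON parser from the FhirContext based on the file extension,
  // and fails with a clear message when the extension is unrecognized.
  public static IBaseResource parse(FhirContext context, String path, String contents) {
    final IParser parser;
    if (path.endsWith(".xml")) {
      parser = context.newXmlParser();
    } else if (path.endsWith(".json")) {
      parser = context.newJsonParser();
    } else {
      throw new IllegalArgumentException(
          "Unable to infer a FHIR parser from the file extension of: " + path);
    }
    return parser.parseResource(contents);
  }
}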

Write resources to a database - saveAsTable/chmod / Operation not permitted

Description of Issue

Saving resource dataframes to Hive tables results in an error. It seems to be a permission error when Spark's saveAsTable writes to Hive storage.

System Configuration

Windows 10 with WSL2 (Ubuntu 18.04)

Project Version

0.4.9 with Jupyter Pyspark/All-spark-notebooks

Steps to Reproduce the Issue

  1. Run the original "getting_started" tutorial with Docker
  2. Writing resources to a database exits with a SparkException:

Expected Outcomes

  • Dataframes are written as tables in database.

Does anybody have an idea what I can do about this? I rebuilt several times, used different Spark base images, and co-installed Hadoop to manually change permissions on the spark-warehouse (which failed), and I'm out of ideas now.

Thank you!

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
15 bundles,
16 'tutorial_small',
---> 17 resources)

/usr/local/bunsen/python/bunsen/stu3/bundles.py in write_to_database(sparkSession, javaRDD, databaseName, resourceNames)
105 bundles = _bundles(sparkSession._jvm)
106
--> 107 bundles.saveAsDatabase(sparkSession._jsparkSession, javaRDD, databaseName, namesArray)
108
109 def save_as_database(sparkSession, path, databaseName, *resourceNames, **kwargs):

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o88.saveAsDatabase.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:503)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:474)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:453)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
at com.cerner.bunsen.Bundles.saveAsDatabase(Bundles.java:324)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/work/Projects/FHIR/bunsen/spark-warehouse/tutorial_small.db/allergyintolerance/_temporary/0/_temporary/attempt_20191008144337_0000_m_000000_0/part-00000-afdb5614-2fe2-4ec7-a1d4-25d3d8447784-c000.snappy.parquet': Operation not permitted

at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 36 more
Caused by: ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/work/Projects/FHIR/bunsen/spark-warehouse/tutorial_small.db/allergyintolerance/_temporary/0/_temporary/attempt_20191008144337_0000_m_000000_0/part-00000-afdb5614-2fe2-4ec7-a1d4-25d3d8447784-c000.snappy.parquet': Operation not permitted

at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more


[0.5.0-Dev] Avro Schema generation for fields with identical ChoiceTypes fails

Description of Issue

Avro schema generation fails when a Resource has multiple fields with identical ChoiceType structures, e.g. Medication.ingredient.item[x] and Medication.package.content.item[x].

This causes a Schema redefinition error.
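
For reference, the failure mode is standard Avro behavior rather than anything Bunsen-specific: a named record may only be defined once per schema, so a second inline definition with the same name is rejected. A minimal, self-contained illustration (the record and field names below are made up for the example):

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class RedefinitionDemo {

  public static void main(String[] args) {
    // Two fields whose types define a record with the same name ("ItemChoice"),
    // mimicking two identical choice-type structures in one resource schema.
    String schemaJson =
        "{\"type\":\"record\",\"name\":\"Medication\",\"fields\":["
        + "{\"name\":\"ingredientItem\",\"type\":{\"type\":\"record\",\"name\":\"ItemChoice\","
        + "\"fields\":[{\"name\":\"itemCodeableConcept\",\"type\":\"string\"}]}},"
        + "{\"name\":\"contentItem\",\"type\":{\"type\":\"record\",\"name\":\"ItemChoice\","
        + "\"fields\":[{\"name\":\"itemCodeableConcept\",\"type\":\"string\"}]}}"
        + "]}";

    try {
      new Schema.Parser().parse(schemaJson);
    } catch (SchemaParseException e) {
      // Avro rejects the second definition, e.g. "Can't redefine: ItemChoice".
      System.out.println(e.getMessage());
    }
  }
}

The fix is presumably to emit the shared structure once and reference it by name (or give reuses a unique namespace) rather than redefining it inline.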

Expected Outcomes

It should be possible to generate Avro schemas for GenericRecords and SpecificRecords for Resources that have fields with identical ChoiceType structures.

These GenericRecords/SpecificRecords should be serializable to and deserializable from FHIR objects, Spark Rows, and Avro objects.

FHIR R4 Support

FHIR R4 will be here soon, so we should support it. The goal is to have a single Bunsen build that will support both STU3 and R4 to keep the user experience as simple as possible.

IdType Should Apply Converter

Currently, when setting the IdType of an IAnyResource element, the Converter for that String primitive type is not invoked; instead, the String value is set directly on the new HAPI object's values.

This precludes special converters from being applied that might handle String types differently than the default String handling.

AC:

  • HapiCompositeConverter should make use of the HapiConverter given for the Id element child.
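
A hypothetical sketch of the intended delegation (the converter interface and class below are illustrative placeholders, not Bunsen's internal HapiCompositeConverter/HapiConverter types):

import org.hl7.fhir.dstu3.model.Patient;

public class IdConversionSketch {

  // Stand-in for the converter registered for the id element's String child.
  interface StringConverter {
    String toHapiValue(Object sparkValue);
  }

  private final StringConverter idConverter;

  public IdConversionSketch(StringConverter idConverter) {
    this.idConverter = idConverter;
  }

  public void setId(Patient target, Object sparkValue) {
    // Desired behavior: route the value through the converter for the id child...
    target.setId(idConverter.toHapiValue(sparkValue));
    // ...rather than today's behavior of copying the raw String value directly:
    // target.setId((String) sparkValue);
  }
}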

BroadcastableValueSets threading issue

Description of Issue

Issues were being seen when multiple threads were each building their own BroadcastableValueSets with unique Spark sessions.

BroadcastableValueSets has static Spark encoders, which are not guaranteed to be thread safe. This is believed to be the cause.
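
As a toy illustration of the suspected hazard (plain Spark API, not the actual BroadcastableValueSets code): a statically shared encoder is reused across all threads and sessions, whereas an encoder created at the point of use shares nothing.

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class EncoderScopeSketch {

  // Shared across all threads and Spark sessions; suspected to be unsafe.
  private static final Encoder<String> SHARED_ENCODER = Encoders.STRING();

  // Created per use, so nothing is shared between sessions.
  static Encoder<String> perUseEncoder() {
    return Encoders.STRING();
  }
}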

System Configuration

Project Version

0.5.2

Simplify APIs for tutorial usage

We found some API improvements that could simplify Bunsen usage when creating tutorial content. Specifically, they are:

  • Offer the ability to import valuesets from bundles
  • Allow valuesets to be broadcast without requiring the ontology databases to be built if they are not needed
  • Offer an API that accepts an RDD of bundles and writes each entity out to disk.

These are all small changes, so I'm wrapping them up into this issue.

Expose "withDirectory" to Python

ConceptMaps and ValueSets support loading resources from directories. It would be useful to expose that functionality to users through the Python API such that users can load resources from local and distributed filesystems.

Schema generation fails for extension elements of certain types

Description of Issue

A null pointer exception is raised while generating the Avro schema for any FHIR resource whose structure definition contains Extension elements of primitive types (boolean and integer). Extensions of string type work as expected.

System Configuration

Bunsen 0.5.0 release

Expected Outcomes

  • The Avro and Spark schemas should be generated successfully for FHIR resources containing extension elements of any primitive type.

ValueSetUdfs Should Accept Already-Broadcast Instance

In some applications, caller code may have already broadcast an instance of BroadcastableValueSets to the cluster and may wish to provide that broadcast variable reference when registering an "in_valueset" UDF.

We should expose a method that takes a Broadcast<BroadcastableValueSets> for pushUdf.
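
A sketch of how caller code might use such a method. The import path for BroadcastableValueSets is an assumption based on the test names quoted elsewhere in this document and may differ by version; the pushUdf overload itself is the proposal, so it appears only as a comment:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

import com.cerner.bunsen.spark.codes.broadcast.BroadcastableValueSets;

public class PushBroadcastSketch {

  public static void registerInValueSetUdf(SparkSession spark,
      BroadcastableValueSets valueSets) {

    // The caller broadcasts the value sets once, up front.
    Broadcast<BroadcastableValueSets> broadcast =
        JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(valueSets);

    // Proposed overload (not an existing method): register the "in_valueset"
    // UDF from the existing broadcast instead of broadcasting a second copy.
    // ValueSetUdfs.pushUdf(spark, broadcast);
  }
}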

Travis CI build failure


Description of Issue

Travis CI build is failing with below errors.

Tests in error:

  checkSystems(com.cerner.bunsen.spark.codes.systems.SnomedTest): Unsupported class file major version 55
  testHasParent(com.cerner.bunsen.spark.codes.systems.SnomedTest): Unsupported class file major version 55
  checkSystems(com.cerner.bunsen.spark.codes.systems.LoincTest): Unsupported class file major version 55
  testHasParent(com.cerner.bunsen.spark.codes.systems.LoincTest): Unsupported class file major version 55
  com.cerner.bunsen.spark.codes.broadcast.BroadcastableValueSetsTest: Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.
  testMultiNestedExtension(com.cerner.bunsen.spark.SparkRowConverterTest): Unsupported class file major version 55
  testMultiModifierExtensionsWithCodeableConceptField(com.cerner.bunsen.spark.SparkRowConverterTest): Unsupported class file major version 55
  com.cerner.bunsen.spark.ValueSetUdfsTest: Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.

Travis CI is picking up the latest "openjdk11" version, even though we specify "openjdk8" as the JDK in the Travis CI config file. The logs show the error below:

Installing openjdk8
travis_setup_java: command not found

Broadcast of BroadcastableConceptMaps Fails to Find Latest

The following minimal reproducing test case fails when we broadcast ConceptMaps that include a later version for the same URI:

ConceptMap conceptMapLatest = new ConceptMap();

conceptMapLatest.setUrl("uri:test:concept:map")
        .setVersion("1")
        .setSource(new UriType("uri:test:source:valueset"))
        .setTarget(new UriType("uri:test:target:valueset"));

ConceptMapGroupComponent groupLatest = conceptMapLatest.addGroup()
        .setSource("uri:test:source:system")
        .setTarget("uri:test:target:system");

groupLatest.addElement().setCode("abc").addTarget().setCode("123");
groupLatest.addElement().setCode("def").addTarget().setCode("xyz");

ConceptMaps maps = ConceptMaps.getEmpty(spark)
        .withConceptMaps(conceptMap, conceptMapLatest);

broadcastLatest = maps.broadcast(maps.getLatestVersions(true));

...

@Test
public void testBroadcastLatest() {

    BroadcastableConceptMap broadcastableConceptMap = broadcastLatest.getValue()
        .getBroadcastConceptMap("uri:test:concept:map");

    BroadcastableConceptMap.CodeValue value = broadcastableConceptMap
        .getTarget("uri:test:source:system", "abc").get(0);

    Assert.assertEquals("uri:test:target:system", value.getSystem());
    Assert.assertEquals("123", value.getValue());

    value = broadcastableConceptMap.getTarget("uri:test:source:system", "def").get(0);

    Assert.assertEquals("uri:test:target:system", value.getSystem());
    Assert.assertEquals("xyz", value.getValue());
}

The broadcast method that takes a map of URIs to versions needs to filter by the given versions (code to do this appears to be in place, but is not used) [1].

[1] -- https://github.com/cerner/bunsen/blob/master/bunsen-stu3/src/main/java/com/cerner/bunsen/stu3/codes/ConceptMaps.java#L313
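
A minimal sketch of the missing filtering step, assuming the requested versions arrive as a map from concept map URI to version (illustrative only, not the actual ConceptMaps implementation):

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class VersionFilterSketch {

  // Keep only the (uri, version) pairs whose version matches the one requested
  // for that URI, dropping all other versions of the same map.
  static Set<Map.Entry<String, String>> selectRequested(
      Set<Map.Entry<String, String>> allUriVersions,
      Map<String, String> requestedVersions) {

    return allUriVersions.stream()
        .filter(entry -> entry.getValue().equals(requestedVersions.get(entry.getKey())))
        .collect(Collectors.toSet());
  }
}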

Unable to Read from Standalone Resources

I'm not sure if this is an issue or simply a misuse of the package; however, the FHIR JSON that I need to read is not in bundles but consists of standalone resources.

I am unable to create a DataFrame that is suitable for use with to_bundle using the R4 Python API. I assume the workflow would be something like: read files containing single resources from S3 > to_bundle > from_json > extract_entity. Some code I have attempted:

print('Starting text schema')
text_data_frame = spark.read.text("s3://<bucket_name>/WLW-test-1234/Patient")
text_data_frame.printSchema()
text_data_frame.show(100, False)

print("Starting to_bundle")
fhir_bundle = to_bundle(spark, text_data_frame)

Is there some specific format that the DataFrame needs to be in to use the to_bundle function, or is it truly the case that standalone, unbundled resources are not supported? The documentation says only: "dataset – a DataFrame of encoded FHIR Resources"

Thank you
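
Not an answer from the project, but for illustration: standalone resources can in principle be encoded without bundles by parsing them with HAPI and using a FHIR encoder directly. The sketch below uses the Java STU3 API (the question above concerns the Python R4 API), and the resource type and input list are placeholders:

import java.util.List;
import java.util.stream.Collectors;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.FhirEncoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.Patient;

public class StandaloneResourceSketch {

  public static Dataset<Patient> encode(SparkSession spark, List<String> patientJson) {

    // Parse each standalone resource on the driver with HAPI.
    IParser parser = FhirContext.forDstu3().newJsonParser();

    List<Patient> patients = patientJson.stream()
        .map(json -> parser.parseResource(Patient.class, json))
        .collect(Collectors.toList());

    // Encode the parsed resources directly, with no bundle involved.
    return spark.createDataset(patients,
        FhirEncoders.forStu3().getOrCreate().of(Patient.class));
  }
}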

Primitive Types with maximum cardinality are not supported


Description of Issue

For FHIR primitive types with a maximum cardinality greater than one in the structure definition (e.g., "given" in "HumanName"), the generated Avro schema does not represent the field as an array type.
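
For reference, a sketch (plain Avro SchemaBuilder, not Bunsen output) of the shape one would expect for a repeating primitive such as HumanName.given, i.e. an array of strings rather than a single string:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class RepeatingPrimitiveSketch {

  public static Schema humanNameSketch() {
    return SchemaBuilder.record("HumanName")
        .fields()
        .optionalString("family")
        // "given" has cardinality 0..* in FHIR, so it should become an array.
        .name("given").type().array().items().stringType().noDefault()
        .endRecord();
  }
}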

System Configuration

Project Version

Bunsen 0.5.0 release

[0.5.0-dev] support contained resources

Contained Resources are currently not supported by the 0.5.0-dev paradigm of Bunsen.

There are two approaches to this problem:

  1. Define a List of ResourceContainer schema, wherein a list of structures is rooted at the contained field of DomainResources, which itself contains all possible DomainResources.
  2. Allow users to specify which Resources they'd like contained in other Resources, in a similar manner to specifying extensions for a Resource.

While the first approach would generate schemas that could be compiled once and then be universally applicable, it would likely also generate unnecessarily complex schemas. For schema formats that support references, like Avro, this wouldn't pose a problem; but for formats that don't support referencing previously defined structures and instead require full, inline schema declarations, like Spark, the schemas would become large and unwieldy.

The second approach is probably more advisable at this time.
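
A sketch of what approach (2) might look like from the caller's perspective; the exact method signature is hypothetical and part of the design work:

import com.cerner.bunsen.FhirEncoders;
import org.apache.spark.sql.Encoder;
import org.hl7.fhir.dstu3.model.Medication;
import org.hl7.fhir.dstu3.model.MedicationRequest;

public class ContainedSketch {

  // Name the resource types expected under "contained" when requesting the
  // encoder, much as extensions would be named.
  public static Encoder<MedicationRequest> encoderWithContainedMedication() {
    return FhirEncoders.forStu3().getOrCreate()
        .of(MedicationRequest.class, Medication.class);
  }
}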

Spark 2.3 Support

Spark 2.3 is out and gaining traction, so we should support it. Ideally we should be able to produce a build that supports both Spark 2.2 and 2.3.

load bundles from a Hive table

For one of our projects, we store bundles with various resources in a Hive table. That table has several other columns used for various purposes, so the table looks like this:

create table fhir_extract (id bigint, col1 string, col2 string, fhir_bundle string);

fhir_bundle can contain various resources (patient, observation, encounter etc.) and resources can have both mandatory and optional elements.

I think at the moment Bunsen does not support loading resources from a Hive table column.
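
A sketch of the first half of such a workflow, just pulling the bundle JSON out of the Hive column as a Dataset<String>; how that Dataset would then be handed to Bunsen is the open question in this issue:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class HiveBundleSketch {

  public static Dataset<String> readBundleJson(SparkSession spark) {
    // Select only the column holding the bundle JSON from the wider table.
    return spark.sql("SELECT fhir_bundle FROM fhir_extract")
        .as(Encoders.STRING());
  }
}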

Align Python ValueSets/Hierarchies API with database usage

After using these APIs in a notebook, I found it simpler and more intuitive to store customized ontology data in a separate database that can be shared and referenced in code, as opposed to referencing Python objects containing them. Therefore I'd like to add the database name as an optional parameter so users can load and work with different databases. Here's an example of how push_valuesets would be used:

push_valuesets(spark,
               {'hypertension': isa_snomed('59621000'),
                'heart_rate': isa_loinc('8867-4')},
               database='my_custom_ontologies')

This also reduces the burden on a user who just consumes a custom ontology database, since pointing to a different database eliminates intermediate steps and the need to be aware of our classes.

Of course, I'd expect custom ontologies to be written to the default database in many workflows...this just makes it easy to experiment with a custom database without impacting others.

Support for importing a dataset of mapping records

When loading external concept maps, it's sometimes convenient to compute the mappings themselves in their own dataset (rather than reducing them all into a single ConceptMap object). Therefore we should support the ability to import a Dataset of the expanded values along with the underlying concept map.

Uplift for Spark 2.3.2

Spark 2.3.2 is available now and we should uplift Bunsen to use this later, stable version.

Uplift for Spark 2.4.0

Spark 2.4.0 is available now and we should uplift Bunsen to use this latest, stable version.

Creating Spark Dataset throws scala.MatchError

Hi, I get this error while creating a Dataset with a FHIR type resource. Requesting your help.

Spark 2.2
FHIR DSTU3
bunsen-core-0.1.0
PatientSummary is a custom FHIR Resource

Dataset<PatientSummary> patientSummaryDS = sparkSession.createDataset(patientSummaryRDD.collect(),
				encoders.of(PatientSummary.class));

Error:
scala.MatchError: RuntimeElementDirectResource[DirectChildResource, IBaseResource] (of class ca.uhn.fhir.context.RuntimeElementDirectResource)
at com.cerner.bunsen.SchemaConverter.com$cerner$bunsen$SchemaConverter$$childToFields(SchemaConverter.scala:119)
at com.cerner.bunsen.SchemaConverter$$anonfun$1.apply(SchemaConverter.scala:161)
at com.cerner.bunsen.SchemaConverter$$anonfun$1.apply(SchemaConverter.scala:161)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at com.cerner.bunsen.SchemaConverter.compositeToStructType(SchemaConverter.scala:161)
at com.cerner.bunsen.EncoderBuilder$.of(EncoderBuilder.scala:43)
at com.cerner.bunsen.EncoderBuilder.of(EncoderBuilder.scala)
at com.cerner.bunsen.FhirEncoders.of(FhirEncoders.java:64)

Support Broadcasting the FHIR ValueSet

A FHIR ValueSet is described as

... a set of codes drawn from one or more code systems.

It would be useful to be able to read and broadcast FHIR ValueSets in a manner similar to how BroadcastableValueSets supports LOINC and SNOMED codes. FHIR ValueSets are perhaps a simplified form of the relationship-structured LOINC and SNOMED, as FHIR ValueSets have no notion of an ancestor or descendant.

At the moment, our BroadcastableValueSets uses the ConceptMaps class underneath, where the Source and Target systems are the same. I think there's some room for simplification as part of this issue, in which we create a ValueSets class that can capture collections with relationships between their codes (e.g. LOINC, SNOMED) as well as collections that are flat (e.g. a FHIR ValueSet). ConceptMaps can then be disentangled from that implementation and exposed as BroadcastableConceptMaps for mappings between distinct Source and Target systems.
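
A toy illustration of the "flat" shape such a ValueSets broadcast could take: just a reference-to-codes lookup with no ancestor or descendant logic (class and method names are made up):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FlatValueSetSketch {

  // valueset reference -> set of "system|code" entries
  private final Map<String, Set<String>> codesByReference = new HashMap<>();

  public void addCode(String reference, String system, String code) {
    codesByReference.computeIfAbsent(reference, key -> new HashSet<>())
        .add(system + "|" + code);
  }

  public boolean hasCode(String reference, String system, String code) {
    return codesByReference.getOrDefault(reference, new HashSet<>())
        .contains(system + "|" + code);
  }
}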

MultiValue & Containers clashing with Type Restrictions

Right now the getContainer method for Avro expects GenericData.Array as the container list, which can clash with a plain Java ArrayList.

We should make sure we're using GenericData.Array throughout for Avro lists, but also loosen the type restriction in getContainer to match the interface rather than the implementation.
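
A small illustration of the interface-versus-implementation point: a GenericData.Array is itself a java.util.List, so a method declared against List accepts both it and a plain ArrayList (standard Avro and JDK APIs, not Bunsen code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ContainerTypeSketch {

  // Declared against the List interface, not the GenericData.Array implementation.
  static int count(List<?> container) {
    return container.size();
  }

  public static void main(String[] args) {
    Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));

    GenericData.Array<String> avroList =
        new GenericData.Array<>(arraySchema, Arrays.asList("a", "b"));

    System.out.println(count(avroList));                // 2
    System.out.println(count(new ArrayList<String>())); // 0
  }
}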
