
spark-xml's Introduction

XML Data Source for Apache Spark 3.x

  • A library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. The structure and test tools are mostly copied from CSV Data Source for Spark.

  • This package supports processing format-free XML files in a distributed way, unlike the JSON data source in Spark, which is restricted to in-line JSON format.

  • Compatible with Spark 3.0 and later with Scala 2.12, and also Spark 3.2 and later with Scala 2.12 or 2.13. Scala 2.11 and Spark 2 support ended with version 0.13.0.

  • Currently, spark-xml is planned to become a part of Apache Spark 4.0. This library will remain in maintenance mode for Spark 3.x versions.

Linking

You can link against this library in your program at the following coordinates:

groupId: com.databricks
artifactId: spark-xml_2.12
version: 0.18.0

Using with Spark shell

This package can be added to Spark using the --packages command line option. For example, to include it when starting the spark shell:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.18.0

Features

This package allows reading XML files from a local or distributed filesystem as Spark DataFrames.

When reading files the API accepts several options:

  • path: Location of files. As in Spark, standard Hadoop globbing expressions are accepted.

  • rowTag: The tag of the XML element to treat as a row. For example, in the XML <books> <book></book> ...</books>, the appropriate value would be book. Default is ROW.

  • samplingRatio: Sampling ratio for inferring schema, between 0.0 and 1.0. Default is 1. Unless the user provides a schema, the possible inferred types are StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType and NullType.

  • excludeAttribute : Whether you want to exclude attributes in elements or not. Default is false.

  • treatEmptyValuesAsNulls : (DEPRECATED: use nullValue set to "") Whether you want to treat whitespace as a null value. Default is false.

  • mode: The mode for dealing with corrupt records during parsing. Default is PERMISSIVE.

    • PERMISSIVE :
      • When it encounters a corrupted record, it sets all fields to null and puts the malformed string into a new field configured by columnNameOfCorruptRecord.
      • When it encounters a field of the wrong datatype, it sets the offending field to null.
    • DROPMALFORMED : ignores the whole corrupted records.
    • FAILFAST : throws an exception when it meets corrupted records.
  • inferSchema: if true, attempts to infer an appropriate type for each resulting DataFrame column, like a boolean, numeric or date type. If false, all resulting columns are of string type. Default is true.

  • columnNameOfCorruptRecord: The name of new field where malformed strings are stored. Default is _corrupt_record.

    Note: this field should be present in the dataframe schema if it is passed explicitly, like this:

    schema = StructType([StructField("my_field", TimestampType()), StructField("_corrupt_record", StringType())])
    spark.read.format("xml").options(rowTag='item').schema(schema).load("file.xml")

    If the schema is inferred, this field is added automatically.

  • attributePrefix: The prefix for attributes so that we can differentiate attributes and elements. This will be the prefix for field names. Default is _. Can be empty, but only for reading XML.

  • valueTag: The tag used for the value when there are attributes in the element having no child. Default is _VALUE.

  • charset: Defaults to 'UTF-8' but can be set to other valid charset names

  • ignoreSurroundingSpaces: Defines whether or not surrounding whitespaces from values being read should be skipped. Default is false.

  • wildcardColName: Name of a column existing in the provided schema which is interpreted as a 'wildcard'. It must have type string or array of strings. It will match any XML child element that is not otherwise matched by the schema. The XML of the child becomes the string value of the column. If an array, then all unmatched elements will be returned as an array of strings. As its name implies, it is meant to emulate XSD's xs:any type. Default is xs_any. New in 0.11.0.

  • rowValidationXSDPath: Path to an XSD file that is used to validate the XML for each row individually. Rows that fail to validate are treated like parse errors as above. The XSD does not otherwise affect the schema provided, or inferred. Note that if the same local path is not already also visible on the executors in the cluster, then the XSD and any others it depends on should be added to the Spark executors with SparkContext.addFile. In this case, to use local XSD /foo/bar.xsd, call addFile("/foo/bar.xsd") and pass just "bar.xsd" as rowValidationXSDPath.

  • ignoreNamespace: If true, namespace prefixes on XML elements and attributes are ignored. Tags <abc:author> and <def:author> would, for example, be treated as if both are just <author>. Note that, at the moment, namespaces cannot be ignored on the rowTag element, only on its children. Note also that XML parsing is in general not namespace-aware, even if this is false. Defaults to false. New in 0.11.0.

  • timestampFormat: Specifies an additional timestamp format that will be tried when parsing values as TimestampType columns. The format is specified as described in DateTimeFormatter. Defaults to try several formats, including ISO_INSTANT, including variations with offset timezones or no timezone (defaults to UTC). New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by spark.sql.session.timeZone will be used.

  • timezone: identifier of timezone to be used when reading timestamps without a timezone specified. New in 0.16.0.

  • dateFormat: Specifies an additional date format that will be tried when parsing values as DateType columns. The format is specified as described in DateTimeFormatter. Defaults to ISO_DATE. New in 0.12.0.
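
As a rough illustration of how several of the read options above can be combined from PySpark, here is a minimal sketch. The option names come from the list above; the helper name read_xml, the DEFAULT_READ_OPTIONS dictionary and the chosen values are assumptions made for the example, not part of spark-xml.

```python
# Option names are taken from the list above; the values are examples only.
DEFAULT_READ_OPTIONS = {
    "rowTag": "book",                 # element treated as one row
    "samplingRatio": 0.5,             # infer the schema from a sample of rows
    "mode": "PERMISSIVE",             # keep malformed rows instead of failing
    "columnNameOfCorruptRecord": "_corrupt_record",
    "ignoreSurroundingSpaces": True,
}


def read_xml(spark, path, **overrides):
    """Hypothetical helper: read XML at `path`, overriding defaults per call.

    `spark` is expected to be a SparkSession with spark-xml on the classpath.
    """
    options = {**DEFAULT_READ_OPTIONS, **overrides}
    return spark.read.format("xml").options(**options).load(path)
```

A call such as read_xml(spark, "books.xml", mode="FAILFAST") would keep the other defaults and only switch the parse mode.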

When writing files the API accepts several options:

  • path: Location to write files.
  • rowTag: The tag of the XML element to write for each row. For example, in <books> <book></book> ...</books>, the appropriate value would be book. Default is ROW.
  • rootTag: The tag of the XML element to write as the root. For example, in <books> <book></book> ...</books>, the appropriate value would be books. It can include basic attributes by specifying a value like books foo="bar" (as of 0.11.0). Default is ROWS.
  • declaration: Content of XML declaration to write at the start of every output XML file, before the rootTag. For example, a value of foo causes <?xml foo?> to be written. Set to empty string to suppress. Defaults to version="1.0" encoding="UTF-8" standalone="yes". New in 0.14.0.
  • arrayElementName: Name of XML element that encloses each element of an array-valued column when writing. Default is item. New in 0.16.0.
  • nullValue: The value to write in place of a null value. Default is the string null. When this option is itself null, attributes and elements are not written for null fields.
  • attributePrefix: The prefix for attributes so that we can differentiate attributes and elements. This will be the prefix for field names. Default is _. Cannot be empty for writing XML.
  • valueTag: The tag used for the value when there are attributes in the element having no child. Default is _VALUE.
  • compression: Compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of the case-insensitive short names (bzip2, gzip, lz4, and snappy). Defaults to no compression when a codec is not specified.
  • timestampFormat: Controls the format used to write TimestampType format columns. The format is specified as described in DateTimeFormatter. Defaults to ISO_INSTANT. New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by spark.sql.session.timeZone will be used.
  • timezone: identifier of timezone to be used when writing timestamps without a timezone specified. New in 0.16.0.
  • dateFormat: Controls the format used to write DateType format columns. The format is specified as described in DateTimeFormatter. Defaults to ISO_DATE. New in 0.12.0.

The shortened data source name is supported: you can use just xml instead of com.databricks.spark.xml.

NOTE: created files have no .xml extension.
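
The write options above can be passed in the same way from PySpark. The following is only a sketch: the option names are those listed above, while the helper name write_xml, the WRITE_OPTIONS dictionary and the example values are assumptions.

```python
# Option names are taken from the list above; the values are examples only.
WRITE_OPTIONS = {
    "rootTag": 'books foo="bar"',     # root element with a basic attribute
    "rowTag": "book",                 # element written for each row
    "declaration": 'version="1.0" encoding="UTF-8"',  # becomes <?xml ... ?>
    "arrayElementName": "item",       # wraps each element of an array column
    "compression": "gzip",
}


def write_xml(df, path, **overrides):
    """Hypothetical helper: write DataFrame `df` as XML under `path`."""
    options = {**WRITE_OPTIONS, **overrides}
    df.write.format("xml").options(**options).save(path)
```

As with other Spark file sources, path is expected to end up as an output directory of part files rather than a single file.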

XSD Support

Per above, the XML for individual rows can be validated against an XSD using rowValidationXSDPath.

The utility com.databricks.spark.xml.util.XSDToSchema can be used to extract a Spark DataFrame schema from some XSD files. It supports only simple, complex and sequence types, and only basic XSD functionality.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Parsing Nested XML

Although primarily used to convert (portions of) large XML documents into a DataFrame, spark-xml can also parse XML in a string-valued column in an existing DataFrame with from_xml, in order to add it as a new column with parsed results as a struct.

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
  • This can also convert arrays of strings containing XML to arrays of parsed structs. Use schema_of_xml_array instead of schema_of_xml in that case.
  • com.databricks.spark.xml.from_xml_string is an alternative that operates on a String directly instead of a column, for use in UDFs
  • If you use DROPMALFORMED mode with from_xml, then XML values that do not parse correctly will result in a null value for the column. No rows will be dropped.
  • If you use PERMISSIVE mode with from_xml et al, which is the default, then the parse mode will actually instead default to DROPMALFORMED. If however you include a column in the schema for from_xml that matches the columnNameOfCorruptRecord, then PERMISSIVE mode will still output malformed records to that column in the resulting struct.
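
The two mode notes above can be pictured with a small plain-Python model. This is not spark-xml code and does not call the library; parse_payload and its corrupt_column parameter are hypothetical names, and the standard library parser stands in for the real one.

```python
import xml.etree.ElementTree as ET


def parse_payload(xml_string, corrupt_column=None):
    """Model the from_xml behaviour described above for a single value.

    Well-formed XML becomes a dict of child tag -> text. A malformed value
    becomes None when no corrupt-record column is configured, or a dict that
    carries only the raw string in that column when one is configured (the
    other fields, which would be null, are left out of this model).
    """
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError:
        if corrupt_column is None:
            return None
        return {corrupt_column: xml_string}
    return {child.tag: child.text for child in root}


good = "<book><title>XML</title></book>"
bad = "<book><title>XML</book>"            # <title> is never closed

print(parse_payload(good))                 # {'title': 'XML'}
print(parse_payload(bad))                  # None
print(parse_payload(bad, "_corrupt_record"))
```

In both cases the row itself survives; only the parsed value changes, which matches the note that no rows are dropped.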

Pyspark notes

The functions above are exposed in the Scala API only, at the moment, as there is no separate Python package for spark-xml. They can be accessed from Pyspark by manually declaring some helper functions that call into the JVM-based API from Python. Example:

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

Structure Conversion

Due to the structure differences between DataFrame and XML, there are some conversion rules from XML data to DataFrame and from DataFrame to XML data. Note that handling attributes can be disabled with the option excludeAttribute.

Conversion from XML to DataFrame

  • Attributes: Attributes are converted to fields whose names start with the prefix set by attributePrefix.

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>

    produces the schema below:

    root
     |-- _myOneAttrib: string (nullable = true)
     |-- two: string (nullable = true)
     |-- three: string (nullable = true)
    
  • Value in an element that has attributes but no child elements: The value is put in a separate field, named by valueTag.

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>

    produces the schema below:

    root
     |-- two: struct (nullable = true)
     |    |-- _VALUE: string (nullable = true)
     |    |-- _myTwoAttrib: string (nullable = true)
     |-- three: string (nullable = true)
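
The two rules above can be reproduced outside Spark with a short sketch. It uses Python's standard library rather than spark-xml, so it only illustrates the naming rules; element_to_field is a hypothetical helper, and its parameters mirror the attributePrefix and valueTag defaults.

```python
import xml.etree.ElementTree as ET


def element_to_field(elem, attribute_prefix="_", value_tag="_VALUE"):
    """Convert one element to a plain value or a dict, following the rules above.

    Repeated child tags are not handled here (the last one wins).
    """
    attrs = {attribute_prefix + name: value for name, value in elem.attrib.items()}
    children = list(elem)
    if not children and not attrs:
        return elem.text                  # plain element: just its text
    row = dict(attrs)                     # attributes become prefixed fields
    if children:
        for child in children:
            row[child.tag] = element_to_field(child, attribute_prefix, value_tag)
    else:
        row[value_tag] = elem.text        # attributes but no children
    return row


first = '<one myOneAttrib="AAAA"><two>two</two><three>three</three></one>'
second = '<one><two myTwoAttrib="BBBBB">two</two><three>three</three></one>'
print(element_to_field(ET.fromstring(first)))
print(element_to_field(ET.fromstring(second)))
```

The first call yields the flat fields _myOneAttrib, two and three; the second nests _VALUE and _myTwoAttrib under two, matching the field names in the schemas shown above.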
    

Conversion from DataFrame to XML

  • Element as an array in an array: Writing an XML file from a DataFrame that has an ArrayType field whose elements are themselves ArrayType adds a nested field for the inner element. This cannot happen when the DataFrame was itself read from XML, only when it was read from another source. A round trip of reading and then writing XML therefore preserves the structure, whereas writing a DataFrame read from another source may produce a different structure.

    DataFrame with a schema below:

     |-- a: array (nullable = true)
     |    |-- element: array (containsNull = true)
     |    |    |-- element: string (containsNull = true)
    

    with data below:

    +------------------------------------+
    |                                   a|
    +------------------------------------+
    |[WrappedArray(aa), WrappedArray(bb)]|
    +------------------------------------+
    

    produces the XML file below:

    <a>
        <item>aa</item>
    </a>
    <a>
        <item>bb</item>
    </a>
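
The shape of that output can be sketched in plain Python. This is not how spark-xml writes files; write_nested_array is a hypothetical helper that only reproduces the element layout shown above, with array_element_name standing in for the arrayElementName option.

```python
import xml.etree.ElementTree as ET


def write_nested_array(name, outer, array_element_name="item"):
    """Render an array-of-arrays column in the layout shown above.

    Each inner array becomes one <name> element, and each of its values is
    wrapped in an <array_element_name> child.
    """
    parts = []
    for inner in outer:
        elem = ET.Element(name)
        for value in inner:
            ET.SubElement(elem, array_element_name).text = value
        parts.append(ET.tostring(elem, encoding="unicode"))
    return "".join(parts)


print(write_nested_array("a", [["aa"], ["bb"]]))
# <a><item>aa</item></a><a><item>bb</item></a>
```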

Examples

These examples use an XML file available for download here:

$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml

SQL API

XML data source for Spark can infer data types:

CREATE TABLE books
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")

You can also specify column names and types in DDL. In this case, we do not infer schema.

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")

Scala API

Import com.databricks.spark.xml._ to get implicits that add the .xml(...) method to DataFrame. You can also use .format("xml") and .load(...).

import org.apache.spark.sql.SparkSession
import com.databricks.spark.xml._

val spark = SparkSession.builder().getOrCreate()
val df = spark.read
  .option("rowTag", "book")
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

You can manually specify the schema when reading data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import com.databricks.spark.xml._

val spark = SparkSession.builder().getOrCreate()
val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))


val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

Java API

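import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
// The Java API has no DataFrame type; a DataFrame is a Dataset<Row>.
Dataset<Row> df = spark.read()
  .format("xml")
  .option("rowTag", "book")
  .load("books.xml");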

df.select("author", "_id").write()
  .format("xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .save("newbooks.xml");

You can manually specify the schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

SparkSession spark = SparkSession.builder().getOrCreate();
StructType customSchema = new StructType(new StructField[] {
  new StructField("_id", DataTypes.StringType, true, Metadata.empty()),
  new StructField("author", DataTypes.StringType, true, Metadata.empty()),
  new StructField("description", DataTypes.StringType, true, Metadata.empty()),
  new StructField("genre", DataTypes.StringType, true, Metadata.empty()),
  new StructField("price", DataTypes.DoubleType, true, Metadata.empty()),
  new StructField("publish_date", DataTypes.StringType, true, Metadata.empty()),
  new StructField("title", DataTypes.StringType, true, Metadata.empty())
});

// The Java API has no DataFrame type; a DataFrame is a Dataset<Row>.
Dataset<Row> df = spark.read()
  .format("xml")
  .option("rowTag", "book")
  .schema(customSchema)
  .load("books.xml");

df.select("author", "_id").write()
  .format("xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .save("newbooks.xml");

Python API

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('xml').options(rowTag='book').load('books.xml')
df.select("author", "_id").write \
    .format('xml') \
    .options(rowTag='book', rootTag='books') \
    .save('newbooks.xml')

You can manually specify schema:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()
customSchema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)])

df = spark.read \
    .format('xml') \
    .options(rowTag='book') \
    .load('books.xml', schema = customSchema)

df.select("author", "_id").write \
    .format('xml') \
    .options(rowTag='book', rootTag='books') \
    .save('newbooks.xml')

R API

Automatically infer schema (data types)

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:0.18.0"))

df <- read.df("books.xml", source = "xml", rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "newbooks.xml", "xml", "overwrite")

You can manually specify schema:

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:0.18.0"))
customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "newbooks.xml", "xml", "overwrite")

Hadoop InputFormat

The library contains a Hadoop input format for reading XML files by a start tag and an end tag. It is similar to XmlInputFormat.java in Mahout, but also supports reading compressed files, different encodings, and elements that include attributes. You can use it directly as follows:

import com.databricks.spark.xml.XmlInputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.io.{LongWritable, Text}

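// Reuse the running SparkContext, or create one from the default configuration
val sc: SparkContext = SparkContext.getOrCreate()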

// This will detect the tags including attributes
sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<book>")
sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</book>")

val records = sc.newAPIHadoopFile(
  "path",
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text])
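
To make the start-tag and end-tag idea concrete, here is a simplified plain-Python sketch of splitting text into records. It is not the library's XmlInputFormat and ignores everything that makes the real one useful on a cluster (splits, compression, encodings); split_records is a hypothetical name, and nested elements with the same tag are not handled.

```python
def split_records(text, start_tag="<book>", end_tag="</book>"):
    """Return each substring running from a start tag to the next end tag.

    The start tag also matches when it carries attributes, so "<book>"
    matches '<book id="1">' but not "<books>".
    """
    opener = start_tag[:-1]               # "<book", without the closing ">"
    records, pos = [], 0
    while True:
        start = text.find(opener, pos)
        if start == -1:
            break
        after = text[start + len(opener): start + len(opener) + 1]
        if after not in (">", " ", "\t", "\n", "\r"):
            pos = start + len(opener)     # a longer tag name such as <books>
            continue
        end = text.find(end_tag, start)
        if end == -1:
            break                         # unterminated record: stop here
        records.append(text[start:end + len(end_tag)])
        pos = end + len(end_tag)
    return records


doc = '<books><book id="1"><t>A</t></book><book><t>B</t></book></books>'
for record in split_records(doc):
    print(record)
```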

Building From Source

This library is built with SBT. To build a JAR file simply run sbt package from the project root.

Acknowledgements

This project was initially created by HyukjinKwon and donated to Databricks.

spark-xml's People

Contributors

anastasia, bioqwer, cefn, comtef, dolfinus, ericsun95, ganeshchand, gfeuillen, haavard, hyukjinkwon, jimenefe, joristruong, joshrosen, kduvekot-wehkamp-nl, lokm01, mattroberts297, mengxr, neherim, nyoungstudios, pjfanning, rxin, seddonm1, shuch3ng, sivaponting, skadyan, socialpercon, srowen, trheming, xanderbailey, yjhyjhyjh0

spark-xml's Issues

Log shows error even though it's caught

Take the following XML

<Root>
   <RowTag1>
      <col1>foo</col1>
   </RowTag1>
   <RowTag2>
      <RowTag1>foo3</RowTag1>
   </RowTag2>
</Root>

If you do

val dfTry = Try(sqlContext.read
      .format("xml")
      .option("rowTag", "RowTag1")
      .load(path))

Then I wouldn't expect to see this log (the code still executes fine... but I don't want to see this error message because I have it surrounded in Try)

16/04/19 15:34:55 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 30)
scala.MatchError: null
at com.databricks.spark.xml.parsers.StaxXmlParserUtils$.checkEndElement(StaxXmlParserUtils.scala:25)
at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:198)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:92)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:212)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1100)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1100)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/04/19 15:34:55 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
16/04/19 15:34:59 ERROR Executor: Exception in task 0.0 in stage 40.0 (TID 70)
scala.MatchError: null
at com.databricks.spark.xml.parsers.StaxXmlParserUtils$.checkEndElement(StaxXmlParserUtils.scala:25)
at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:198)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:92)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:212)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1100)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1100)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/04/19 15:34:59 ERROR TaskSetManager: Task 0 in stage 40.0 failed 1 times; aborting job

FYI this code:

    val tryStuff = Try(throw new IllegalArgumentException("sample exception..."))
    System.exit(1)

is silent

Support for namespaces declared at the ancestor level?

Hello! Some clarifications on applicability needed.
Suppose I'd like to extract Topic entries from DMOZ RDF dump (http://rdf.dmoz.org/)
WLOG, it goes like this:

<?xml version="1.0" encoding="UTF-8"?>
<RDF xmlns:r="http://www.w3.org/TR/RDF/" xmlns:d="http://purl.org/dc/elements/1.0/" xmlns="http://dmoz.org/rdf/">
  <!-- Generated at 2016-01-24 00:05:51 EST from DMOZ 2.0 -->
  <Topic r:id="">
    <catid>1</catid>
  </Topic>
</RDF>

The id attribute of Topic refers to a namespace r declared in the RDF scope. If I understand correctly, when Spark looks for rowTag entries within a particular partition, there is no way to tell that these namespaces were defined in the outer scope, that is, globally for the whole Spark cluster. This is why I'm getting a 'Malformed row' warning in the log for this example (caused by javax.xml.stream.XMLStreamException: ParseError at [row,col]:[1,16] Message: http://www.w3.org/TR/1999/REC-xml-names-19990114#AttributePrefixUnbound?Topic&r:id&r)

First of all, am I correct?
Second: is there any workaround? I'd really enjoy reading that RDF dump. Maybe the parser could accept some static definitions provided by the user?
Thanks.
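
The diagnosis in this report can be reproduced outside Spark. The sketch below uses Python's standard library parser, not the StAX parser that spark-xml uses, so it only shows the general XML behaviour: a Topic row parsed on its own has an unbound r prefix, while the same row with the declaration from the RDF root attached parses. The names FRAGMENT, PATCHED and parses are made up for the example.

```python
import xml.etree.ElementTree as ET

# One row cut out of the document, as a partition-local parser would see it.
FRAGMENT = '<Topic r:id=""><catid>1</catid></Topic>'

# The same row with the xmlns:r declaration from the <RDF> root copied onto it.
PATCHED = FRAGMENT.replace(
    "<Topic", '<Topic xmlns:r="http://www.w3.org/TR/RDF/"', 1)


def parses(xml_string):
    """Return True if the string is accepted by a namespace-aware parser."""
    try:
        ET.fromstring(xml_string)
        return True
    except ET.ParseError:
        return False


print(parses(FRAGMENT))   # False: prefix r is unbound
print(parses(PATCHED))    # True
```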

Support for PERMISSIVE mode explicitly.

After working on modes in JSON data source, I think those modes should also be supported here.

Currently, supported modes are DROPMALFORMED and FAILFAST.

Failed to parse XML documents when the datatypes in the same elements are structural data type and non-structural data type

Currently, it fails to load XML documents having different data types (structural data types and non-structural data types) for the same elements.

For example,

<?xml version="1.0"?>
<ROWSET>
    <ROW>
        <user><id>1</id></user>
    </ROW>
    <ROW>
        <user>1234</user>
    </ROW>
</ROWSET>

In the case above, this infers the type for user as StringType as below:

root
 |-- user: string (nullable = true)

but this fails to parse throwing the exception below:

15:39:54.518 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$convertComplicatedType$1$1.apply(StaxXmlParser.scala:86)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$convertComplicatedType$1$1.apply(StaxXmlParser.scala:86)
    at com.databricks.spark.xml.parsers.StaxXmlParser$.convertField(StaxXmlParser.scala:94)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$com$databricks$spark$xml$parsers$StaxXmlParser$$convertObject$1.apply$mcVI$sp(StaxXmlParser.scala:254)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$com$databricks$spark$xml$parsers$StaxXmlParser$$convertObject$1.apply(StaxXmlParser.scala:232)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$com$databricks$spark$xml$parsers$StaxXmlParser$$convertObject$1.apply(StaxXmlParser.scala:232)
    at scala.Option.foreach(Option.scala:257)
...

It looks like this can be dealt with in the same way as in the JSON data source.

The JSON data source in Spark likewise infers the type as StringType.

For example, the data below:

{"user": {"id": 1}}
{"user": 1234}

infers the types below:

root
 |-- user: string (nullable = true)

but it succeeds in parsing them as StringType, as below:

+--------+
|    user|
+--------+
|{"id":1}|
|    1234|
+--------+

Incomplete values for some tags

I'm trying to read the english wikipedia data dump using spark-xml. My setup is as follows:

  • zeppelin 0.5.6 (using built-in Spark 1.6.0)
  • using spark-xml 0.3.1 on scala 2.10

I'm loading the wikipedia pages as follows
val items = z.sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "page")
.load("<PATH>/enwiki-latest-pages-articles-sample.xml")

which succeeds and infers a schema that seems reasonable
org.apache.spark.sql.DataFrame = [id: bigint, ns: bigint, redirect: struct<#VALUE:string,@title:string>, revision: struct<comment:string,contributor:struct<id:bigint,ip:string,username:string>,format:string,id:bigint,minor:string,model:string,parentid:bigint,sha1:string,text:struct<#VALUE:string,@space:string>,timestamp:string>, title: string]

If I try to items.select("revision.text.#VALUE") it gives me a value that's only a prefix to the whole text available in that page meaning the value is not read fully.

Another thing I tried is to do
// changing the rowTag from page to text
val items = z.sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "text")
.load("<PATH>/enwiki-latest-pages-articles-sample.xml")

and I now get
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost): scala.MatchError: ENDDOCUMENT (of class com.sun.xml.internal.stream.events.EndDocumentEvent)
at com.databricks.spark.xml.parsers.StaxXmlParser$.checkEndElement(StaxXmlParser.scala:94)
at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:218)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:95)
at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
...

which seems to suggest that for some reason it's not able to find the end_tag for text even though it is there.

I'm assuming something is wrong with the XML parsing here, but I'm not really sure what's special about the text tag.

I've attached the input file at https://gist.github.com/humanzz/e2e296fd82dccaa5109c

Unable to run the example

I've imported spark-xml_2.11-0.1.2.jar, generated by sbt, as a library in IntelliJ 15,

but when I try to run this class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;

public class Main {
    public static void main(String[] args) throws Exception {
        // Create a Java Spark Context.
        SparkConf conf = new SparkConf().setMaster("local").setAppName("xmlParser");
        JavaSparkContext sc = new JavaSparkContext(conf);



        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read()
                .format("org.apache.spark.sql.xml")
                .option("rootTag", "book")
                .load("/Users/Paolo/Developer/Java/ProgettoAnalisiDati-ArticleMiner/src/main/java/spark/jobs/books.xml");

         df.select("author", "id").collect();
    }
}

I get this error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to load class for data source: org.apache.spark.sql.xml.
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
at spark.jobs.Main.main(Main.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.xml.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60)
... 9 more

What am I doing wrong?

Example doesn't work in PySpark for Spark 1.5+

File "/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 121, in load
File "/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in call
File "/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.databricks.spark.xml.XmlOptions$.createFromConfigMap(XmlOptions.scala:45)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:57)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:43)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
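A NoSuchMethodError on scala.Predef$.$conforms commonly points to a Scala binary version mismatch: that method exists from Scala 2.11 on, so a jar built for 2.11 fails on a 2.10 runtime. The following is a minimal sketch, not part of spark-xml, for checking the runtime Scala version against an artifact suffix. The object name, the helper names, and the artifact id used in main are illustrative assumptions.

```scala
object ScalaBinaryVersion {
  // Reduces a full version such as "2.10.5" to its binary version "2.10".
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  // True when the artifact suffix (e.g. "_2.11") matches the runtime's binary version.
  def matchesArtifact(artifactId: String, runtimeScala: String): Boolean =
    artifactId.endsWith("_" + binaryVersion(runtimeScala))

  def main(args: Array[String]): Unit = {
    // Version of the Scala library actually loaded by this JVM.
    val runtime = scala.util.Properties.versionNumberString
    println(s"Runtime Scala version: $runtime")
    // "spark-xml_2.11" is only an example artifact id.
    println(s"Matches spark-xml_2.11: ${matchesArtifact("spark-xml_2.11", runtime)}")
  }
}
```

Running the same check inside the Spark shell that produced the error shows which Scala version the cluster's Spark build uses.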

Support specific xml path for duplicated tags

Recently I found the spark-xml library, and it works well for our requirements. However, we need to process more complex, nested XML files in which the same tag appears under different paths. What is the best practice for this case? E.g., we have <configuration> under <protocols> as well as under <firewall>. I tried to set "protocols" as the rowTag, and the select function returns all the children under configuration and firewall. Do I need to split the XML file first in order to select the exact XML path? I think it would be nice to be able to specify the XML tag path in the API.

e.g,

...
<protocols>
    <configuration>
        <value>aaa</value>
    </configuration>
</protocols>
...
...
<firewall>
    <configuration>
        <value>bbb</value>
    </configuration>
</firewall>
...

Flattening Nested XMLs to DataFrame

Thanks for the very helpful module. I have the following XML structure, which gets converted to a Row of PROP with the sequence inside. Is there any way to map each PROP's NAME attribute to a column in the DataFrame, with its PVAL as the value?

<RECORDS>
 <RECORD>
    <PROP NAME="A">
      <PVAL>ValueA</PVAL>
    </PROP>
    <PROP NAME="B">
      <PVAL>ValueB</PVAL>
    </PROP>
 </RECORD>
</RECORDS>

This is what I end up with

root
 |-- PROP: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- @NAME: string (nullable = true)
 |    |    |-- PVAL: string (nullable = true)

Adding tests for Spark 1.6

Spark 1.6.0 has been released, so I think I should add some tests for it and check whether anything needs to change.

Create a way to exclude rootTag when saving file

If I create an RDD with multiple slices, then I end up with a file for each slice. Each file includes a rootTag in it, so that if I use Hadoop to merge the files I end up with a resulting file like so...

<animals>
  <animal>
    <name>cat</name>
  </animal>
  <animal>
    <name>dog</name>
  </animal>
</animals>
<animals>
  <animal>
    <name>fish</name>
  </animal>
  <animal>
    <name>bird</name>
  </animal>
</animals>

If I set the rootTag option to null I end up with:

<null>
  <animal>
    <name>cat</name>
  </animal>
  <animal>
    <name>dog</name>
  </animal>
</null>
<null>
  <animal>
    <name>fish</name>
  </animal>
  <animal>
    <name>bird</name>
  </animal>
</null>

If I could exclude the rootTag option, then I can surround the merged file in the root tag myself after all processing is complete. This would be similar to the option in spark-csv where you can exclude the CSV header.

Ideally if the root tag and closing tag could go in their own partitioned files (at the beginning and end respectively) then I wouldn't have to perform the additional file manipulation.

Or perhaps I'm doing something wrong?
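Until such an option exists, one workaround is to post-process the part files: strip the root element from each part and wrap the concatenated rows once. The sketch below is an assumption-laden illustration, not a spark-xml feature. The names MergeParts, stripRoot, and merge are hypothetical, it assumes the root tag carries no attributes, and it works on in-memory strings, so it only suits small outputs.

```scala
object MergeParts {
  // Removes one leading <rootTag> and one trailing </rootTag> from a part file's text.
  def stripRoot(part: String, rootTag: String): String = {
    val open = s"<$rootTag>"
    val close = s"</$rootTag>"
    part.trim.stripPrefix(open).stripSuffix(close).trim
  }

  // Concatenates the row content of all parts inside a single root element.
  def merge(parts: Seq[String], rootTag: String): String = {
    val bodies = parts.map(stripRoot(_, rootTag)).filter(_.nonEmpty)
    (s"<$rootTag>" +: bodies :+ s"</$rootTag>").mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    val part1 = "<animals>\n  <animal><name>cat</name></animal>\n</animals>"
    val part2 = "<animals>\n  <animal><name>fish</name></animal>\n</animals>"
    println(merge(Seq(part1, part2), "animals"))
  }
}
```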

Values not being parsed correctly / showing as null

Hi,

I've noticed that some fields in an xml I'm parsing come up as null. I'm using the code from master (updated as of today) and Schema generation seems to be alright.

It seems to be a problem in StaxXmlParser sending an incorrect "sub-schema". But I could not get to the root of the problem yet.

Any ideas on what could be wrong?

Sample file here

<dtcc:genericProductDTCC id="Id_12345">
    <fpml:primaryAssetClass>Equity</fpml:primaryAssetClass>
    <fpml:productType productTypeScheme="urn:bank:product-type:isda-plus">Equity:Exotic:Option</fpml:productType>
    <fpml:productId productIdScheme="http://www.fpml.org/coding-scheme/product-taxonomy">Equity:Other</fpml:productId>
    <fpml:productId productIdScheme="http://www.dtcc.com/coding-scheme/Narrow-Broad-Id">Broad</fpml:productId>
    <fpml:embeddedOptionType>false</fpml:embeddedOptionType>
    <fpml:buyerPartyReference href="party1"></fpml:buyerPartyReference>
    <fpml:buyerAccountReference href="account1"></fpml:buyerAccountReference>
    <fpml:sellerPartyReference href="party2"></fpml:sellerPartyReference>
    <fpml:premium>
        <fpml:payerPartyReference href="party1"/>
        <fpml:payerAccountReference href="account1"/>
        <fpml:receiverPartyReference href="party2"/>
        <fpml:paymentAmount>
            <fpml:currency>USD</fpml:currency>
            <fpml:amount>0</fpml:amount>
        </fpml:paymentAmount>
        <fpml:paymentDate>
            <fpml:adjustableDate>
                <fpml:unadjustedDate>2014-10-28</fpml:unadjustedDate>
            </fpml:adjustableDate>
        </fpml:paymentDate>
    </fpml:premium>
    <fpml:effectiveDate>
        <fpml:unadjustedDate>2014-10-24</fpml:unadjustedDate>
    </fpml:effectiveDate>
    <fpml:expirationDate>
        <fpml:unadjustedDate>2016-10-21</fpml:unadjustedDate>
    </fpml:expirationDate>
    <fpml:underlyer>
        <fpml:basket>
            <fpml:openUnits>999</fpml:openUnits>
            <fpml:basketConstituent>
                <fpml:index id="ULId_67158430_843509334">
                    <fpml:instrumentId instrumentIdScheme="http://www.fpml.org/coding-scheme/external/instrument-id-Reuters-RIC">.SPX</fpml:instrumentId>
                    <fpml:exchangeId exchangeIdScheme="http://www.fpml.org/coding-scheme/external/exchange-id-MIC">NYSE</fpml:exchangeId>
                </fpml:index>
                <fpml:constituentWeight>
                    <fpml:basketPercentage>1</fpml:basketPercentage>
                </fpml:constituentWeight>
            </fpml:basketConstituent>
        </fpml:basket>
    </fpml:underlyer>
    <fpml:notional>
        <fpml:currency>USD</fpml:currency>
        <fpml:amount>1100000</fpml:amount>
    </fpml:notional>
    <fpml:optionType>Call</fpml:optionType>
    <fpml:settlementCurrency>USD</fpml:settlementCurrency>
    <dtcc:genericProductExtension>
        <dtcc:exerciseStyleEnum>European</dtcc:exerciseStyleEnum>
        <dtcc:settlementTypeEnum>Cash</dtcc:settlementTypeEnum>
        <dtcc:strike>
            <fpml:strikePercentage>104.615</fpml:strikePercentage>
            <fpml:currency>USD</fpml:currency>
        </dtcc:strike>
        <dtcc:optionCommencementDate>
            <fpml:adjustableDate>
                <fpml:unadjustedDate>2014-10-24</fpml:unadjustedDate>
            </fpml:adjustableDate>
        </dtcc:optionCommencementDate>
        <dtcc:paymentFrequency/>
    </dtcc:genericProductExtension>
</dtcc:genericProductDTCC>

Sample output:

scala> df.select("tradeEvent.trade.dtcc:genericProductDTCC.*").show(true)
+--------+----------------------------+--------------------------+------------------------+------------------+-----------------------+-------------------+-------------+---------------+------------+----------------------+--------------------+--------------------+-------------------------+-----------------------+--------------+
|     @id|dtcc:genericProductExtension|fpml:buyerAccountReference|fpml:buyerPartyReference|fpml:effectiveDate|fpml:embeddedOptionType|fpml:expirationDate|fpml:notional|fpml:optionType|fpml:premium|fpml:primaryAssetClass|      fpml:productId|    fpml:productType|fpml:sellerPartyReference|fpml:settlementCurrency|fpml:underlyer|
+--------+----------------------------+--------------------------+------------------------+------------------+-----------------------+-------------------+-------------+---------------+------------+----------------------+--------------------+--------------------+-------------------------+-----------------------+--------------+
|Id_12345|                        null|                      null|                    null|              null|                   null|               null|         null|           null|        null|                Equity|[[null,http://www...|[Equity:Exotic:Op...|                     null|                   null|          null|
+--------+----------------------------+--------------------------+------------------------+------------------+-----------------------+-------------------+-------------+---------------+------------+----------------------+--------------------+--------------------+-------------------------+-----------------------+--------------+
scala> df.select("tradeEvent.trade.dtcc:genericProductDTCC.fpml:buyerAccountReference.*").show(true)
+------+-----+
|#VALUE|@href|
+------+-----+
|  null| null|
+------+-----+


Inconsistent element attributes

I have a question about the behavior of this code when presented with a document such as this:

<library location="uptown">
    <shelf category="fiction">
        <book author="Lewis Carol">Alice's Adventures In Wonderland</book>
        <book author="James Joyce">Portrait of the Artist as a Young Man</book>
    </shelf>
    <shelf category="non-fiction">
        <book author="Carl Sagan">Billions and Billions</book>
        <book>Webster's Collegiate Dictionary</book>
    </shelf>
</library>
<library location="downtown">
    <shelf category="fiction">
        <book author="J.K. Rowling">Harry Potter and the Prisoner of Azkaban</book>
        <book author="Homer">The Iliad</book>
    </shelf>
    <shelf category="non-fiction">
        <book author="Stephen Hawking">A Brief History of Time</book>
        <book author="Benjamin Franklin">The Autobiography of Benjamin Franklin</book>
    </shelf>
</library>

As you can see there's a book in there without an author attribute.

I can load this xml and print the schema:

val test = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "library").load("maprfs:///user/cmatta/projects/test.xml")
test.printSchema

test: org.apache.spark.sql.DataFrame = [@location: string, shelf: array<struct<@category:string,book:array<struct<#VALUE:string,@author:string>>>>]
root
 |-- @location: string (nullable = true)
 |-- shelf: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- @category: string (nullable = true)
 |    |    |-- book: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- #VALUE: string (nullable = true)
 |    |    |    |    |-- @author: string (nullable = true)

If I want to select all the books by location and collect them I get a java.lang.ArrayIndexOutOfBoundsException: 1 error. If I just collect the whole dataframe without select I can see that the "Webster's Collegiate Dictionary" book array only has one element, where I would have expected two: the value and a null representing the author. Is this a correct expectation?

test.collect
res552: Array[org.apache.spark.sql.Row] = Array([uptown,WrappedArray([fiction,WrappedArray([Alice's Adventures In Wonderland,Lewis Carol], [Portrait of the Artist as a Young Man,James Joyce])], [non-fiction,WrappedArray([Billions and Billions,Carl Sagan], [Webster's Collegiate Dictionary])])], [downtown,WrappedArray([fiction,WrappedArray([Harry Potter and the Prisoner of Azkaban,J.K. Rowling], [The Iliad,Homer])], [non-fiction,WrappedArray([A Brief History of Time,Stephen Hawking], [The Autobiography of Benjamin Franklin,Benjamin Franklin])])])

If this behavior is expected, what's the best way to deal with inconsistent XML like this?

Separate functionalities of reading from XML files and reading from RDD[String]s

As mentioned in #4, it would be nice to have the XML data source migrated to HadoopFsRelation. However, there's a concern about reading from RDD[String].

The current version resembles early versions of the JSON data source. The JSON data source also supports reading from both JSON files and RDD[String]s that contain JSON records. Later on, the JSON data source was migrated to HadoopFsRelation for partitioning support. In another Spark PR, we found that this design is pretty confusing, because the JSON data source is now a HadoopFsRelation, but it may not read from a FileSystem at all. This violates some basic assumptions of HadoopFsRelation and introduces unnecessary complexity here and there.

Maybe we can drop reading from RDD[String] for the first version and come up with a more solid solution later. For example, we could separate it into another simple data source that shares most of its code with the HadoopFsRelation one. I'm not quite confident about this problem either. Suggestions are welcome.

Link from https://github.com/HyukjinKwon/spark-xml/issues/7

Duplicate "value" fields

Inferred schema contains duplicate "#VALUE" fields.
Test XML structure:
<root><a attr="abc">123</a></root>
<root><a >456</a></root>
<root><a >789</a></root>
<root><a >900</a></root>

Produced schema:
root
|-- a: struct (nullable = true)
| |-- #VALUE: long (nullable = true)
| |-- #VALUE: long (nullable = true)
| |-- #VALUE: long (nullable = true)
| |-- #VALUE: long (nullable = true)
| |-- @attr: string (nullable = true)

I found out that it is caused by the compatibleType(StructType, DataType) case in InferSchema, which does not reduce the fields by name.
I managed to fix it, but because I'm lame in Scala and Spark I'm not committing it. Please review.

Here is my code fragment:

    case (StructType(fields), dt: DataType) =>
      val safeFields = (fields :+ StructField(options.valueTag, dt, nullable = true)).groupBy(field => field.name).map {
        case (name, fieldTypes) =>
          val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType(options))
          StructField(name, dataType, nullable = true)
      }
      StructType(safeFields.toSeq.sortBy(_.name))


    case (dt: DataType, StructType(fields)) =>
      val safeFields = (fields :+ StructField(options.valueTag, dt, nullable = true)).groupBy(field => field.name).map {
        case (name, fieldTypes) =>
          val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType(options))
          StructField(name, dataType, nullable = true)
      }
      StructType(safeFields.toSeq.sortBy(_.name))
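The idea behind the fragment, merging fields that share a name into one field with a compatible type, can be shown without Spark. The following is a simplified sketch: Field, compatible, and mergeByName are hypothetical stand-ins for StructField, compatibleType, and the grouping logic, and the widening rule (fall back to "string" on any mismatch) is an assumption made for illustration.

```scala
object MergeFieldsSketch {
  // Simplified stand-in for Spark's StructField; types are plain strings here.
  final case class Field(name: String, dataType: String)

  // Assumed widening rule: identical types are kept, anything else becomes "string".
  def compatible(a: String, b: String): String =
    if (a == b) a else "string"

  // Groups fields by name and reduces each group to a single field.
  def mergeByName(fields: Seq[Field]): Seq[Field] =
    fields
      .groupBy(_.name)
      .map { case (name, sameName) =>
        Field(name, sameName.map(_.dataType).reduce(compatible))
      }
      .toSeq
      .sortBy(_.name)

  def main(args: Array[String]): Unit = {
    val inferred = Seq(
      Field("#VALUE", "long"), Field("#VALUE", "long"),
      Field("#VALUE", "long"), Field("@attr", "string"))
    mergeByName(inferred).foreach(println)
  }
}
```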

Encoding not working with UTF-16/32 (not UTF-16LE/BE or UTF-32LE/BE).

UTF-16/32 data starts with BOM bytes, and these need to be handled: XmlRecordReader reads each record between startTag and endTag, and those tags are also encoded with a BOM when the given encoding is UTF-16/32.

So it ends up searching for startTag and endTag including the BOM bytes for every record, and finds none, because the BOM bytes only exist at the head of the data.

I think it is better not to support these encodings, and perhaps just mention this in the documentation.
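The effect can be reproduced with the JDK alone. The sketch below encodes a hypothetical tag "<a>" with the UTF-16 and UTF-16BE charsets and prints the bytes. With UTF-16 the encoded tag starts with the BOM bytes FE FF, so searching for those bytes in the middle of a file cannot match. The object and helper names are illustrative.

```scala
import java.nio.charset.StandardCharsets

object BomDemo {
  // Formats bytes as upper-case hex pairs separated by spaces.
  def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02X").mkString(" ")

  val tag = "<a>" // hypothetical start tag

  // Java's UTF-16 encoder writes a big-endian BOM in front of the data.
  val withBom: Array[Byte] = tag.getBytes(StandardCharsets.UTF_16)

  // UTF-16BE fixes the byte order up front and writes no BOM.
  val withoutBom: Array[Byte] = tag.getBytes(StandardCharsets.UTF_16BE)

  def main(args: Array[String]): Unit = {
    println(hex(withBom))    // FE FF 00 3C 00 61 00 3E
    println(hex(withoutBom)) // 00 3C 00 61 00 3E
  }
}
```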

Slow performance when reading XML due to the XML library and unoptimized logic.

Sameer Farooqui said:

I'm reading the wikipedia file like this:

val wiki001DF = sqlContext.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "page")
    .load("/mnt/wikipedia-readonly/en_wikipedia/enwiki-20160204-pages-articles-multistream.xml")

When reading the 54 GB XML file, I noticed that on a Spark cluster with 15 Executors (4 cores on each), it took 40 minutes to read the file (about 839 tasks). Each task is running for about 2 minutes. I found that very slow/long.

I then took some thread dumps on the Executors and noticed that a lot of time is being spent in the xerces XML parsing library:

com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.scanName(XMLEntityScanner.java:716)
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanEntityReference(XMLDocumentFragmentScannerImpl.java:1844)
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:3067)
com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:117)
com.sun.org.apache.xerces.internal.impl.XMLStreamReaderImpl.next(XMLStreamReaderImpl.java:558)
com.sun.xml.internal.stream.XMLEventReaderImpl.peek(XMLEventReaderImpl.java:276)
com.databricks.spark.xml.parsers.StaxXmlParserUtils$.readDataFully(StaxXmlParserUtils.scala:29)

Although this issue sounds a bit broad, I think I should create it as a reminder. It looks like there are two things to deal with.

  • Better XML library.
  • Optimization for parsing logic.

Java version compatibility

Because of XmlInputFormat, this library should take compatibility between JDK 6/7 and JDK 8 into account.

I think it would be better to rewrite it in Scala.

Support for reading compressed files.

Since this library has a custom input format, XmlInputFormat, reading compressed files has to be supported manually.

Writing compressed files is already supported.

MatchError when trying to query XML files with partial attributes

Hi,

I have a fairly complex set of XML files that I'd like to load. Schema inference seems to work fine, but I cannot query anything as it fails with "scala.MatchError: 2 (of class java.lang.Long)"

I extracted a small example from my XMLs

1.xml

<?xml version="1.0" encoding="UTF-8"?>
<events>
  <event pid="2">10</event>
  <event pid="2">12</event>
  <event pid="2">16</event>
  <event pid="2">17</event>
  <event pid="2">18</event>
</events>

2.xml

<?xml version="1.0" encoding="UTF-8"?>
<events>
  <event pid="1">7</event>
  <event pid="2">12</event>
  <event pid="2">13</event>
  <event pid="2">14</event>
  <event pid="2" exit="t">14</event>
  <event pid="2">16</event>
  <event pid="2">17</event>
</events>

This is what happens when I try to query it in spark-shell:

scala> val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","events").load("/tmp/test_dataset/*.xml")
df: org.apache.spark.sql.DataFrame = [event: array<struct<#VALUE:bigint,@exit:string,@pid:bigint>>]

scala> df.printSchema()
root
 |-- event: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- #VALUE: long (nullable = true)
 |    |    |-- @exit: string (nullable = true)
 |    |    |-- @pid: long (nullable = true)

scala> df.select("event").show()
16/04/04 17:39:51 ERROR Executor: Exception in task 0.0 in stage 37.0 (TID 8903)
scala.MatchError: 2 (of class java.lang.Long)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)

Any ideas?
Thanks!!

Support for writing DTD/declaration.

Currently this library produces indented XML files without DTD/declaration.

Maybe it should write a DTD/declaration by default, or support an option to write one.

Should test against Hadoop 1.x and Hadoop 2.x in Travis

This library's tests should be run against both Hadoop 1.x and 2.x in Jenkins, since binary incompatibilities in Hadoop's TaskAttemptContext class may mean that your published artifact may not be compatible with both versions. See databricks/spark-avro#79 for an example of a similar PR against spark-avro and harsha2010/magellan#25 for a PR against magellan, another Spark package.

I think this is especially important for this library, given that it includes a Hadoop InputFormat.

Link from https://github.com/HyukjinKwon/spark-xml/issues/5

Reading XML files should remove tags

Hi 👋,

this library fits my need of parsing Wikipedia dumps perfectly. However, I have a small problem I could use some help with. Consider the following input format:

<documents>
    <article title="A">
        Multiline text
    </article>
    <article title="B">
        ... 
    </article>
    ...
</documents>

The code to parse such an input file looks like this (according to the provided example):

val input = {
      sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<article>")
      sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</article>")
      sc.hadoopConfiguration.set(XmlInputFormat.ENCODING_KEY, "utf-8")
      val records = sc.newAPIHadoopFile(config.inFile, classOf[XmlInputFormat],classOf[LongWritable],classOf[Text])
      records
    }
input.saveAsTextFile(config.outFile)

However, the output still contains the XML tags:

(283,<article title="A">
Multiline text
</article>)

Is this a feature or bug? For my use case I don't want these tags.
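If the raw records keep their enclosing tags, one option is to strip them in a post-processing step. The sketch below is an illustration under assumptions: StripOuterTag and innerText are hypothetical names, and the regular expressions assume that attribute values contain no ">" character.

```scala
import java.util.regex.Pattern

object StripOuterTag {
  // Removes the opening tag (with any attributes) and the closing tag of one record.
  def innerText(record: String, tag: String): String = {
    val quoted = Pattern.quote(tag)
    val open = ("^\\s*<" + quoted + "(\\s[^>]*)?>").r
    val close = ("</" + quoted + ">\\s*$").r
    close.replaceFirstIn(open.replaceFirstIn(record, ""), "").trim
  }

  def main(args: Array[String]): Unit = {
    val record = "<article title=\"A\">\nMultiline text\n</article>"
    println(innerText(record, "article")) // Multiline text
  }
}
```

With the records RDD from the snippet above, this could be applied as input.map { case (_, text) => StripOuterTag.innerText(text.toString, "article") } before saving.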

Support for writing attributes

Currently, this library does not support writing attributes when writing XML files.
It would be great if we could write attributes through this library.

I am considering two approaches.
The first is to add options that designate which fields should be treated as attributes.
The second is to use a prefix for attributes, such as _ or @, for both reading and writing, so that elements and attributes can be told apart easily.
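The prefix approach could look roughly like the following sketch. It is not the library's writer: AttributePrefixSketch and render are hypothetical names, field values are plain strings, and XML escaping is left out for brevity.

```scala
object AttributePrefixSketch {
  // Renders one element: fields whose name starts with the prefix become
  // attributes, all other fields become child elements.
  def render(tag: String, fields: Seq[(String, String)], prefix: String = "@"): String = {
    val (attrs, elems) = fields.partition { case (name, _) => name.startsWith(prefix) }
    val attrText = attrs.map { case (name, value) =>
      " " + name.stripPrefix(prefix) + "=\"" + value + "\""
    }.mkString
    val body = elems.map { case (name, value) => s"<$name>$value</$name>" }.mkString
    s"<$tag$attrText>$body</$tag>"
  }

  def main(args: Array[String]): Unit = {
    // Prints: <book id="1"><title>X</title></book>
    println(render("book", Seq("@id" -> "1", "title" -> "X")))
  }
}
```

Using the same prefix on the read side means a DataFrame read from XML can be written back without losing the element/attribute distinction.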

spark-xml fails to load xml with certain comments

Sample document

Removing the comment

<!-- just for information: not inherited in child poms -->

allows it to load the xml file into a dataframe.

I am running the master branch of spark-xml with Spark 1.6.0 and Scala 2.10

<?xml version="1.0" encoding="UTF-8"?>

<!--
Inspired by
* groupId: org.apache, artifactId: apache, version: 15
* http://central.sonatype.org/pages/apache-maven.html

Probably want to extend with inspiration from https://github.com/kevinsawicki/github-maven-example/blob/master/example/pom.xml
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>ae.teletronics</groupId>
  <artifactId>godfather</artifactId>
  <version>0.9</version>
  <packaging>pom</packaging>

  <name>teletronics.ae</name>
  <description>Maven parent for all teletronics.ae open-source</description>
  <url>http://www.teletronics.ae</url>
  <organization>
    <name>teletronics.ae</name>
    <url>http://www.teletronics.ae</url>
  </organization>
  <licenses>
    <license>
      <name>Apache License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <!-- mailingLists>
  </mailingLists -->

  <prerequisites><!-- just for information: not inherited in child poms -->
    <maven>2.2.1</maven><!-- prerequisite of some plugins -->
  </prerequisites>

  <scm>
    <url>https://github.com/TeletronicsDotAe/maven-cross-organization</url>
    <connection>scm:git:git://github.com/TeletronicsDotAe/maven-cross-organization.git</connection>
    <developerConnection>scm:git:git://github.com/TeletronicsDotAe/maven-cross-organization.git</developerConnection>
  </scm>

  <developers>
    <developer>
      <email>[email protected]</email>
      <name>Teletronics UAE Open-Source</name>
      <url>https://github.com/TeletronicsDotAe</url>
      <id>open-source-teletronics.ae</id>
    </developer>
  </developers>

  <distributionManagement>
    <snapshotRepository>
      <id>ossrh</id>
      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    </snapshotRepository>
    <repository>
      <id>ossrh</id>
      <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
    </repository>
  </distributionManagement>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- sourceReleaseAssemblyDescriptor>source-release</sourceReleaseAssemblyDescriptor>
    <organization.logo>http://www.apache.org/images/asf_logo_wide.gif</organization.logo>
    <gpg.useagent>true</gpg.useagent>
    <arguments />
    <!- Specify the default compiler source/target as 1.4 for backwards compatibility
    <maven.compiler.source>1.4</maven.compiler.source>
    <maven.compiler.target>1.4</maven.compiler.target -->    
  </properties>

  <build>
  </build>

  <profiles>
    <profile> 
      <id>release</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>2.2.1</version>
            <executions>
              <execution>
                <id>attach-sources</id>
                <goals>
                  <goal>jar-no-fork</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-javadoc-plugin</artifactId>
            <version>2.9.1</version>
            <executions>
              <execution>
                <id>attach-javadocs</id>
                <goals>
                  <goal>jar</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-gpg-plugin</artifactId>
            <version>1.5</version>
            <executions>
              <execution>
                <id>sign-artifacts</id>
                <phase>verify</phase>
                <goals>
                  <goal>sign</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-release-plugin</artifactId>
            <version>2.5</version>
            <configuration>
              <autoVersionSubmodules>true</autoVersionSubmodules>
              <useReleaseProfile>false</useReleaseProfile>
              <releaseProfiles>release</releaseProfiles>
              <goals>deploy</goals>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.sonatype.plugins</groupId>
            <artifactId>nexus-staging-maven-plugin</artifactId>
            <version>1.6.3</version>
            <extensions>true</extensions>
            <configuration>
               <serverId>ossrh</serverId>
               <nexusUrl>https://oss.sonatype.org/</nexusUrl>
               <autoReleaseAfterClose>true</autoReleaseAfterClose>
            </configuration>
          </plugin>      
        </plugins>
      </build>
    </profile>
  </profiles>
</project>

Stack Trace

scala> val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","project").load("godfather-0.9.pom")
16/03/08 09:45:45 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Failed to parse data with unexpected event <!-- just for information: not inherited in child poms -->
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:144)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:170)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:94)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1121)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1121)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1122)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1122)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
...

Schema after the comment is removed

scala> df.printSchema
root
 |-- @schemaLocation: string (nullable = true)
 |-- @xmlns: string (nullable = true)
 |-- @xsi: string (nullable = true)
 |-- artifactId: string (nullable = true)
 |-- build: string (nullable = true)
 |-- description: string (nullable = true)
 |-- developers: struct (nullable = true)
 |    |-- developer: struct (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- distributionManagement: struct (nullable = true)
 |    |-- repository: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- snapshotRepository: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- groupId: string (nullable = true)
 |-- licenses: struct (nullable = true)
 |    |-- license: struct (nullable = true)
 |    |    |-- distribution: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- modelVersion: string (nullable = true)
 |-- name: string (nullable = true)
 |-- organization: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- prerequisites: struct (nullable = true)
 |    |-- maven: string (nullable = true)
 |-- profiles: struct (nullable = true)
 |    |-- profile: struct (nullable = true)
 |    |    |-- build: struct (nullable = true)
 |    |    |    |-- plugins: struct (nullable = true)
 |    |    |    |    |-- plugin: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- artifactId: string (nullable = true)
 |    |    |    |    |    |    |-- configuration: struct (nullable = true)
 |    |    |    |    |    |    |    |-- autoReleaseAfterClose: boolean (nullable = true)
 |    |    |    |    |    |    |    |-- autoVersionSubmodules: boolean (nullable = true)
 |    |    |    |    |    |    |    |-- goals: string (nullable = true)
 |    |    |    |    |    |    |    |-- nexusUrl: string (nullable = true)
 |    |    |    |    |    |    |    |-- releaseProfiles: string (nullable = true)
 |    |    |    |    |    |    |    |-- serverId: string (nullable = true)
 |    |    |    |    |    |    |    |-- useReleaseProfile: boolean (nullable = true)
 |    |    |    |    |    |    |-- executions: struct (nullable = true)
 |    |    |    |    |    |    |    |-- execution: struct (nullable = true)
 |    |    |    |    |    |    |    |    |-- goals: struct (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- goal: string (nullable = true)
 |    |    |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |    |    |-- phase: string (nullable = true)
 |    |    |    |    |    |    |-- extensions: boolean (nullable = true)
 |    |    |    |    |    |    |-- groupId: string (nullable = true)
 |    |    |    |    |    |    |-- version: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- project.build.sourceEncoding: string (nullable = true)
 |    |-- project.reporting.outputEncoding: string (nullable = true)
 |-- scm: struct (nullable = true)
 |    |-- connection: string (nullable = true)
 |    |-- developerConnection: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- version: double (nullable = true)

Failed to load class for data source: com.databricks.spark.xml

Hello, I am using Spark 1.4.0 with Scala 2.10.4. When I try to use spark-xml this way:

bin/spark-shell --packages HyukjinKwon:spark-xml:0.1.1-s_2.10
scala> import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
scala> val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rootTag", "book") 
.load("books.xml")

Then I got this error,

java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.xml

I read https://issues.apache.org/jira/browse/SPARK-8368. Do I need to move to Spark 1.4.1 or 1.5, or could you please suggest a way to fix it?

Self-closing tags are not supported as top-level rows

Self-closing tags, which the XML standard allows, are currently not supported.

Reading XML whose rows are self-closing tags produces an empty schema.

Python Code snippet:

df = sqlContext.load(source="com.databricks.spark.xml", rowTag = 'book', path = 'file:///home/cloudera/pp/1/books.xml')
df.printSchema()

XML:

<?xml version="1.0"?>
<catalog>
   <book id="bk101"/>
   <book id="bk102"/>
   <book id="bk103"/>
   <book id="bk104"/>
</catalog>

Schema output:
root

Spark Version: 1.3
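
For reference, a self-closing element is equivalent to an empty element with the same attributes, so each `<book id="..."/>` above still carries one attribute that could become a column. The sketch below uses only Python's standard `xml.etree.ElementTree`, not spark-xml, to show the rows one might expect from this file. The `@` attribute prefix follows the schemas printed in other issues here, and `rows_from_xml` is a hypothetical helper, not part of the library.

```python
import xml.etree.ElementTree as ET

CATALOG = """<?xml version="1.0"?>
<catalog>
   <book id="bk101"/>
   <book id="bk102"/>
   <book id="bk103"/>
   <book id="bk104"/>
</catalog>"""


def rows_from_xml(text, row_tag, attribute_prefix="@"):
    """Return one dict per row_tag element, keyed by prefixed attribute name."""
    root = ET.fromstring(text)
    return [
        {attribute_prefix + name: value for name, value in elem.attrib.items()}
        for elem in root.iter(row_tag)
    ]


rows = rows_from_xml(CATALOG, "book")
print(rows[0])  # {'@id': 'bk101'}
```

Under these assumptions every `book` element yields a row with a single `@id` string field, which is the schema one would hope to see instead of an empty `root`.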

Cannot read stackoverflow data dump XML

I would like to parse XML from the Stack Overflow data dump. Here is an example row:

<row Id="7" PostTypeId="2" ParentId="4" CreationDate="2008-07-31T22:17:57.883" Score="287" Body="An explicit cast to double isn't necessary." OwnerUserId="9" LastEditorUserId="967315" LastEditDate="2012-10-14T11:50:16.703" LastActivityDate="2012-10-14T11:50:16.703" CommentCount="0" />

But when I load this XML, nothing happens (no error, and the schema is empty). Is this format compatible with this library?

Thank you,

Can't collect DataFrame with empty fields read with specified schema

I am having problems working with a DataFrame I loaded with a specified schema.

The problem looks related to the fix for #80, and could be the same as #93 as it produces the same error in some cases. (I think it is this one)

For the record I am using pyspark.

Short version

I can't collect() a certain DataFrame with missing values in it that I have read with a specified schema, and I get an ArrayIndexOutOfBoundsException when trying to do certain operations with it.

Very long version

I have two files, File1 and File2:
File1

<row>
    <a>A</a>
    <b>
        <c>C</c>
    </b>
</row>

File2

<row>
    <a>A</a>
    <b>
    </b>
</row>

When I infer schemas from the two files separately I get this:
File1

root
 |-- a: string (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- c: string (nullable = true)

and File2

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

As expected b is a struct in File1 and a string in File2. When I infer the schema from both files at the same time I get this schema:

root
 |-- a: string (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- #VALUE: string (nullable = true)
 |    |-- c: string (nullable = true)

At first I was surprised to see the #VALUE field in b. After thinking about it I think it makes sense, since b is a string in File2 and if a string value exists there it must go somewhere. However, I'm not interested in the #VALUE field and I want to use the schema from File1 on both files.

This is where the problems start

I save the first schema and specify it when reading both files again to a DataFrame df. I use treatEmptyValuesAsNulls=True and expect c to be null in the row from File2. When I try to use df I get errors.
The schema looks correct:

root
 |-- a: string (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- c: string (nullable = true)

And I can have a look at the contents with df.show():

+---+---+
|  a|  b|
+---+---+
|  A|[C]|
|  A| []|
+---+---+

This looks OK, but I would expect to see [null] and not [] in the second row. I think that is the cause of the following problem: running df.collect() yields this nasty error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/dataframe.py", line 281, in collect
    return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
  File ".../spark/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File ".../spark/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File ".../spark/python/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File ".../spark/python/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
  File ".../spark/python/pyspark/sql/types.py", line 1159, in <lambda>
    return lambda *a: dataType.fromInternal(a)
  File ".../spark/python/pyspark/sql/types.py", line 568, in fromInternal
    return _create_row(self.names, values)
  File ".../spark/python/pyspark/sql/types.py", line 1163, in _create_row
    row = Row(*values)
  File ".../spark/python/pyspark/sql/types.py", line 1210, in __new__
    raise ValueError("No args or kwargs")
ValueError: (ValueError('No args or kwargs',), <function <lambda> at 0x10670ab18>, ())

When I try, for example, df.groupBy('b').count().show(), I get this:

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:227)

I think the problem is that the parser can't see that c is empty when b is empty.

For now I think I can work around this issue by using #VALUE in my schema.
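
To make the expected result concrete, here is a minimal sketch of schema-driven conversion written with Python's standard library only. It is not how spark-xml is implemented; `SCHEMA`, `convert`, and the dict-based schema encoding are assumptions made for illustration. The point is that a struct field always emits every sub-field named in the schema, so the row from File2 ends up with `b = {c: null}` rather than an empty struct.

```python
import xml.etree.ElementTree as ET

# Schema from File1: nested dicts are structs, None marks a string leaf.
SCHEMA = {"a": None, "b": {"c": None}}

FILE1 = "<row><a>A</a><b><c>C</c></b></row>"
FILE2 = "<row><a>A</a><b>\n    </b></row>"


def convert(elem, schema):
    """Convert elem to a dict with exactly the fields named in schema."""
    row = {}
    for name, field_type in schema.items():
        child = elem.find(name) if elem is not None else None
        if isinstance(field_type, dict):
            # Struct field: emit every sub-field, even if the child is empty or absent.
            row[name] = convert(child, field_type)
        else:
            text = child.text.strip() if child is not None and child.text else ""
            row[name] = text or None  # empty or missing value becomes null
    return row


rows = [convert(ET.fromstring(doc), SCHEMA) for doc in (FILE1, FILE2)]
print(rows[1])  # {'a': 'A', 'b': {'c': None}}
```

With rows shaped like this, every struct has as many values as the schema has fields, which is what the failing `collect()` and `groupBy` calls appear to need.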

Empty field causes scala.MatchError?

Hi,

I am having some issues reading a certain .xml file containing GPS data.

The file I am trying to read contains a row that I suppose is malformed. I supply my own schema and use failFast = false. Instead of dropping the row, it fails. Is this intended? Or am I using the library incorrectly?

I use this schema:

root
 |-- @lat: double (nullable = true)
 |-- @lon: double (nullable = true)
 |-- ele: double (nullable = true)
 |-- extensions: struct (nullable = true)
 |    |-- gpxtpx:TrackPointExtension: struct (nullable = true)
 |    |    |-- gpxtpx:atemp: long (nullable = true)
 |    |    |-- gpxtpx:cad: long (nullable = true)
 |    |    |-- gpxtpx:hr: long (nullable = true)
 |-- time: string (nullable = true)
 |-- athlete: string (nullable = true)
 |-- activity_type: string (nullable = true)

I try to parse this file:

<trkseg>
 <trkpt lat="59.9155180" lon="10.6962430">
  <ele>0.0</ele>
  <time>2013-01-24T06:18:43Z</time>
  <extensions>
   <gpxtpx:TrackPointExtension>
   </gpxtpx:TrackPointExtension>
  </extensions>
 </trkpt>
</trkseg>

I get this error:

scala.MatchError: 
      (of class java.lang.String)
    at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$com$databricks$spark$xml$parsers$StaxXmlParser$$convertObject$1.apply$mcVI$sp(StaxXmlParser.scala:218)

I think the problem may be caused by <gpxtpx:TrackPointExtension> being empty. Usually, it looks like this:

<gpxtpx:TrackPointExtension>
 <gpxtpx:hr>119</gpxtpx:hr>
 ...
</gpxtpx:TrackPointExtension>

In this case I'd prefer to have gpxtpx:TrackPointExtension be null. Alternatively, the row could be dropped. Is this a bug, or am I not seeing how I should do that?

Thanks in advance for taking the time.
Kind regards,
Lars
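
The behavior asked for above can be sketched with Python's standard library; this is an illustration of the idea, not spark-xml code. `is_blank` and `struct_or_none` are hypothetical helpers, and the namespace URI is a placeholder because the original file's `xmlns:gpxtpx` declaration is not shown in the issue.

```python
import xml.etree.ElementTree as ET

# Placeholder namespace URI (assumption): the real declaration is not in the issue.
NS = {"gpxtpx": "http://example.com/gpxtpx"}

TRKPT = """<trkpt xmlns:gpxtpx="http://example.com/gpxtpx" lat="59.9155180" lon="10.6962430">
  <ele>0.0</ele>
  <time>2013-01-24T06:18:43Z</time>
  <extensions>
   <gpxtpx:TrackPointExtension>
   </gpxtpx:TrackPointExtension>
  </extensions>
</trkpt>"""


def is_blank(elem):
    """True when elem has no attributes, no children, and only whitespace text."""
    return not elem.attrib and len(elem) == 0 and not (elem.text or "").strip()


def struct_or_none(elem):
    """Map a missing or blank element to None instead of converting it to a struct."""
    return None if elem is None or is_blank(elem) else elem


point = ET.fromstring(TRKPT)
extension = point.find("extensions/gpxtpx:TrackPointExtension", NS)
print(struct_or_none(extension))  # None
```

Checking for a blank element before matching on its content is one way to avoid treating the whitespace between the open and close tags as a value.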

spark-xml fails to load xml files with comments

I found that spark-xml fails to load documents with comments in them. Has anyone else encountered this, or am I doing something wrong? Here is a sample document I am having trouble with.

Sample Document with comments

<?xml version="1.0" encoding="utf-8"?>
<!--
  == Copyright (c) 2002-2015 All rights reserved.
  == Financial Products Markup Language is subject to the FpML public license.
  == A copy of this license is available at http://www.fpml.org/license/license.html
  -->
<dataDocument xmlns="http://www.fpml.org/FpML-5/master" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" fpmlVersion="5-8" xsi:schemaLocation="http://www.fpml.org/FpML-5/master ../../../schema/fpml-main.xsd http://www.w3.org/2000/09/xmldsig# ../../../schema/xmldsig-core-schema.xsd">
    <trade>
        <tradeHeader>
            <partyTradeIdentifier>
                <partyReference href="Party1"/>
                <tradeId tradeIdScheme="http://www.partyA.com/swaps/trade-id">Party19235</tradeId>
            </partyTradeIdentifier>
            <partyTradeIdentifier>
                <partyReference href="Party2"/>
                <tradeId tradeIdScheme="http://www.barclays.com/swaps/trade-id">Party22000</tradeId>
            </partyTradeIdentifier>
            <tradeDate>2014-11-19</tradeDate>
        </tradeHeader>
        <repo>
            <productType productTypeScheme="http://www.example.org/coding-scheme/product-taxonomy">InterestRate:Repo:BondRepo</productType>
            <productId productIdScheme="http://www.example.org/coding-scheme/product-id">Repo</productId>
            <fixedRateSchedule>
                <initialValue>0.85</initialValue>
            </fixedRateSchedule>
            <dayCountFraction>ACT/ACT.ICMA</dayCountFraction>
            <callingParty>AsDefinedInMasterAgreement</callingParty>
            <initialMargin>
                <marginType>Cash</marginType>
                <marginRatio>1.25</marginRatio>
            </initialMargin>
            <nearLeg>
                <buyerPartyReference href="Party1"/>
                <sellerPartyReference href="Party2"/>
                <settlementDate>
                    <adjustableDate>
                        <unadjustedDate>2014-11-03</unadjustedDate>
                        <dateAdjustments>
                            <businessDayConvention>NONE</businessDayConvention>
                        </dateAdjustments>
                    </adjustableDate>
                </settlementDate>
                <deliveryMethod>DeliveryVersusPayment</deliveryMethod>
                <settlementAmount>
                    <currency>USD</currency>
                    <amount>782855.55</amount>
                </settlementAmount>
                <collateral>
                    <nominalAmount>
                        <currency>USD</currency>
                        <amount>1000000</amount>
                    </nominalAmount>
                    <cleanPrice>97.8569437</cleanPrice>
                    <accruals>9.250000</accruals>
                    <dirtyPrice>107.1069437</dirtyPrice>
                    <assetReference href="XSJKL"/>
                </collateral>
            </nearLeg>
            <bond id="XSJKL">
                <instrumentId instrumentIdScheme="isin">JKL Corp Bond</instrumentId>
            </bond>
        </repo>
    </trade>
    <party id="Party1">
        <partyId>Party 1</partyId>
    </party>
    <party id="Party2">
        <partyId>Party 2</partyId>
    </party>
</dataDocument>
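
Until comments are handled by the parser, one possible workaround is to strip them from the file before loading it. The sketch below is a plain-Python preprocessing step, not a spark-xml feature. `strip_xml_comments` is a hypothetical helper, the sample is a shortened version of the document above with one inline comment added for illustration, and the regular expression does not account for comment-like text inside CDATA sections.

```python
import re

# Matches an XML comment, including ones that span several lines.
COMMENT_RE = re.compile(r"<!--.*?-->", re.DOTALL)


def strip_xml_comments(text):
    """Return text with every <!-- ... --> span removed."""
    return COMMENT_RE.sub("", text)


# Shortened sample; the inline comment is added here only for illustration.
sample = """<?xml version="1.0" encoding="utf-8"?>
<!--
  == Copyright (c) 2002-2015 All rights reserved.
  -->
<dataDocument fpmlVersion="5-8">
    <trade><!-- inline note --><tradeDate>2014-11-19</tradeDate></trade>
</dataDocument>"""

cleaned = strip_xml_comments(sample)
print("<!--" in cleaned)  # False
```

The cleaned text could then be written to a temporary file and loaded as usual.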

Datatype assumption causes document to be skipped.

Hi,

I am testing with v0.3.1 pulling through Maven. I encountered an issue where it appears that the module is assuming a datatype of a nested element based upon the first occurrence, when in fact the datatype is something else. I am only able to reproduce it using this XML document: http://docs.oasis-open.org/ubl/os-UBL-2.1/xml/UBL-Order-2.1-Example.xml

When the document is processed I receive this error: WARN StaxXmlParser$: Number format exception. Dropping malformed line: <Order xmlns=...

Within the document there is a PartyIdentification section with an ID of "PartyID123":

  <cac:PartyIdentification>
    <cbc:ID>PartyID123</cbc:ID>
  </cac:PartyIdentification>

If I change the ID to "123" instead of "PartyID123" the document is successfully processed. Alternatively, if instead of changing that element, I go to the prior PartyIdentification element and change its child ID element to include arbitrary text (notice the ABC appended to "7300070011115"):

  <cac:PartyIdentification>
    <cbc:ID schemeAgencyID="9" schemeID="GLN">7300070011115ABC</cbc:ID>
  </cac:PartyIdentification>

the document is then processed successfully. This is what makes me believe that some datatype is being determined/cached on the first occurrence, then attempted to be applied on subsequent occurrences. This behavior is not desirable because datatype should gracefully demote to a more generalized data type. For example if the first ID occurrence looked like a long type, but the second occurrence looks like a string, then the overall datatype should be assumed to be a string, not a long.

Correcting the datatype assumption will be important for more complex XML documents that do not reference a schema.

Thanks,
Dave
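
The "graceful demotion" described above can be sketched as a fold over all observed values that always keeps the more general type. This is a simplified illustration in plain Python, not the library's inference code. The three-type ordering and the names `infer_type`, `widen`, and `infer_column_type` are assumptions made for the example.

```python
# Types ordered from most specific to most general.
TYPE_ORDER = ["long", "double", "string"]


def infer_type(value):
    """Infer the narrowest type that can hold a single text value."""
    try:
        int(value)
        return "long"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def widen(left, right):
    """Return the more general of two inferred types."""
    return max(left, right, key=TYPE_ORDER.index)


def infer_column_type(values):
    """Fold widen over every value so one string demotes the whole column."""
    result = "long"
    for value in values:
        result = widen(result, infer_type(value))
    return result


ids = ["7300070011115", "PartyID123"]
print(infer_column_type(ids))  # string
```

With this approach the order of occurrences does not matter: the `ID` column is a string as soon as any one value fails to parse as a number.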
