Giter Site home page Giter Site logo

g-research / spark-extension Goto Github PK

View Code? Open in Web Editor NEW
170.0 21.0 28.0 798 KB

A library that provides useful extensions to Apache Spark and PySpark.

License: Apache License 2.0

Scala 73.85% Python 19.64% Shell 2.34% Java 4.18%
spark pyspark scala java python gr-oss

spark-extension's Introduction

Spark Extension

This project provides extensions to the Apache Spark project in Scala and Python:

Diff: A diff transformation and application for Datasets that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.

SortedGroups: A groupByKey transformation that groups rows by a key while providing a sorted iterator for each group. Similar to Dataset.groupByKey.flatMapGroups, but with order guarantees for the iterator.

Histogram: A histogram transformation that computes the histogram DataFrame for a value column.

Global Row Number: A withRowNumbers transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function row_number, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.

Partitioned Writing: The writePartitionedBy action writes your Dataset partitioned and efficiently laid out with a single operation.

Inspect Parquet files: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to parquet-tools or parquet-cli by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.

Install Python packages into PySpark job: Install Python dependencies via PIP or Poetry programatically into your running PySpark job (PySpark ≥ 3.1.0):

# noinspection PyUnresolvedReferences
from gresearch.spark import *

# using PIP
spark.install_pip_package("pandas==1.4.3", "pyarrow")
spark.install_pip_package("-r", "requirements.txt")

# using Poetry
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")

Fluent method call: T.call(transformation: T => R): R: Turns a transformation T => R, that is not part of T into a fluent method call on T. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .call(transformation)
 .doMore()

Fluent conditional method call: T.when(condition: Boolean).call(transformation: T => T): T: Perform a transformation fluently only if the given condition is true. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .when(condition).call(transformation)
 .doMore()

Shortcut for groupBy.as: Calling Dataset.groupBy(Column*).as[K, T] should be preferred over calling Dataset.groupByKey(V => K) whenever possible. The former allows Catalyst to exploit existing partitioning and ordering of the Dataset, while the latter hides from Catalyst which columns are used to create the keys. This can have a significant performance penalty.

Details:

The new column-expression-based groupByKey[K](Column*) method makes it easier to group by a column expression key. Instead of

ds.groupBy($"id").as[Int, V]

use:

ds.groupByKey[Int]($"id")

Backticks: backticks(string: String, strings: String*): String): Encloses the given column name with backticks (`) when needed. This is a handy way to ensure column names with special characters like dots (.) work with col() or select().

Count null values: count_null(e: Column): an aggregation function like count that counts null values in column e. This is equivalent to calling count(when(e.isNull, lit(1))).

.Net DateTime.Ticks: Convert .Net (C#, F#, Visual Basic) DateTime.Ticks into Spark timestamps, seconds and nanoseconds.

Available methods:
// Scala
dotNetTicksToTimestamp(Column): Column       // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column       // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column  // returns Unix epoch nanoseconds as LongType

The reverse is provided by (all return LongType .Net ticks):

// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column

These methods are also available in Python:

# Python
dotnet_ticks_to_timestamp(column_or_name)         # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name)        # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name)  # returns Unix epoch nanoseconds as LongType

timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)

Spark temporary directory: Create a temporary directory that will be removed on Spark application shutdown.

Examples:

Scala:

import uk.co.gresearch.spark.createTemporaryDir

val dir = createTemporaryDir("prefix")

Python:

# noinspection PyUnresolvedReferences
from gresearch.spark import *

dir = spark.create_temporary_dir("prefix")

Spark job description: Set Spark job description for all Spark jobs within a context.

Examples:
import uk.co.gresearch.spark._

implicit val session: SparkSession = spark

withJobDescription("parquet file") {
  val df = spark.read.parquet("data.parquet")
  val count = appendJobDescription("count") {
    df.count
  }
  appendJobDescription("write") {
    df.write.csv("data.csv")
  }
}
Without job description With job description

Note that setting a description in one thread while calling the action (e.g. .count) in a different thread does not work, unless the different thread is spawned from the current thread after the description has been set.

Working example with parallel collections:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters.seqIsParallelizable
import scala.collection.parallel.ForkJoinTaskSupport

val files = Seq("data1.csv", "data2.csv").par

val counts = withJobDescription("Counting rows") {
  // new thread pool required to spawn new threads from this thread
  // so that the job description is actually used
  files.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool())
  files.map(filename => spark.read.csv(filename).count).sum
}(spark)

Using Spark Extension

The spark-extension package is available for all Spark 3.2, 3.3, 3.4 and 3.5 versions. Some earlier Spark versions may also be supported. The package version has the following semantics: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}:

  • SCALA_COMPAT_VERSION: Scala binary compatibility (minor) version. Available are 2.12 and 2.13.
  • SPARK_COMPAT_VERSION: Apache Spark binary compatibility (minor) version. Available are 3.2, 3.3, 3.4 and 3.5.
  • VERSION: The package version, e.g. 2.10.0.

SBT

Add this line to your build.sbt file:

libraryDependencies += "uk.co.gresearch.spark" %% "spark-extension" % "2.11.0-3.5"

Maven

Add this dependency to your pom.xml file:

<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-extension_2.12</artifactId>
  <version>2.11.0-3.5</version>
</dependency>

Gradle

Add this dependency to your build.gradle file:

dependencies {
    implementation "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5"
}

Spark Submit

Submit your Spark app with the Spark Extension dependency (version ≥1.1.0) as follows:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5 [jar]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.

Spark Shell

Launch a Spark Shell with the Spark Extension dependency (version ≥1.1.0) as follows:

spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark Shell version.

Python

PySpark API

Start a PySpark session with the Spark Extension dependency (version ≥1.1.0) as follows:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5") \
    .getOrCreate()

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.

PySpark REPL

Launch the Python Spark REPL with the Spark Extension dependency (version ≥1.1.0) as follows:

pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.

PySpark spark-submit

Run your Python scripts that use PySpark via spark-submit:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5 [script.py]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.

PyPi package (local Spark cluster only)

You may want to install the pyspark-extension python package from PyPi into your development environment. This provides you code completion, typing and test capabilities during your development phase.

Running your Python application on a Spark cluster will still require one of the above ways to add the Scala package to the Spark environment.

pip install pyspark-extension==2.11.0.3.5

Note: Pick the right Spark version (here 3.5) depending on your PySpark version.

Your favorite Data Science notebook

There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:

uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5

Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.

Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.

Build

You can build this project against different versions of Spark and Scala.

Switch Spark and Scala version

If you want to build for a Spark or Scala version different to what is defined in the pom.xml file, then run

sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]

For example, switch to Spark 3.5.0 and Scala 2.13.8 by running sh set-version.sh 3.5.0 2.13.8.

Build the Scala project

Then execute mvn package to create a jar from the sources. It can be found in target/.

Testing

Run the Scala tests via mvn test.

Setup Python environment

In order to run the Python tests, setup a Python environment as follows (replace [SCALA-COMPAT-VERSION] and [SPARK-COMPAT-VERSION] with the respective values):

virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest

Run Python tests

Run the Python tests via env PYTHONPATH=python:python/test python -m pytest python/test.

Note: you first have to build the Scala sources.

Build Python package

Run the following sequence of commands in the project root directory:

mkdir -p python/pyspark/jars/
cp -v target/spark-extension_*-*.jar python/pyspark/jars/
pip install build

Then execute python -m build python/ to create a whl from the sources. It can be found in python/dist/.

Publications

spark-extension's People

Contributors

demarillacizere avatar dependabot[bot] avatar enricomi avatar ezhou7 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

spark-extension's Issues

PySpark - how return only changed rows

Is there a way to return only changed rows when comparing two DFs with .diff() ?

One could .filter() after applying .diff() and before calling .collect(). Does this imply on a local machine that only changed rows are loaded into memory?

An intermediate step of returning the full .diff() result is infeasible since the DFs have lots of rows.

Thanks

Diff: Add floating point precision comparison

Comparing datasets containing floats or doubles can flag up false-positive changes due to floating point precision. Provide diff methods that compare floats and doubles with a given epsilon or precision.

Pyspark - Import Error

Error:

Invalid Syntax while running import

FYI:

Spark Version: 3.1.1-amzn-0

Python Version - 3.6.10 | Anaconda, Inc.

Jar: uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5"

IMG_20240128_162821

Add `gresearch.spark.parquet` to Python package in `setup.py`

Hello there,

I am running into the trouble of reading user-defined file metadata from parquet files. Does this tool allow for reading metadata from multiple parquet files? I understand that the package pyarrow can read metadata, but would like something that can read and collect metadata from multiple parquet files into a dataframe for use.

Thanks!

DiffModes.Default throws error value withDiffMode is not a member of uk.co.gresearch.spark.diff.DiffOptions

scala - 2.11
spark - 2.4.0-cdh6.2.1
spark extension : 1.1.0-2.4

val options = DiffOptions.default
  .withDiffColumn("d")
  .withLeftColumnPrefix("l")
  .withRightColumnPrefix("r")
  .withInsertDiffValue("i")
  .withChangeDiffValue("c")
  .withDeleteDiffValue("d")
  .withNochangeDiffValue("n")
  .withChangeColumn("changes")
  .withDiffMode(DiffModes.Default)
  .withSparseMode(true)

showing compiler error,
value withDiffMode is not a member of uk.co.gresearch.spark.diff.DiffOptions
same issue with .withSparseMode"

Is there a way to handle comparison for Map<T, Struct>?

I am dealing with a complex nested field of this form:

|-- my_field: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- f1: integer (nullable = true)
 |    |    |-- [...]
 |    |    |-- fn: struct (nullable = true)
 |    |    |    |-- x: float (nullable = true)
 |    |    |    |-- y: float (nullable = true)

I was wondering if there is a way to pre-process the data or have a way to support a comparison of this kind of data.

I am already flattening the Dataframe as much as possible, but I could not find an easy way of handling map<struct>.

I am aware of .withComparator(DiffComparators.map(), ...) method, but I don't think we can express the struct here unless we know the full structure.

Is there a generic trick to support Map[Any, Any] even by resorting to some dirty trick like a string representation with sorting on lexicographic ordering or similar?

NoSuchMethodError: org.apache.spark.sql.internal.SQLConf$.get() Error

I have tried all over your versions for Scala 2.11 and everyone gives this error when trying to use diff.
I have the apache packages in the pom and use them for spark all the time with no issues.

xyz_data.diff(xyz_data2)

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf$.get()Lorg/apache/spark/sql/internal/SQLConf; at uk.co.gresearch.spark.diff.package$.handleConfiguredCaseSensitivity(package.scala:170) at uk.co.gresearch.spark.diff.DiffOptions.<init>(DiffOptions.scala:93) at uk.co.gresearch.spark.diff.DiffOptions$.<init>(DiffOptions.scala:216) at uk.co.gresearch.spark.diff.DiffOptions$.<clinit>(DiffOptions.scala) at uk.co.gresearch.spark.diff.Diff$.<init>(Diff.scala:305) at uk.co.gresearch.spark.diff.Diff$.<clinit>(Diff.scala) at uk.co.gresearch.spark.diff.package$DatasetDiff.diff(package.scala:86) at com.xyz.sh.spark.ds.app.XYZProducer$$anonfun$execute$7.apply(XYZProducer.scala:750) at com.xyz.sh.spark.ds.app.XYZProducer$$anonfun$execute$7.apply(XYZProducer.scala:713) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)

On AWS - after Diff, Insert columns are all null

I found this project when trying to compare dataframes using pyspark, and it works appears to work great. I am seeing an issue when running this as part of an AWS Glue job with this jar - spark-extension_2.11-1.3.3-2.4.jar. Locally, it works fine and I cannot reproduce this issue.

If I create the dataframes using spark.createDataFrame().. and do a diff, I get the results I expect. When I read the dataframes from a glue context, after a diff, the Inserts columns are all null. I tried with and without unique identifiers, and I also reduced the number of rows and columns. I thought it might be Sparse mode causing this - so I tried both true and false on the sparse mode.

I created a simple case where the left dataframe has 1 row and the right dataframe has 2. One is the same row, and the diff identifies that and it identifies the Insert as well - but it loses all the values

The Deletes and No Changes rows are as expected, and currently, I only see the issue with Inserts. I have not tested Changes yes - been stuck on Inserts.

Any insights on what might be causing this?

Sample outlook
+----+---------+---------+------------------+-----------+-------------+--------------------+--------------+
|diff| col1|col2| col2|col4|col4| col5|col6|col7|col8|col9|
+----+---------+---------+------------------+-----------+-------------+--------------------+--------------+
| N|123456789| ABCDE|Test| 670| 4|Description...| 77.0| FL| null| null|
| I| null| null| null| null| null| null| null| null| null| null|
+----+---------+---------+------------------+-----------+-------------+--------------------+--------------+

Note: I also tried 1.3.0 and the same thing is happening (spark-extension_2.11-1.3.0-2.4)

Allow for extra columns on either side if they are ignored

The Diff API allows to specify ignored columns (differences in these columns are not considered differences) that are preserved in the result Dataset. However, such columns have to be part of both diff sides.

This can be generalizes by allowing those columns to exist only one one side.

Comparisons between DFs with MapFields fail comparison

df1.diff(df2), when a column has a type of Map, tries to use <=> for comparison which results in the following failure:

cannot resolve '(`state_options` <=> `state_options`)' due to data type mismatch: EqualNullSafe does not support ordering on type map<string,map<string,string>>;;
'Project [CASE WHEN isnull(____________________________________#1724) THEN Unexpected WHEN isnull(____________________________________#1853) THEN Absent WHEN NOT ((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((((accepting_new_bhp_patients#200 <=> accepting_new_bhp_patients#1091) AND (accreditation_body#201 <=> accreditation_body#1092)) AND (accreditation_expiration#202 <=> accreditation_expiration#1093)) AND (accreditation_status#203 <=> accreditation_status#1094)) AND (accreditation_type#204 <=> accreditation_type#1095)) AND (admitting_privileges_hospital_npi_1#205 <=> admitting_privileges_hospital_npi_1#1096)) AND (admitting_privileges_hospital_npi_2#206 <=> admitting_privileges_hospital_npi_2#1097)) AND (admitting_privileges_hospital_npi_3#207 <=> admitting_privileges_hospital_npi_3#1098)) AND (area_of_expertise#208 <=> area_of_expertise#1099)) AND (billing_email_address#209 <=> billing_email_address#1100)) AND (billing_fax#210 <=> billing_fax#1101)) AND (billing_phone#211 <=> billing_phone#1102)) AND (billing_tin#212 <=> billing_tin#1103)) AND (board_certification_date#213 <=> board_certification_date#1104)) AND (board_certification_status#214 <=> board_certification_status#1105)) AND (board_certification_type#215 <=> board_certification_type#1106)) AND (capitation_npi#216 <=> capitation_npi#1107)) AND (capitation_tin#217 <=> capitation_tin#1108)) AND (caqh#218 <=> caqh#1109)) AND (chp_provider#219 <=> chp_provider#1110)) AND (claim_remit_address_line1#220 <=> claim_remit_address_line1#1111)) AND (claim_remit_address_line2#221 <=> claim_remit_address_line2#1112)) AND (claim_remit_city#222 <=> claim_remit_city#1113)) AND (claim_remit_name#223 <=> claim_remit_name#1114)) AND (claim_remit_state#224 <=> claim_remit_state#1115)) AND (claim_remit_zip_code#225 <=> claim_remit_zip_code#1116)) AND (clinical_staff_language_spoken1#226 <=> clinical_staff_language_spoken1#1117)) AND (clinical_staff_language_spoken2#227 <=> clinical_staff_language_spoken2#1118)) AND (clinical_staff_language_spoken3#228 <=> clinical_staff_language_spoken3#1119)) AND (cred_status#229 <=> cred_status#1120)) AND (credentialing_contact_email_address#230 <=> credentialing_contact_email_address#1121)) AND (credentialing_date#231 <=> credentialing_date#1122)) AND (credentialing_method#232 <=> credentialing_method#1123)) AND (dea_number#233 <=> dea_number#1124)) AND (degree#234 <=> degree#1125)) AND (dob#235 <=> dob#1126)) AND (ecp_provider#236 <=> ecp_provider#1127)) AND (employed#237 <=> employed#1128)) AND (emr_type#238 <=> emr_type#1129)) AND (emr_version#239 <=> emr_version#1130)) AND (end_date#240 <=> end_date#1131)) AND (entity_type_code#241 <=> entity_type_code#1132)) AND (facility_address_line1#242 <=> facility_address_line1#1133)) AND (facility_address_line2#243 <=> facility_address_line2#1134)) AND (facility_city#244 <=> facility_city#1135)) AND (facility_clia#245 <=> facility_clia#1136)) AND (facility_county#246 <=> facility_county#1137)) AND (facility_fax#247 <=> facility_fax#1138)) AND (facility_name_dba#248 <=> facility_name_dba#1139)) AND (facility_state#249 <=> facility_state#1140)) AND (facility_telephone#250 <=> facility_telephone#1141)) AND (facility_type#251 <=> facility_type#1142)) AND (facility_zip_code#252 <=> facility_zip_code#1143)) AND (fee_schdule#253 <=> fee_schdule#1144)) AND (fee_schedule#254 <=> fee_schedule#1145)) AND (first_name#255 <=> first_name#1146)) AND (future_first_day_of_employment#256 <=> future_first_day_of_employment#1147)) AND (future_last_day_of_employment#257 <=> future_last_day_of_employment#1148)) AND (gender#258 <=> gender#1149)) AND (hide_in_directory#259 <=> hide_in_directory#1150)) AND (hiv_service_provider#260 <=> hiv_service_provider#1151)) AND (hospital_privileges#261 <=> hospital_privileges#1152)) AND (ipa_affiliation#262 <=> ipa_affiliation#1153)) AND (ipa_mailing_address_line1#263 <=> ipa_mailing_address_line1#1154)) AND (ipa_mailing_address_line2#264 <=> ipa_mailing_address_line2#1155)) AND (ipa_mailing_city#265 <=> ipa_mailing_city#1156)) AND (ipa_mailing_state#266 <=> ipa_mailing_state#1157)) AND (ipa_mailing_zip_code#267 <=> ipa_mailing_zip_code#1158)) AND (ipa_tin#268 <=> ipa_tin#1159)) AND (is_primary_practice_address#269 <=> is_primary_practice_address#1160)) AND (language_1#270 <=> language_1#1161)) AND (language_2#271 <=> language_2#1162)) AND (language_3#272 <=> language_3#1163)) AND (last_name#273 <=> last_name#1164)) AND (license#274 <=> license#1165)) AND (mailing_address_city#275 <=> mailing_address_city#1166)) AND (mailing_address_line1#276 <=> mailing_address_line1#1167)) AND (mailing_address_line2#277 <=> mailing_address_line2#1168)) AND (mailing_address_name#278 <=> mailing_address_name#1169)) AND (mailing_address_state#279 <=> mailing_address_state#1170)) AND (mailing_address_zip_code#280 <=> mailing_address_zip_code#1171)) AND (main_webpage_url#281 <=> main_webpage_url#1172)) AND (medicaid#282 <=> medicaid#1173)) AND (medicaid_facility_number#283 <=> medicaid_facility_number#1174)) AND (medicare#284 <=> medicare#1175)) AND (medicare_facility_number#285 <=> medicare_facility_number#1176)) AND (middle_name#286 <=> middle_name#1177)) AND (organization_name#287 <=> organization_name#1178)) AND (point_of_contact_email_address#288 <=> point_of_contact_email_address#1179)) AND (practice_address_line1#289 <=> practice_address_line1#1180)) AND (practice_address_line2#290 <=> practice_address_line2#1181)) AND (practice_city#291 <=> practice_city#1182)) AND (practice_county#292 <=> practice_county#1183)) AND (practice_fax#293 <=> practice_fax#1184)) AND (practice_manager_email_address#294 <=> practice_manager_email_address#1185)) AND (practice_name_dba#295 <=> practice_name_dba#1186)) AND (practice_npi#296 <=> practice_npi#68)) AND (practice_state#297 <=> practice_state#1187)) AND (practice_telephone#298 <=> practice_telephone#1188)) AND (practice_tin#299 <=> practice_tin#1189)) AND (practice_webpage_url#300 <=> practice_webpage_url#1190)) AND (practice_zip_code#301 <=> practice_zip_code#1191)) AND (preferred_communication_method#302 <=> preferred_communication_method#1192)) AND (provider_email_address#303 <=> provider_email_address#1193)) AND (provider_npi#304 <=> provider_npi#67)) AND (provider_type#305 <=> provider_type#1194)) AND (referral_required#306 <=> referral_required#1195)) AND (roster_specialty_override#307 <=> roster_specialty_override#1196)) AND (row_in_file#308 <=> row_in_file#1197)) AND (services_offered_text_or_url#309 <=> services_offered_text_or_url#1198)) AND (specialty#310 <=> specialty#1199)) AND (ssn#311 <=> ssn#1200)) AND (start_date#312 <=> start_date#1201)) AND (state_license#313 <=> state_license#1202)) AND (state_options#314 <=> state_options#1203)) AND (supervising_provider_group_or_ipa#315 <=> supervising_provider_group_or_ipa#1204)) AND (supervising_provider_npi#316 <=> supervising_provider_npi#1205)) AND (system_id#317 <=> system_id#1206)) AND (system_id_type#318 <=> system_id_type#1207)) AND (telemedicine_available#319 <=> telemedicine_available#1208)) AND (telemedicine_url#320 <=> telemedicine_url#1209)) AND (tertiary_care#321 <=> tertiary_care#1210)) AND (tpid#323 <=> tpid#1211)) AND (tradingpartnerid#324 <=> tradingpartnerid#1212)) AND (hc#325 <=> hc#1213)) AND (meta_data#326 <=> meta_data#1214)) THEN != ELSE ✓ END AS diff#1982, coalesce(tin#322, tin#66) AS tin#1983, CASE WHEN NOT (accepting_new_bhp_patients#200 <=> accepting_new_bhp_patients#1091) THEN accepting_new_bhp_patients#200 END AS Expected_accepting_new_bhp_patients#1984, CASE WHEN NOT (accepting_new_bhp_patients#200 <=> accepting_new_bhp_patients#1091) THEN accepting_new_bhp_patients#1091 END AS Outcome_accepting_new_bhp_patients#1985, CASE WHEN NOT (accreditation_body#201 <=> accreditation_body#1092) THEN accreditation_body#201 END AS Expected_accreditation_body#1986, CASE WHEN NOT (accreditation_body#201 <=> accreditation_body#1092) THEN accreditation_body#1092 END AS Outcome_accreditation_body#1987, CASE WHEN NOT (accreditation_expiration#202 <=> accreditation_expiration#1093) THEN accreditation_expiration#202 END AS Expected_accreditation_expiration#1988, CASE WHEN NOT (accreditation_expiration#202 <=> accreditation_expiration#1093) THEN accreditation_expiration#1093 END AS Outcome_accreditation_expiration#1989, CASE WHEN NOT (accreditation_status#203 <=> accreditation_status#1094) THEN accreditation_status#203 END AS Expected_accreditation_status#1990, CASE WHEN NOT (accreditation_status#203 <=> accreditation_status#1094) THEN accreditation_status#1094 END AS Outcome_accreditation_status#1991, CASE WHEN NOT (accreditation_type#204 <=> accreditation_type#1095) THEN accreditation_type#204 END AS Expected_accreditation_type#1992, CASE WHEN NOT (accreditation_type#204 <=> accreditation_type#1095) THEN accreditation_type#1095 END AS Outcome_accreditation_type#1993, CASE WHEN NOT (admitting_privileges_hospital_npi_1#205 <=> admitting_privileges_hospital_npi_1#1096) THEN admitting_privileges_hospital_npi_1#205 END AS Expected_admitting_privileges_hospital_npi_1#1994, CASE WHEN NOT (admitting_privileges_hospital_npi_1#205 <=> admitting_privileges_hospital_npi_1#1096) THEN admitting_privileges_hospital_npi_1#1096 END AS Outcome_admitting_privileges_hospital_npi_1#1995, CASE WHEN NOT (admitting_privileges_hospital_npi_2#206 <=> admitting_privileges_hospital_npi_2#1097) THEN admitting_privileges_hospital_npi_2#206 END AS Expected_admitting_privileges_hospital_npi_2#1996, CASE WHEN NOT (admitting_privileges_hospital_npi_2#206 <=> admitting_privileges_hospital_npi_2#1097) THEN admitting_privileges_hospital_npi_2#1097 END AS Outcome_admitting_privileges_hospital_npi_2#1997, CASE WHEN NOT (admitting_privileges_hospital_npi_3#207 <=> admitting_privileges_hospital_npi_3#1098) THEN admitting_privileges_hospital_npi_3#207 END AS Expected_admitting_privileges_hospital_npi_3#1998, CASE WHEN NOT (admitting_privileges_hospital_npi_3#207 <=> admitting_privileges_hospital_npi_3#1098) THEN admitting_privileges_hospital_npi_3#1098 END AS Outcome_admitting_privileges_hospital_npi_3#1999, CASE WHEN NOT (area_of_expertise#208 <=> area_of_expertise#1099) THEN area_of_expertise#208 END AS Expected_area_of_expertise#2000, CASE WHEN NOT (area_of_expertise#208 <=> area_of_expertise#1099) THEN area_of_expertise#1099 END AS Outcome_area_of_expertise#2001, CASE WHEN NOT (billing_email_address#209 <=> billing_email_address#1100) THEN billing_email_address#209 END AS Expected_billing_email_address#2002, CASE WHEN NOT (billing_email_address#209 <=> billing_email_address#1100) THEN billing_email_address#1100 END AS Outcome_billing_email_address#2003, CASE WHEN NOT (billing_fax#210 <=> billing_fax#1101) THEN billing_fax#210 END AS Expected_billing_fax#2004, CASE WHEN NOT (billing_fax#210 <=> billing_fax#1101) THEN billing_fax#1101 END AS Outcome_billing_fax#2005, ... 230 more fields]
+- Join FullOuter, (tin#322 <=> tin#66)
   :- Project [accepting_new_bhp_patients#200, accreditation_body#201, accreditation_expiration#202, accreditation_status#203, accreditation_type#204, admitting_privileges_hospital_npi_1#205, admitting_privileges_hospital_npi_2#206, admitting_privileges_hospital_npi_3#207, area_of_expertise#208, billing_email_address#209, billing_fax#210, billing_phone#211, billing_tin#212, board_certification_date#213, board_certification_status#214, board_certification_type#215, capitation_npi#216, capitation_tin#217, caqh#218, chp_provider#219, claim_remit_address_line1#220, claim_remit_address_line2#221, claim_remit_city#222, claim_remit_name#223, ... 104 more fields]
   :  +- LocalRelation [accepting_new_bhp_patients#200, accreditation_body#201, accreditation_expiration#202, accreditation_status#203, accreditation_type#204, admitting_privileges_hospital_npi_1#205, admitting_privileges_hospital_npi_2#206, admitting_privileges_hospital_npi_3#207, area_of_expertise#208, billing_email_address#209, billing_fax#210, billing_phone#211, billing_tin#212, board_certification_date#213, board_certification_status#214, board_certification_type#215, capitation_npi#216, capitation_tin#217, caqh#218, chp_provider#219, claim_remit_address_line1#220, claim_remit_address_line2#221, claim_remit_city#222, claim_remit_name#223, ... 103 more fields]
   +- Project [accepting_new_bhp_patients#1091, accreditation_body#1092, accreditation_expiration#1093, accreditation_status#1094, accreditation_type#1095, admitting_privileges_hospital_npi_1#1096, admitting_privileges_hospital_npi_2#1097, admitting_privileges_hospital_npi_3#1098, area_of_expertise#1099, billing_email_address#1100, billing_fax#1101, billing_phone#1102, billing_tin#1103, board_certification_date#1104, board_certification_status#1105, board_certification_type#1106, capitation_npi#1107, capitation_tin#1108, caqh#1109, chp_provider#1110, claim_remit_address_line1#1111, claim_remit_address_line2#1112, claim_remit_city#1113, claim_remit_name#1114, ... 104 more fields]
      +- Project [cast(null as string) AS accepting_new_bhp_patients#1091, cast(null as string) AS accreditation_body#1092, cast(null as string) AS accreditation_expiration#1093, cast(null as string) AS accreditation_status#1094, cast(null as string) AS accreditation_type#1095, cast(null as string) AS admitting_privileges_hospital_npi_1#1096, cast(null as string) AS admitting_privileges_hospital_npi_2#1097, cast(null as string) AS admitting_privileges_hospital_npi_3#1098, cast(null as string) AS area_of_expertise#1099, cast(null as string) AS billing_email_address#1100, cast(null as string) AS billing_fax#1101, cast(null as string) AS billing_phone#1102, cast(null as string) AS billing_tin#1103, cast(null as string) AS board_certification_date#1104, cast(null as string) AS board_certification_status#1105, cast(null as string) AS board_certification_type#1106, cast(null as string) AS capitation_npi#1107, cast(null as string) AS capitation_tin#1108, cast(null as string) AS caqh#1109, cast(null as string) AS chp_provider#1110, cast(null as string) AS claim_remit_address_line1#1111, cast(null as string) AS claim_remit_address_line2#1112, cast(null as string) AS claim_remit_city#1113, cast(null as string) AS claim_remit_name#1114, ... 103 more fields]
         +- LogicalRDD [this_column_is_not_normalized#65, tin#66, provider_npi#67, practice_npi#68], false

MapComparator is sensitive to key order

As pointed out in #184 (comment), this example shows that the map comparator identifies a Change where it should flag No change:

val left = Seq((1, """{"foo":"__foo__","bar":"__bar__"}""")).toDF("id", "json")
val right = Seq((1, """{"bar":"__bar__","foo":"__foo__"}""")).toDF("id", "json")
val leftMap = left.select($"id", from_json($"json", MapType(StringType, StringType)))
val rightMap = right.select($"id", from_json($"json", MapType(StringType, StringType)))
val options = DiffOptions.default.withComparator(DiffComparators.map[String, String], MapType(StringType, StringType))
new Differ(options).diff(leftMap, rightMap, "id").show
+----+---+--------------------------------+--------------------------------+
|diff|id |left_entries                    |right_entries                   |
+----+---+--------------------------------+--------------------------------+
|C   |1  |{foo -> __foo__, bar -> __bar__}|{bar -> __bar__, foo -> __foo__}|
+----+---+--------------------------------+--------------------------------+

Can we use this library in a Java project?

Can we use this library, specifically the diff in java. Right now when I import the dependency and do a diff on an expected vs actual data sets, the diff is not getting recognized as amethod.

Below is the method signature I am using this on:
image

I have imported the package as :
import uk.co.gresearch.spark.diff.*;

Comparators error when using pyspark

Trying to run example in documentation using pyspark but keep getting the following error -
AttributeError: 'DiffOptions' object has no attribute 'withComparator' .

Running this in a Glue notebook with Spark version 3.3 and spark-extension_2.12-2.8.0. Same issue when upgrading to spark-extension_2.13-2.11.0. Is this method supported for the python api?

Create 2 dataframes

df_1 = spark.createDataFrame([
    Row(id=1, value=1.0),
    Row(id=2, value=2.0),
    Row(id=3, value=3.0),
])


df_2 = spark.createDataFrame([
    Row(id=1, value=1.0),
    Row(id=2, value=2.02),
    Row(id=3, value=3.05),
])

Run Comparator method

from pyspark.sql.types import DoubleType
from gresearch.spark.diff import DiffOptions, DiffMode, DiffComparators

options = DiffOptions().with_change_column("changes")\
                       .withComparator(DiffComparators.epsilon(0.01).asRelative().asInclusive(), DoubleType)

df_1.diff_with_options(df_2, options, "id").show()

Error - AttributeError: 'DiffOptions' object has no attribute 'withComparator'

Allow diff values in DiffOptions to be empty

Diff values can be empty string, but must be unique. Currently,an empty string throws:

java.lang.IllegalArgumentException: requirement failed: Change diff value must not be empty
  at scala.Predef$.require(Predef.scala:281)
  at uk.co.gresearch.spark.diff.DiffOptions.<init>(DiffOptions.scala:36)
  ... 49 elided

Spark extension not working with 3.5.1

Hey all, can it be that the spark extension does not support spark 3.5.1. Once I updated I get below error message. Loading and saving parquet tables works as normal. On spark 3.5.0 it worked without any issue :)

Thanks already :)

Screenshot_2024-03-06-20-04-22-506_com google android youtube

DiffOption withChangeColumn("changes") throws datatype mismatch error

Scala version 2.11
maven : spark extension_2.11
1.1.0-2.4

`val originalDF = Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",600,80000),(3,"rishi","ahmedabad",510,65000))
                .toDF("id","name","city","credit_score","credit_limit")
val changedDF= Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",650,90000),(4,"Joshua","cochin",612,85000))
                .toDF("id","name","city","credit_score","credit_limit")
val options = DiffOptions.default.withChangeColumn("changes") 
val diff = originalDF.diff(changedDF, options, "id")
diff.show(false)`

throws error,

Exception in thread main : org.spark.sql.analysisException : cannot resolve 'concat(CASE WHEN (`name`<=>`name`) THEN array() ELSE array(`name) END, CASE WHEN(`city` <=>`city`)..
due to datatype mismatch : input to function concat should have StringType or BinaryType but it's [array<string>, array<string>,array<string>, array<string>];;

PySpark - Diff Epsilon Inclusive vs Exclusive

Pyspark

Is there anyway to mention Inclusive <= or Exclusive < for DiffComparators.epsilon in Pyspark ?

By Default what's the behaviour in pyspark ? Inclusive or Exclusive ?

options = DiffOptions() \
  .with_diff_column("diff") \
  .with_left_column_prefix("l") \
  .with_right_column_prefix("r") \
  .with_insert_diff_value("I") \
  .with_change_diff_value("Mismatch") \
  .with_delete_diff_value("D") \
  .with_nochange_diff_value("Match") \
  .with_change_column("changes") \
  .with_diff_mode(DiffMode.Default) \
  .with_data_type_comparator(DiffComparators.epsilon(sys.float_info.epsilon ** 0.5), DoubleType()) 

Scala with Spark

I could see the Methods for scala. Do we have anything for Python ?

Add option to ignore columns from compare

Adding option to ignore column list from comparing will be helpful when identifying inserts, updates, deletes for Slowly changing Dimension (SCD) type2 operations. Columns to be dropped could be audit value like effective_date etc... Having this option will streamline the process vs dropping audit columns from df and joining them back after the diff operation is performed.

left = sqlContext.createDataFrame([(1, "one", "2021-01-17"), (2, "two", "2021-09-17"), (3, "three", "2021-08-17")], ["id", "value", "effective_date"])
  
right = sqlContext.createDataFrame([(1, "one","2021-01-17"), (2, "Two", '2021-08-17'), (4, "four","2021-08-17")], ["id", "value", "effective_date"])

options = DiffOptions().with_change_column('changes')

result_df = left.diff_with_options(right,options, 'id')
display(result_df)

in above code effective_date is an audit variable which should be ignored from compare process. Currently we drop this value from the left and right df, then perform the compare, join the result_df back to get the audit values and then load data into destination.

Un-pivot / melt: implement un-pivot / melt function

Spark provides a pivot transformation (groupBy(Columns*).pivot(Column, Seq[Any])), but there is no equivalent opposite transformation like melt.

This melt, together with diff can be used to nicely solve these use cases: link link link link.

Can be implemented via arrayand explode:

import java.sql.Date

case class City(city: String, product: String, date: String, sale: Int, exp: Int, wastage: Int)

val df = Seq(
  City("city 1", "prod 1 ", "2017-09-29", 358, 975, 193),
  City("city 1", "prod 2 ", "2017-08-25", 50, 687, 201),
  City("city 1", "prod 3 ", "2017-09-09", 236, 431, 169),
  City("city 2", "prod 1 ", "2017-09-28", 358, 975, 193),
  City("city 2", "prod 2 ", "2017-08-24", 50, 687, 201),
  City("city 3", "prod 3 ", "2017-09-08", 236, 431, 169)
).toDS

df
  .select($"city", $"product", $"date", explode(array(array(lit("sale"), $"sale"), array(lit("exp"), $"exp"), array(lit("wastage"), $"wastage"))))
  .select($"city", $"product", $"date", $"col".getItem(0), $"col".getItem(1))
  .show

All non-primary columns are put into a single array column, which is the exploded. Each column is represented by a tuple of column name and value. After explode, the tuple is expanded into two columns. This becomes even more verbose for more columns. check sql query plan and performance

Add Diff app

Add an Spark application that diffs two files from hadoop filesystem and outputs statistics on stdout and diff to file.

This allows to use the package as a stand-alone Spark application ready to run on a cluster, e.g. as a step in a regression test pipeline.

the test-release.sh script is out of sync with repos from 2020

./test-release.sh

Testing Spark 3.3.0 and Scala 2.13

--2022-10-10 08:45:43-- https://archive.apache.org/dist/spark/spark-
Resolving archive.apache.org (archive.apache.org)... 2a01:4f8:172:2ec5::2, 138.201.131.134
Connecting to archive.apache.org (archive.apache.org)|2a01:4f8:172:2ec5::2|:443... connected.
HTTP request sent, awaiting response... 404 Not Found
2022-10-10 08:45:45 ERROR 404: Not Found.

The details: it's looking for a URL with this as part of it

                    hadoop="hadoop3.3-scala2.13"

But it is no longer "hadoop3.3" just "hadoop3".

I fixed that problem in my local copy and hit a further one:

./test-release.sh

Testing Spark 3.3.0 and Scala 2.13

--2022-10-10 15:53:46-- https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3-scala2.13.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306344454 (292M) [application/x-gzip]
Saving to: ‘STDOUT’

 0K ........ ........ ........ ........ 10% 10.8M 24s

32768K ........ ........ ........ ........ 21% 9.68M 22s
65536K ........ ........ ........ ........ 32% 11.4M 19s
98304K ........ ........ ........ ........ 43% 15.5M 14s
131072K ........ ........ ........ ........ 54% 10.4M 12s
163840K ........ ........ ........ ........ 65% 7.43M 10s
196608K ........ ........ ........ ........ 76% 15.4M 6s
229376K ........ ........ ........ ........ 87% 18.0M 3s
262144K ........ ........ ........ ........ 98% 17.0M 0s
294912K .... 100% 13.7M=25s

2022-10-10 15:54:11 (11.9 MB/s) - written to stdout [306344454/306344454]

https://oss.sonatype.org/content/groups/staging/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /home/vn53el7/.ivy2/cache
The jars for the packages stored in: /home/vn53el7/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
uk.co.gresearch.spark#spark-extension_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b8ef18c9-94e4-4b7e-94f6-3584326fca51;1.0
confs: [default]
:: resolution report :: resolve 2352ms :: artifacts dl 0ms
:: modules in use:
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 0 | 0 |
---------------------------------------------------------------------

:: problems summary ::
:::: WARNINGS
module not found: uk.co.gresearch.spark#spark-extension_2.13;2.3.0-3.3-SNAPSHOT

Help please! This looks like an easy fix (for you) but it's keeping me from using your system! Thanks.

Diff: diff method throws IllegalArgumentException when specifying multiple id columns

Specifying multiple id columns in the Python diff method throws an IllegalArgumentException.

from gresearch.spark.diff import *

left = spark.createDataFrame([(1, 2, "one"), (2, 2, "two"), (3, 2, "three")], ["id_1", "id_2", "value"])
right = spark.createDataFrame([(1, 2, "one"), (2, 2, "Two"), (4, 2, "four")], ["id_1", "id_2", "value"])

left.diff(right, "id_1,id_2").show()

Result:

IllegalArgumentException: requirement failed: Some id columns do not exist: id_1,id_2 missing among id_1, id_2, value

Same exception in Scala.

import uk.co.gresearch.spark.diff._

val left = Seq((1, 2, "one"), (2, 2, "two"), (3, 2, "three")).toDF("id_1", "id_2", "value")
val right = Seq((1, 2, "one"), (2, 2, "Two"), (4, 2, "four")).toDF("id_1", "id_2", "value")

left.diff(right, "id_1,id_2").show()

Result:

IllegalArgumentException: requirement failed: Some id columns do not exist: id_1,id_2 missing among id_1, id_2, value

The Scala diff method with id columns as Seq works:

import uk.co.gresearch.spark.diff._

val left = Seq((1, 2, 3, "one"), (2, 2, 3, "two"), (3, 2, 3, "three")).toDF("id_1", "id_2", "info", "value")
val right = Seq((1, 1, 3, "one"), (2, 2, 3, "Two"), (4, 2, 3, "four")).toDF("id_1", "id_2", "info", "value")

left.diff(right, Seq("id_1", "id_2"), Seq("info")).show()

It seems that the id columns string value could not be converted to list.

Thank you for checking.

NoSuchMethodError: scala.Product.$init$

I ran:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.13:2.2.0-3.3 step4.py

And the result was:

Traceback (most recent call last):
  File "/home/vn53el7/msc-dataflow/data_pipelines/keyword.nope/tasks/step4.py", line 79, in <module>
    diffdf = sortdf.diff(sort_u_df, "SideBySide")
  File "/home/vn53el7/.ivy2/jars/uk.co.gresearch.spark_spark-extension_2.13-2.2.0-3.3.jar/gresearch/spark/diff/__init__.py", line 490, in diff
  File "/home/vn53el7/.ivy2/jars/uk.co.gresearch.spark_spark-extension_2.13-2.2.0-3.3.jar/gresearch/spark/diff/__init__.py", line 425, in diff
  File "/home/vn53el7/.ivy2/jars/uk.co.gresearch.spark_spark-extension_2.13-2.2.0-3.3.jar/gresearch/spark/diff/__init__.py", line 358, in _to_java
  File "/home/vn53el7/.ivy2/jars/uk.co.gresearch.spark_spark-extension_2.13-2.2.0-3.3.jar/gresearch/spark/diff/__init__.py", line 343, in _to_java
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.uk.co.gresearch.spark.diff.DiffOptions.
: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
	at uk.co.gresearch.spark.diff.DiffOptions.<init>(DiffOptions.scala:80)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

And it isn't the SIdebySide parameter, as I get the same result with that in place and missing.

Thanks.

Best way to create a DiffComparator for json comparison of json strings

Hi there, we have datasets with json variant columns, and need to do a json comparison on the strings they contain so that, for example, {"foo":"__foo__","bar":"__bar__"} -> {"bar":"__bar__","foo":"__foo__"} is reported as an N and not a C.

Additionally, we would ideally not have to specify the schema of the json and have it inferred automatically.

What's the best way to approach this? I've tried creating a DiffComparator like this:

 val jsonComparator: DiffComparator =
    (left: Column, right: Column) =>
      from_json(left, schema_of_json(left)) <=>
        from_json(right, schema_of_json(right))

But keep getting this error:

[info]   org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json/schema_of_csv functions instead of schema_of_json(`json_column`)
[info]   at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:40)
[info]   at org.apache.spark.sql.catalyst.expressions.JsonToStructs.<init>(jsonExpressions.scala:537)
[info]   at org.apache.spark.sql.functions$.from_json(functions.scala:4125)
[info]   at org.apache.spark.sql.functions$.from_json(functions.scala:4108)
[info]   at <snip>.$anonfun$jsonComparator$1(Diff.scala:52)
[info]   at uk.co.gresearch.spark.diff.Differ.$anonfun$doDiff$5(Diff.scala:214)
[info]   at scala.collection.immutable.List.map(List.scala:297)
[info]   at uk.co.gresearch.spark.diff.Differ.doDiff(Diff.scala:213)
[info]   at uk.co.gresearch.spark.diff.Differ.diff(Diff.scala:290)
[info]   at uk.co.gresearch.spark.diff.package$DatasetDiff.diff(package.scala:161)

I also tried creating an Eqiv comparator, but was unable to get even the example from the docs to work.

What am I missing?

Thanks!

Python test-release.py isn't working

Running python test-release.py gives:

file "./test-release.py", line 20, in <module>
    left.diff(right).show()
  File "/home/vn53el7/spark-extension/python/gresearch/spark/diff/__init__.py", line 518, in diff
    return Differ().diff(self, other, *id_columns)
  File "/home/vn53el7/spark-extension/python/gresearch/spark/diff/__init__.py", line 428, in diff
    jdiffer = self._to_java(jvm)
  File "/home/vn53el7/spark-extension/python/gresearch/spark/diff/__init__.py", line 361, in _to_java
    jdo = self._options._to_java(jvm)
  File "/home/vn53el7/spark-extension/python/gresearch/spark/diff/__init__.py", line 345, in _to_java
    self.diff_mode._to_java(jvm),
  File "/home/vn53el7/spark-extension/python/gresearch/spark/diff/__init__.py", line 34, in _to_java
    return jvm.uk.co.gresearch.spark.diff.DiffMode.withNameOption(self.name).get()
TypeError: 'JavaPackage' object is not callable

Diff Issue

Error:

AttributeError: 'DiffOptions' object has no attribute '_get_object_id'

Steps to recreate an issue:

Run the below code:

# required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import datetime
from pyspark.sql.types import StructType
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.types import *

import os
import sys
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

my_conf = SparkConf()
my_conf.set("spark.app.name", "w12_Spark_1")
my_conf.set("spark.master", "local[*]")
# my_conf.set("spark.driver.extraClassPath", "C:/Users/Vinoth/Downloads/sqljdbc_12.4.2.0_enu/sqljdbc_12.4/enu/jars/mssql-jdbc-12.4.2.jre8.jar")
my_conf.set('spark.jars.packages', 'uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5')

spark = SparkSession.builder.config(conf = my_conf).getOrCreate()
spark

from gresearch.spark.diff import *

left = spark.createDataFrame([(1, 1.0), (2, 2.0), (3, 3.0)]).toDF("id", "value")
right = spark.createDataFrame([(1, 1.0), (2, 2.02), (3, 3.05)]).toDF("id", "value")
left.diff(right, "id").show()

options = DiffOptions() \
  .with_diff_column("d") \
  .with_left_column_prefix("l") \
  .with_right_column_prefix("r") \
  .with_insert_diff_value("i") \
  .with_change_diff_value("c") \
  .with_delete_diff_value("d") \
  .with_nochange_diff_value("n") \
  .with_change_column("changes") \
  .with_diff_mode(DiffMode.Default) \
  .with_default_comparator(DiffComparators.epsilon(0.01)) \
  .with_data_type_comparator(DiffComparators.epsilon(0.001), DoubleType()) \
  .with_column_name_comparator(DiffComparators.epsilon(0.001), "float_column")



left.diff(right, options, "id").show()

Diff Failure on AWS Glue

Hi

Thanks for this awesome lib!

Hey, looking for some guidance on an issue I'm having

I'm trying to compare two dataframes for equality. It's not a requirement to know what's different just if they're different.

It works great when both dataframes are small (1m to 10m rows) rows, but fails when both frames are over 10 million. The same 59 columns exist in both frames. No crazy data types. Fairly sparse/Fair amount of NULLs

Any ideas or things I should try? Any additional details I can provide?

Additional Details

  • AWS Glue
  • 10 G.2X (32 gigs of ram, 8 vCPUs) Workers (i tried up to 50 workers)
  • Spark 3.1
  • spark-extension_2.12-2.1.0-3.1.jar

Code

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import boto3

import rsg_gis_utils.core as rsg
import rsg_gis_utils.extract_core as rsg_extract

from gresearch.spark.diff import *

glue_client = boto3.client("glue")
ss = rsg.get_db_secret('pg_wh_db')


# glue and spark stuff
sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session


a_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    .load()
)

b_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    .load()
)

print('dataframes are the same', a_frame.diff(b_frame).where("diff != 'N'").count() == 0)

Errors that I've gotten
An error occurred while calling o118.count. Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 14) (10.226.42.94 executor 25): ExecutorLostFailure (executor 25 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

An error occurred while calling o110.count. Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (10.226.42.117 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 652032 ms

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.