isarn / isarn-sketches-spark
Routines and data structures for using isarn-sketches idiomatically in Apache Spark
License: Apache License 2.0
Kullback-Leibler Divergence Estimation of Continuous Distributions
Fernando Pérez-Cruz, Department of Electrical Engineering, Princeton University, Princeton, New Jersey 08544
Email: [email protected]
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.422.5121&rep=rep1&type=pdf
If I can figure out how to do this, it would make things cleaner, e.g. #8
I had second-level data aggregated to the minute level using a TDigest, and wanted to further aggregate the persisted DataFrame to the hour level, but could not, because there is no way to aggregate the TDigestSQL objects themselves in an aggregate function in Spark SQL.
To work around this, I copied and tweaked the TDigestUDAF so that it aggregates the TDigestSQL instances produced by the first TDigestUDAF. Using this, you can re-aggregate the TDigest results to as high a level as you need.
// Imports and the class declaration below are reconstructed so the snippet compiles
// on its own; the class name is illustrative, while deltaV and maxDiscreteV mirror
// the parameters of the library's TDigestUDAF that this copy was adapted from.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.isarnproject.sketches.udt.{TDigestSQL, TDigestUDT}
import org.isarnproject.sketches.TDigest

case class TDigestMergeUDAF(deltaV: Double, maxDiscreteV: Int) extends UserDefinedAggregateFunction {
  // A t-digest is deterministic, but it is only statistically associative or commutative,
  // and Spark will merge partition results in nondeterministic order. That makes
  // the result of the aggregation statistically "deterministic" but not strictly so.
  def deterministic: Boolean = false
  def inputSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil)
  def bufferSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil)
  def dataType: DataType = TDigestUDT
  def initialize(buf: MutableAggregationBuffer): Unit = {
    buf(0) = TDigestSQL(TDigest.empty(deltaV, maxDiscreteV))
  }
  // Since every input value is a TDigestSQL, update acts just like merge: it combines
  // TDigestSQL objects by merging their inner TDigests.
  def update(buf: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buf(0) = TDigestSQL(buf.getAs[TDigestSQL](0).tdigest ++ input.getAs[TDigestSQL](0).tdigest)
    }
  }
  def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = {
    buf1(0) = TDigestSQL(buf1.getAs[TDigestSQL](0).tdigest ++ buf2.getAs[TDigestSQL](0).tdigest)
  }
  def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0)
}
Might be a useful addition to the library; though you may have a better way of achieving the same :).
Hi @erikerlandson, @JonathanTaws, I'd like to report a vulnerable dependency in org.isarnproject:isarn-sketches-spark_2.12:0.5.2-sp3.0.
I noticed that org.isarnproject:isarn-sketches-spark_2.12:0.5.2-sp3.0 directly depends on org.apache.spark:spark-core_2.12:3.0.1 in the pom. However, as its dependency graph shows, org.apache.spark:spark-core_2.12:3.0.1 suffers from the vulnerability exposed by the C library zstd (version 1.4.4): CVE-2021-24032.
org.apache.spark:spark-core_2.12:3.2.0 (>=3.2.0) has upgraded this vulnerable C library zstd to the patched version 1.5.0.
Java build tools cannot report vulnerable C libraries, which may expose many downstream Java projects to potential security issues. Could you please upgrade this vulnerable dependency?
Thanks for your help~
Best regards,
Helen Parr
Easy in Scala - have to figure out how to do this in the pyspark layer.
I am trying to save the TDigest object (in Python) to a format that I can use to recreate it. In the past (version isarn-sketches-spark_2.11:0.3.1-sp2.2-py2.7), I was able to access the parameters below, save them, and then recreate a TDigest by calling the constructor with those parameters.
https://github.com/isarn/isarn-sketches-spark/blob/v0.3.1/python/isarnproject/sketches/udt/tdigest.py#L115
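A hedged sketch of that old-version round-trip, assuming td is such a TDigest object; the module path and the attribute names delta, maxDiscrete, nclusters, clustX, and clustM are assumptions read off the linked v0.3.1 tdigest.py, not a confirmed API:
from isarnproject.sketches.udt.tdigest import TDigest  # path as in the linked v0.3.1 file

# save the (hypothetical) parameters, then rebuild the sketch from them
params = (td.delta, td.maxDiscrete, td.nclusters, td.clustX, td.clustM)
restored = TDigest(*params)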
With the latest version, aside from the renaming of some of the parameters, the constructor for TDigest does not accept the same parameters:
The __repr__ representation includes the nclusters parameter, which is (rightfully) not in the constructor signature, meaning I can't use the __repr__ string to construct a new object (e.g. via eval(repr(tdigest))) without some hacking around.
isarn-sketches-spark/python/isarnproject/sketches/spark/tdigest.py, lines 239 to 241 at e7d3136
When using cdfInverse on a T-Digest created from a dataset, I get the following:
On this graph, I have a distribution of values, with the probability on the y axis and the actual values on the x axis. I generate the values using cdfInverse, as follows:
xs = [td.cdfInverse(i/1000.) for i in range(1001)]
ys = [i/1000. for i in range(1001)]
When I dive deeper into the distribution, I can see that, even though it should be monotonically increasing, some values in xs are out of order, and thus I get the following result (look at 1k, just before 4k, and after 8k):
My assumption was that I would get only increasing values in xs when generating them from the cdfInverse method, since I am increasing the probability/percentile rank on each iteration.
A workaround for now is to generate the values, sort them, and then call cdf on the sorted values (see the sketch below), but it adds extra steps and I'm not sure this is the right method.
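A minimal sketch of that workaround, assuming td is the aggregated TDigest from above:
# draw the values with cdfInverse, sort them to restore monotonic order,
# then evaluate cdf on the sorted values
qs = [i / 1000. for i in range(1001)]
xs = sorted(td.cdfInverse(q) for q in qs)
ys = [td.cdf(x) for x in xs]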
To give a further example, here are the results of the following calls:
print(td.cdf(8517.442))
>> 0.6443371631522132
print(td.cdfInverse(0.629))
>> 8517.442135224697
print(td.cdfInverse(0.644))
>> 8509.811889971521
I would expect td.cdfInverse(0.629) to give a smaller value than td.cdfInverse(0.644) (as the probability of the former is smaller than that of the latter).
If you have a TDigest, for example:
from isarnproject.sketches.udaf.tdigest import *
from random import gauss
from pyspark.sql.types import *
data = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())]))
agg = data.agg(tdigestDoubleUDAF("x"))
td = agg.first()[0]
And you go ahead and broadcast it to the executors:
sc.broadcast(td)
The following error appears:
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/broadcast.py", line 83, in dump
pickle.dump(value, f, 2)
File "python/isarnproject/sketches/udt/tdigest.py", line 144, in __reduce__
AttributeError: 'int' object has no attribute 'self'
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-631244049223455329.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-631244049223455329.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/context.py", line 802, in broadcast
return Broadcast(self, value, self._pickled_broadcast_vars)
File "/usr/lib/spark/python/pyspark/broadcast.py", line 74, in __init__
self._path = self.dump(value, f)
File "/usr/lib/spark/python/pyspark/broadcast.py", line 90, in dump
raise pickle.PicklingError(msg)
PicklingError: Could not serialize broadcast: AttributeError: 'int' object has no attribute 'self'
This looks like it's due to a bug in the TDigest class's __reduce__ method, which is used for serialization: there is a missing comma after maxDiscrete: https://github.com/isarn/isarn-sketches-spark/blob/develop/python/isarnproject/sketches/udt/tdigest.py#L144
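A self-contained illustration (not the library's code) of how a period typed where that comma belongs produces exactly this error: the integer attribute is followed by ".self", so pickling evaluates maxDiscrete.self and raises the AttributeError.
import pickle

class Broken(object):
    def __init__(self):
        self.maxDiscrete = 0
        self.nclusters = 5
    def __reduce__(self):
        # buggy: "." where a "," was intended, so this parses as self.maxDiscrete.self
        return (self.__class__, (self.maxDiscrete. self.nclusters,))

try:
    pickle.dumps(Broken())
except AttributeError as err:
    print(err)  # 'int' object has no attribute 'self'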
Great library, but we need an official version for Spark 3.2. Thanks!
I am currently using TDigests to obtain CDFs and inverse CDFs. I have a DataFrame that now contains TDigests via TDigestSQL. However, I want to be able to get the CDF back via the Spark SQL DSL API.
So to do that I am using this:
val tDigestCdf: UserDefinedFunction =
UserDefinedFunction(getCDFFromTDigest _, DoubleType, Some(Seq(TDigestUDT, DoubleType)))
.withName("getCDFFromTDigest")
def getCDFFromTDigest(tdigestsql: TDigestSQL, percent: Double): Double = {
tdigestsql.tdigest.cdf(percent)
}
Then, to use this, I do:
dataFrame.select(tDigestCdf(col("mydigest")))
But unfortunately I am then getting this error:
Wrapped by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ClassCastException: MySparkFunctions$$anonfun$3 cannot be cast to scala.Function1
org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:98)
org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:71)
Should it be possible to use the provided UDT in this fashion? I read something about UDTs no longer being exposed, but that seems a bit odd to me.
The cdfInverse method is supposed to accept qq values in the range [0, 1], as explained in the doc (note that the doc mentions q, while qq is the actual parameter used):
isarn-sketches-spark/python/isarnproject/sketches/spark/tdigest.py, lines 305 to 309 at e7d3136
However, if I give the value 1, I get an IndexError: list index out of range. The error comes from here:
How to reproduce (using the example on the GitHub page https://github.com/isarn/isarn-sketches-spark#sketch-a-numeric-column-python):
from random import gauss, randint
from isarnproject.sketches.spark.tdigest import *
data = spark.createDataFrame([[randint(1,10),gauss(0,1)] for x in range(1000)])
udf1 = tdigestIntUDF("_1", maxDiscrete = 25)
udf2 = tdigestDoubleUDF("_2", compression = 0.5)
agg = data.agg(udf1, udf2).first()
td = agg[0]
td.cdfInverse(1)
Result:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/mnt/tmp/spark-2bfe3869-9322-41af-b0e2-5a289d198492/userFiles-ebc89fbe-3b18-4851-9923-cc04eca13d6d/org.isarnproject_isarn-sketches-spark_2.12-0.5.0-sp3.0.jar/isarnproject/sketches/spark/tdigest.py", line 318, in cdfInverse
IndexError: list index out of range
This code runs against isarn-sketches-spark_2.12:0.5.0-sp3.0. However, in version isarn-sketches-spark_2.11:0.3.1-sp2.2-py2.7, cdfInverse used to accept a value of 1 and run with it.
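An interim workaround (a sketch, not the library's fix) is to clamp the quantile just below 1.0 before calling cdfInverse; the epsilon below is arbitrary and td is the digest from the reproduction above:
def safe_cdf_inverse(td, q, eps=1e-9):
    # clamp q into [0, 1 - eps] so the lookup at q == 1 stays in range
    return td.cdfInverse(min(max(q, 0.0), 1.0 - eps))

safe_cdf_inverse(td, 1)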
Hi
I was able to run my script in spark-shell, but when I submitted the job to run, it failed. The error message contains the following information:
Exception in thread "main" java.lang.AssertionError: assertion failed: unsafe symbol TDigestSQL (child of <none>) in runtime reflection universe
  at scala.reflect.internal.Symbols$Symbol.<init>(Symbols.scala:205)
  at scala.reflect.internal.Symbols$TypeSymbol.<init>(Symbols.scala:3030)
  at scala.reflect.internal.Symbols$ClassSymbol.<init>(Symbols.scala:3222)
  at scala.reflect.internal.Symbols$StubClassSymbol.<init>(Symbols.scala:3522)
  at scala.reflect.internal.Symbols$class.newStubSymbol(Symbols.scala:191)
  at scala.reflect.internal.SymbolTable.newStubSymbol(SymbolTable.scala:16)
  at scala.reflect.internal.Symbols$Symbol.newStubSymbol(Symbols.scala:521)
  at scala.reflect.internal.pickling.UnPickler$Scan.readExtSymbol$1(UnPickler.scala:258)
  at scala.reflect.internal.pickling.UnPickler$Scan.readSymbol(UnPickler.scala:286)
  at scala.reflect.internal.pickling.UnPickler$Scan.readSymbolRef(UnPickler.scala:651)
  at scala.reflect.internal.pickling.UnPickler$Scan.readType(UnPickler.scala:419)
  at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef$$anonfun$7.apply(UnPickler.scala:734)
  at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef$$anonfun$7.apply(UnPickler.scala:734)
  at scala.reflect.internal.pickling.UnPickler$Scan.at(UnPickler.scala:179)
  at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef.completeInternal(UnPickler.scala:734)
  at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef.complete(UnPickler.scala:761)
  at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1535)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:195)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:195)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.info(SynchronizedSymbols.scala:195)
  at scala.reflect.internal.Symbols$SymbolContextApiImpl.typeSignature(Symbols.scala:160)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$typeSignature(SynchronizedSymbols.scala:195)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeSignature$1.apply(SynchronizedSymbols.scala:129)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeSignature$1.apply(SynchronizedSymbols.scala:129)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:195)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeSignature(SynchronizedSymbols.scala:129)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.typeSignature(SynchronizedSymbols.scala:195)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.typeSignature(SynchronizedSymbols.scala:195)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$2.apply(ScalaReflection.scala:974)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$2.apply(ScalaReflection.scala:973)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:973)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:46)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:631)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  at MultiTdigestToPostgres$.main(multi_feature_Tdigest.scala.scala:414)
  at MultiTdigestToPostgres.main(multi_feature_Tdigest.scala.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-12-13 18:22:46 INFO SparkContext:54 - Invoking stop() from shutdown hook
Any clue as to what caused this? Any help would be greatly appreciated.
Richard
I had a request to serialize the DataFrames resulting from TDigest UDAFs directly, to parquet or other formats.
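A minimal sketch of what that round-trip might look like in PySpark, reusing the earlier example's tdigestDoubleUDAF and data; this is the requested behavior rather than a confirmation that it currently works, and the column and path names are illustrative:
from isarnproject.sketches.udaf.tdigest import *

# aggregate a numeric column into a t-digest, persist it, and read it back
agg = data.agg(tdigestDoubleUDAF("x").alias("td"))
agg.write.parquet("/tmp/tdigests.parquet")
restored = spark.read.parquet("/tmp/tdigests.parquet")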
Hi,
I'm trying out this package, but I consistently get this NoClassDefFoundError.
I'm using Spark 2.3, Scala 2.11, and tested in Spark shell. My Python version is 3.7.
spark-shell --jars /tmp/isarn-sketches-spark_2.11-0.3.1-sp2.3-py3.6.jar
scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._
scala> val udaf = tdigestUDAF[Double]
java.lang.NoClassDefFoundError: org/isarnproject/sketches/TDigest$
at org.isarnproject.sketches.udaf.package$.tdigestUDAF(package.scala:43)
... 53 elided
Caused by: java.lang.ClassNotFoundException: org.isarnproject.sketches.TDigest$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 54 more
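For what it's worth, org.isarnproject.sketches.TDigest lives in the core isarn-sketches artifact rather than in isarn-sketches-spark, so a likely cause (an assumption, not confirmed from this trace alone) is that only the single jar was placed on the classpath and that transitive dependency is missing. Resolving the package instead of passing one jar should pull it in, assuming the artifact is published under the same coordinates as the jar name:
spark-shell --packages org.isarnproject:isarn-sketches-spark_2.11:0.3.1-sp2.3-py3.6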
Would anyone be able to help?
Thanks very much in advance!
Richard