spark-timeseries's Introduction

Time Series for Spark (The spark-ts Package)

A Scala / Java / Python library for interacting with time series data on Apache Spark.

Post questions and comments to the Google group, or email them directly to [email protected].

Note: The spark-ts library is no longer under active development by me (Sandy). I unfortunately no longer have bandwidth to develop features, answer all questions on the mailing list, or fix all bugs that are filed.

That said, I remain happy to review pull requests and do whatever I can to aid others in advancing the library.

Docs are available at http://sryza.github.io/spark-timeseries.

Or check out the Scaladoc, Javadoc, or Python doc.

The aim here is to provide

  • A set of abstractions for manipulating large time series data sets, similar to what's provided for smaller data sets in Pandas, Matlab, and R's zoo and xts packages.
  • Models, tests, and functions that enable dealing with time series from a statistical perspective, similar to what's provided in StatsModels and a variety of Matlab and R packages.

The library builds on a few other excellent Java and Scala libraries.

Using this Repo

Building

We use Maven for building Java / Scala. To compile, run tests, and build jars:

mvn package

To run Python tests (requires nose):

cd python
export SPARK_HOME=<location of local Spark installation>
nosetests

Running

To run a spark-shell with spark-ts and its dependencies on the classpath:

spark-shell --jars target/sparkts-$VERSION-SNAPSHOT-jar-with-dependencies.jar
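
Once the shell is up, a quick smoke test might look something like this (a minimal sketch; recent versions of the library take java.time.ZonedDateTime as shown here, while older releases used nscala-time DateTimes):

import java.time.ZonedDateTime
import com.cloudera.sparkts.DateTimeIndex

// Build a tiny irregular index from a couple of instants
val index = DateTimeIndex.irregular(Array(
  ZonedDateTime.parse("2015-08-03T12:30:40Z[GMT]"),
  ZonedDateTime.parse("2015-08-04T12:30:40Z[GMT]")))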

Releasing

To publish docs, the easiest approach is to clone a separate copy of this repo in some location we'll refer to as DOCS_REPO. Then:

# Build main doc
mvn site -Ddependency.locations.enabled=false

# Build scaladoc
mvn scala:doc

# Build javadoc
mvn javadoc:javadoc

# Build Python doc
cd python
export SPARK_HOME=<location of local Spark installation>
export PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python:$SPARK_HOME/python/lib/*
make html
cd ..

cp -r target/site/* $DOCS_REPO
cp -r python/build/html/ $DOCS_REPO/pydoc
cd $DOCS_REPO
git checkout gh-pages
git add -A
git commit -m "Some message that includes the hash of the relevant commit in master"
git push origin gh-pages

To build a Python source distribution, first build with Maven, then:

cp target/sparkts-$VERSION-SNAPSHOT-jar-with-dependencies.jar python/sparkts/
cd python
python setup.py sdist

To release Java/Scala packages (based on http://oryxproject.github.io/oryx/docs/how-to-release.html):

mvn -Darguments="-DskipTests" -DreleaseVersion=$VERSION \
    -DdevelopmentVersion=$VERSION-SNAPSHOT release:prepare

mvn -s private-settings.xml -Darguments="-DskipTests" release:perform

To release Python packages (based on http://peterdowns.com/posts/first-time-with-pypi.html):

python setup.py register -r pypi
python setup.py sdist upload -r pypi

spark-timeseries's People

Contributors

ahmed-mahran, cdalzell, dmsuehir, ekote, gliptak, josepablocam, jrachiele, lnicalo, mbaddar1, mbaddar2, pegli, polymorphic, rcastbergw, rchanda, simonouellette35, souellette-faimdata, srowen, sryza, sunheehnus, tiagodinisfonseca, wgorman

spark-timeseries's Issues

Uberjar does not include dependency on nscala-time

When using the sparkts-0.1.0-SNAPSHOT-jar-with-dependencies jar, the https://github.com/nscala-time/nscala-time dependency is not included, even though the documentation says it is necessary for:

def uniform(start: github.nscala_time.time.Imports.DateTime, end: github.nscala_time.time.Imports.DateTime, frequency: Frequency): UniformDateTimeIndex

This complicates deployment of this tool in closed environments since this additional dependency must also be provided.

Doc path: http://cloudera.github.io/spark-timeseries/0.1.0/scaladocs/index.html#com.cloudera.sparkts.DateTimeIndex$

I am attempting to create a TimeSeriesRDD based on this example:

https://github.com/sryza/spark-ts-examples/blob/master/scala/src/main/scala/com/cloudera/tsexamples/Stocks.scala
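
Until the uberjar bundles it, one workaround (a sketch, not an official fix) is to put the nscala-time jar on the classpath yourself when launching the shell:

spark-shell --jars target/sparkts-$VERSION-SNAPSHOT-jar-with-dependencies.jar,<path to nscala-time jar>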

Support partitioned DateTimeIndices?

This is a follow-up to the discussion that started in #82. The question is, first of all, whether or not we want to support individual time series whose cardinality exceeds 2^31 data points (which is not currently possible, due to the use of standard Scala Arrays).

As discussed, there are quite a few real use cases we (contributors) are facing that would require handling time series with more than 2^31 data points.

My suggestion is to add support for a notion of partitioned date-time indices, which would allow time series to be partitioned across machines along the time axis, rather than to replace the current design (for use cases where a time series fits in memory, partitioning it along the time axis would be sub-optimal anyway). @sryza, I know you are hesitant about this.

Instruments? Factors?

When trying to run the Historical VAR example, I stumbled over the missing 'instruments' directory, and 'factors' as well. I can infer what files might go in the 'instruments' directory (historical files from Yahoo; they are parsed that way), but the contents of 'factors' are more of a mystery. A small example of the data that goes there would be most appreciated, or at least enough detail that we can extrapolate from it.

Thanks, this is really exciting stuff !

Investigate integration of Scalation primitives and expose them to RDDs

Scalation (Scalable Simulation) has a pretty robust set of primitives for modeling time series data. They are porting a few of the more complicated algorithms from R to Scala. In fact, they are currently migrating from Google Code to GitHub and are working on some new model algorithms like ARIMA and Seasonal ARIMA. Perhaps we could get some mileage out of using them.

ARGARCH/GARCH: "MathIllegalStateException unable to bracket optimum in line search"

Awesome project, I'm excited to see it develop. Unfortunately, I'm receiving an optimization error when running both ARGARCH and GARCH models on both raw and centered/differenced data. Generating a simple DenseVector as follows:

val simpvec = DenseVector(0.1,-0.2, ..... ,0.00,-0.1)
val argarchModel= ARGARCH.fitModel(simpvec)

yields:
org.apache.commons.math3.exception.MathIllegalStateException: unable to bracket optimum in line search.

Any directions on how to proceed? I've tried different generated input lengths, including time series data generated by sampling an AR model; all seem to fail. I was able to generate a model using a similar test vector approach a few days ago. Thanks very much.

Support for microsecond and nanosecond frequencies

If we want to be thorough and finance-friendly, it is crucial that our DateTime indices support microsecond and nanosecond frequencies (e.g. for high-frequency financial data).

Unfortunately, because we currently leverage Joda Time (which doesn't support better than millisecond precision), we're limited. We should figure out a way to "break out of" the Joda Time limitation by using some wrapper object that relies on Joda Time when convenient, but falls back to another internal representation when higher-precision time is required.

This is a concrete problem that I'm facing right now (need to handle high-frequency tick data), so I will address it soon but I'd like to hear everyone's thoughts on this.
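
For illustration, one possible shape for such a wrapper's internals (purely a sketch, not existing API): keep nanoseconds since the epoch in a Long, which covers roughly the years 1678 to 2262, and only drop down to Joda Time when millisecond precision suffices.

// Hypothetical internal representation: nanoseconds since the epoch as a Long
val nanosSinceEpoch: Long = 1438605040123456789L
// Convert to milliseconds only when handing off to Joda Time
val jodaDateTime = new org.joda.time.DateTime(nanosSinceEpoch / 1000000L)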

ADF test p-values

I'm not sure I understand how the p-value of the ADF test is calculated from the ADF test statistic, and in fact it seems erroneous to me.

I would expect that if the ADF test statistic is smaller than the critical value (around -2.8 according to the MacKinnon tables, in the 95%, constant-but-no-trend scenario I'm testing with), the p-value returned would be less than 0.05. That is, the null hypothesis is a unit root, and the smaller the test statistic, the more evidence we have to reject the null hypothesis (and thus the smaller the p-value).

However, what happens when I create an artificially mean-reverting data vector and call adftest (with the "c" regression type and 0 lags) is that adfstat = -3.73, yet the p-value = 1.0. That doesn't sound right to me.

Can someone confirm?

support for ARIMAX - adding multiple exogeneous variables

Hi all,
I am currently modeling time-series data of channel sales using auto-ARIMA. I need to add exogeneous variables to the ARIMA model. The variables are inflation, unemployment rate. I don't see the current auto-ARIMA model supports exogeneous variables. In R, the exogeneous variable can be added as newxreg to the forecast or predict function. Are we going to support ARIMAX?

Thanks!

Differential function for IrregularDateTimeIndexed TimeSeries

Currently the difference() function simply takes the difference between two contiguous values in the time series.

For TimeSeries built on an IrregularDateTimeIndex, this is not necessarily (and in fact, most commonly not) the desired behaviour. Instead, we want the differential to be:

(y(t+1) - y(t)) / ((t+1) - t) for a given base frequency.
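
For illustration only, a sketch of that differential over parallel arrays (the names here are hypothetical, not library API), with instants expressed in multiples of the chosen base frequency:

// (y(t+1) - y(t)) / ((t+1) - t), per the formula above
def irregularDiff(instants: Array[Double], values: Array[Double]): Array[Double] = {
  (1 until values.length).map { i =>
    (values(i) - values(i - 1)) / (instants(i) - instants(i - 1))
  }.toArray
}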

Tick frequency?

I notice that increasingly we end up having to implement an algorithm for a sequence and then the same algorithm again for time steps. In theory a time series is something that is based on time, not on indexing (as a sequence is), but we seem to be using both more or less interchangeably.

Maybe if we added a Frequency, say TickFrequency, that represents indexing rather than a notion of time, we could unify each version of each algorithm (lagging, differencing, etc.) into one, thus simplifying the API?

Create a TimeSeries-RDD

Based on the idea of the TimeSeries Bucket, I suggest defining a "TimeSeriesRDD" for which several core functions are well defined, such as smoothing, filtering, gap filling, FFT, etc. The TimeSeriesRDD would help abstract the storage details away and offer ways to project, aggregate, or even expand the dataset.

In some cases we need event data; in other cases the spectrum is required. Contextual normalization using the Time Resolved Relevance Index is one example of integrating additional structural information into the time series analysis procedure. Data wrangling is often not trivial, which is why it seems useful for a set of primitive transformations to be available as part of a specialized RDD.

I also suggest adding algorithms for bivariate and multivariate correlation analysis.

Implementations of event synchronization, Granger causality, and cross-correlation (for pairs of time series), as well as partial-correlation analysis (for triples), already exist in Java.

There is also a test-data generator to prep time series for plausibility tests.

Some of the algorithms and the concept of the TimeSeries-Bucket are also explained here: http://www.ijcaonline.org/archives/volume74/number17/12974-0233

Help regarding creating a TimeseriesRDD from a dataframe of "timestamp" and "values"

Hi,
Thanks for this wonderful implementation.

I have one point of confusion: I am using Spark with Cassandra and already have a DataFrame which contains "timestamp" and "value" columns.

val mydf = df.select(df("timestamp"), df("value"))

I am trying to figure out how to convert it into a TimeSeriesRDD. A function I came across was "timeSeriesRDDFromObservations", but it requires a DateTimeIndex as its first param. Any pointers would be really helpful.
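
For what it's worth, here is a hedged sketch of one route, assuming a library version whose DateTimeIndex takes java.time.ZonedDateTime (as in the TimeSeriesRDD snippet further down); timeSeriesRDDFromObservations also wants a key column naming each series, so a constant key is added here:

import java.time.{ZoneId, ZonedDateTime}
import org.apache.spark.sql.functions.lit
import com.cloudera.sparkts.{DateTimeIndex, TimeSeriesRDD}

// Add a constant key column, since there is only one series in this DataFrame
val obs = mydf.withColumn("key", lit("series-1"))

// Build an irregular index from the distinct timestamps (collected to the driver,
// so this assumes the number of distinct instants is manageable)
val dates = obs.select("timestamp").distinct().collect()
  .map(r => ZonedDateTime.ofInstant(r.getTimestamp(0).toInstant, ZoneId.of("Z")))
  .sortBy(_.toInstant.toEpochMilli)

val tsRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(
  DateTimeIndex.irregular(dates), obs, "timestamp", "key", "value")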

Triple Exponential Smoothing Model

It's nice that there's an exponentially weighted moving average. I think that could lend itself to more complex smoothing models, for instance Holt-Winters, which adds a weight for the season.
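
For reference, one common formulation of the additive Holt-Winters (triple exponential smoothing) recursions, with smoothing parameters alpha, beta, gamma and season length m, is:

level:    l_t = alpha * (y_t - s_{t-m}) + (1 - alpha) * (l_{t-1} + b_{t-1})
trend:    b_t = beta * (l_t - l_{t-1}) + (1 - beta) * b_{t-1}
season:   s_t = gamma * (y_t - l_{t-1} - b_{t-1}) + (1 - gamma) * s_{t-m}
forecast: yhat_{t+h} = l_t + h * b_t + s_{t+h-m}   (for 1 <= h <= m)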

Identically timed transactions are not stored correctly in TimeSeriesRDD

import java.sql.Timestamp
import java.time.ZonedDateTime
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.cloudera.sparkts.{DateTimeIndex, TimeSeriesRDD}

val d1 = ZonedDateTime.parse("2015-08-03T12:30:40Z[GMT]")
val d2 = ZonedDateTime.parse("2015-08-04T12:30:40Z[GMT]")

val dates = Array(d1, d1, d2)

val rdd = sc.parallelize(Seq(
  Row(new Timestamp(d1.toInstant.toEpochMilli), "A1", 10.0),
  Row(new Timestamp(d1.toInstant.toEpochMilli), "A1", 20.0),
  Row(new Timestamp(d2.toInstant.toEpochMilli), "A2", 30.0)))

val fields = Seq(StructField("timestamp", TimestampType, true),
  StructField("symbol", StringType, true),
  StructField("price", DoubleType, true))

val df = sqlContext.createDataFrame(rdd, StructType(fields))

val dtIndex = DateTimeIndex.irregular(dates)

val tsRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, df, "timestamp", "symbol", "price")

tsRdd.cache()
tsRdd.take(2)

Yields:

res63: Array[(String, org.apache.spark.mllib.linalg.Vector)] = Array((A1,[NaN,20.0,NaN]), (A2,[NaN,NaN,30.0]))

Note that the first transaction for A1 does not appear. A time offset of at least 1ms appears necessary for the transaction to be correctly stored.

Bug in frequency calculations

The Period.getXs() methods don't return the total number of Xs, i.e. they don't convert the whole period into units of X.

For example, assume a period of (1Year, 1Month, 1Day);
period.getDays returns 1
period.getMonths returns 1
period.getYears returns 1

Bug in differences()

I believe there is a bug in differences. I tested the function with the following input data matrix:

5 1 22
6 2 21
7.5 4 19
6.5 8 16
4.5 13 11
6.6 7 3

Calling differences(1) on this gives an output of 4 rows rather than 5; the difference of the two most recent observations is missing. Here is the output:

1 1 -1
1.5 2 -2
-1 4 -3
-2 5 -5

These values are correct, but I would also expect to see, as the last line:

2.1 -6 -8

Please confirm that this is indeed a bug.

Support Spark 1.6

It would be nice to add support for building spark-ts against Spark 1.6. I have managed to build it (skipping the tests) by adding the following dependency and overriding the Spark version at build time:
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math</artifactId>
      <version>2.2</version>
    </dependency>

mvn -Dspark.version=1.6.0-SNAPSHOT clean package

Regression with Auto Regressive Residuals

Based on https://www.otexts.org/fpp/9/1, in this issue we will implement the model

Y_t = A + sum_i B_i * X_{i,t} + n_t

where the residuals n_t are assumed to follow an autoregressive process of a given order q, AR(q). The steps are:

1. Estimate an OLS regression model for the given regressors X_t.
2. Estimate the parameters of the AR(q) model on the residuals, then update the model coefficients from step 1.
3. Iterate between steps 1 and 2 until convergence.

@sryza, comments?

Lagging irregular time series

I am now faced with the problem of regressing on lags of a time series that is irregular. So next I will tackle the problem of lagging an irregular time series. I'd be curious to see if anyone has ideas or opinions on how to handle this problem.

A high-level overview of my proposal at the moment is as follows.

1 - The caller of the function specifies the base frequency to use. We can later add a function that automatically discovers the base frequency by looking at the minimal time gap that exists in the time series. But it's good in any case to have a manual override, so I will start there.

2 - The caller will provide an interpolation function that tells us what to do when there is no value at time (t - lag). This function will have the form (prev, next) => Double, where prev and next will represent the values surrounding time (t - lag) on the lower and upper boundaries.

3 - The resulting lags will look something like this if we specify lag periods of 0.1 and 0.2:

Time, Value
0.1, 100
0.2, 200
0.24, 123
0.25, 300
0.4, 400
0.5, 500

becomes:

Time, Lag0.1(Value), Lag0.2(Value)
0.3, 200, 100
0.4, f(300, 400), 200
0.5, 400, f(300, 400)

where of course f is the user-provided interpolation function.

Notice that the time instant 0.3 gets created even though it didn't exist in the previous dataset. Also, value (0.24, 123) gets completely ignored: there is loss of information because the specified base frequency is lower than the "true" base frequency of the dataset.
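
As a concrete (hypothetical) example of f, per the proposed (prev, next) => Double signature, a caller who just wants the midpoint of the surrounding values could pass:

// Midpoint of the two observed values surrounding time (t - lag)
val midpoint: (Double, Double) => Double = (prev, next) => (prev + next) / 2.0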

Documentation should include comprehensive examples of how to use the different constructs

I'm looking through the code to figure out how to use the DenseVector and the UniformDateTimeIndex, for instance, and I've had to parse quite a bit of code to figure them out.

I think it would be massively useful to establish a good user manual or some kind of documentation module now, while the project is still young, rather than trying to do it later when there are many more algorithms.

Bug in DateTimeIndex.locAtDateTime?

The comment says that if the datetime doesn't exist in the index, it returns a location of -1. However all the implementation does (in the irregular index) is call java.util.Arrays.binarySearch(), which returns, according to the documentation:

"This method returns index of the search key, if it is contained in the array, else it returns (-(insertion point) - 1). The insertion point is the point at which the key would be inserted into the array: the index of the first element greater than the key, or a.length if all elements in the array are less than the specified key."

Therefore it seems to actually be returning negative numbers, but potentially values other than -1.

It's an easy fix, and I've fixed it locally; I just want to confirm that I'm not hallucinating before proceeding with this.
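
The fix essentially collapses the negative insertion-point encoding to the documented -1; roughly (a sketch, assuming the irregular index keeps its instants in a sorted Array[Long] named instants):

val i = java.util.Arrays.binarySearch(instants, ts)
val loc = if (i >= 0) i else -1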
