
stark's Introduction

STARK - Spatio-Temporal Data Analytics on Spark


STARK is a framework that tightly integrates with Apache Spark and adds support for spatial and temporal data types and operations.

Installation

Using STARK is very simple. Just clone the repository and build the assembly file with

sbt assembly

The resulting assembly file can be found in $STARKDIR/target/scala-2.11/stark.jar. Just add this library to your classpath and STARK is ready to use.
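
For example, with a local Spark installation the assembly can be put on the classpath when starting the Spark shell (the path matches the sbt output above):

spark-shell --jars target/scala-2.11/stark.jar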

Usage

STARK uses the STObject class as its main data structure. This class holds your spatio-temporal data. STARK also provides implicit conversions that add spatio-temporal operations to RDDs of STObject.

To use the spatio-temporal operations, you need a pair RDD with the STObject in the first position:

import dbis.stark._
import org.apache.spark.SpatialRDD._

val countries = sc.textFile("/data/countries") // assumed schema: ID;Name;WKT string
    .map(line => line.split(';'))
    .map(arr => (STObject(arr(2)), (arr(0).toInt, arr(1)))) // ( STObject, (Int, String) )

// find all geometries that contain the given point    
val filtered = countries.contains(STObject("POINT( 50.681898 10.938838 )"))

Features

Spatial Partitioning

Currently, STARK implements two spatial partitioners. The first is a fixed grid partitioner that lays a grid over the data space, where each grid cell corresponds to one partition. The second is a cost-based binary space partitioner that creates partitions based on the number of contained elements.

When the geometry that the partitioner has to assign to a partition is not a point, its center point is used. That means that although a polygon might span multiple partitions, the assigned partition depends solely on the polygon's center point.
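
As a minimal illustration of this center-point rule (not STARK's internal code; it assumes the JTS geometry classes bundled with STARK are available under the classic com.vividsolutions.jts packages):

import com.vividsolutions.jts.io.WKTReader

// a polygon that could overlap several grid cells
val poly = new WKTReader().read("POLYGON((0 0, 4 0, 4 4, 0 4, 0 0))")

// the partitioner only considers a single representative point,
// here the centroid (2, 2), so the whole polygon is assigned
// to the partition that contains this point
val center = poly.getCentroid
println(s"assign to the partition containing (${center.getX}, ${center.getY})")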

The partitioners extend Spark's Partitioner interface and can thus be applied using the partitionBy method.

Fixed Grid partitioner

The fixed grid partitioner needs at least two parameters: the RDD to partition and the number of partitions per dimension (ppD).

val countries: RDD[( STObject, (Int, String) )] = ... // see above

val gridPartitioner = new SpatialGridPartitioner(countries, partitionsPerDimension = 10)

val partitionedCountries = countries.partitionBy(gridPartitioner)
// further operations

In the above example, since the countries data uses two-dimensional lat/lon coordinates, partitionsPerDimension = 10 results in a grid of 10 x 10 cells, i.e., 100 partitions.
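
To make the grid idea concrete, here is a back-of-the-envelope sketch (purely illustrative, not STARK's code) of how a lon/lat coordinate could be mapped to one of the 10 x 10 cells covering the full [-180, 180] x [-90, 90] space:

// illustrative fixed grid over lon/lat with 10 partitions per dimension
val ppD = 10
val (minX, maxX, minY, maxY) = (-180.0, 180.0, -90.0, 90.0)
val cellW = (maxX - minX) / ppD // 36 degrees per cell
val cellH = (maxY - minY) / ppD // 18 degrees per cell

def cellId(x: Double, y: Double): Int = {
  val col = math.min(((x - minX) / cellW).toInt, ppD - 1)
  val row = math.min(((y - minY) / cellH).toInt, ppD - 1)
  row * ppD + col // a value between 0 and 99, one per partition
}

val cell = cellId(10.938838, 50.681898) // maps the point from the examples above to its cell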

Cost based Binary Space Partitioning

The BSP first divides the data space into quadratic cells of a given side length using a fixed grid and builds a histogram of the number of data items contained in each cell. Along these cell borders, any partition whose total number of points exceeds a given maximum cost is split into two partitions of (roughly) equal cost, where the cost is the number of contained points.

The BSP requires at least three parameters: the RDD, the maximum cost, and the side lengths for the quadratic cells.

The advantage of the BSP is that the resulting partitions are of (almost) equal size, i.e., contain a similar number of data items, which balances the workload among the executors. However, this partitioning may take longer than fixed grid partitioning.
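
The splitting step can be illustrated with a small sketch (purely illustrative, not STARK's implementation): given a histogram of item counts per cell along one axis, a split index is chosen so that both sides carry roughly half of the total cost.

// illustrative only: split a 1D histogram of cell counts so that
// both halves contain roughly the same number of items
val histogram = Seq(120, 80, 300, 150, 50, 200, 100) // items per cell
val total = histogram.sum // 1000

// accumulate cell costs until roughly half of the total is reached
val splitIdx = histogram.scanLeft(0)(_ + _).indexWhere(_ >= total / 2)

val (left, right) = histogram.splitAt(splitIdx)
println(s"left cost = ${left.sum}, right cost = ${right.sum}") // 500 / 500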

val countries: RDD[( STObject, (Int, String) )] = ... // see above

val bsp = new BSPartitioner(countries, _sideLength = 0.5, _maxCostPerPartition = 1000)

val partitionedCountries = countries.partitionBy(bsp)
// further operations

Indexing

To speed up query processing, STARK can optionally index the contents of partitions. As the index structure, STARK currently supports the R-tree that comes with JTS. To support k nearest neighbor queries (see below), this tree implementation was extended, based on the JTSPlus implementation.

STARK offers two indexing modes: live and persistent. Both modes are transparent to subsequent queries, so filter and join commands do not differ from the unindexed case.

  • Live Indexing: The index is built upon execution for each partition, queried according to the current predicate, and then thrown away.
val countries: RDD[( STObject, (Int, String) )] = ... // see above
// order is the maximum number of elements in an R-tree node
val result = countries.liveIndex(order = 5).contains(STObject("POINT( 50.681898 10.938838 )"))
// apply a spatial partitioning before Indexing
val bsp = new BSPartitioner(countries, _sideLength = 0.5, _maxCostPerPartition = 1000)
val result2 = countries.liveIndex(bsp, order = 5).contains(STObject("POINT( 50.681898 10.938838 )"))
  • Persistent Indexing: Persistent indexing creates the index and writes the indexed RDD to disk or HDFS. The stored index can later be loaded again to save the cost of rebuilding it. Storing is done with saveAsObjectFile:
val countries: RDD[( STObject, (Int, String) )] = ... // see above
val bsp = new BSPartitioner(countries, _sideLength = 0.5, _maxCostPerPartition = 1000)
// order is the maximum number of elements in an R-tree node
val countriesIdx = countries.index(bsp, order = 5) // Schema: RDD[RTree[STObject, (STObject, (Int, String))]]
countriesIdx.saveAsObjectFile("/data/countries_idx")

The index is loaded using objectFile:

val countriesIdx = sc.objectFile[RTree[STObject, (STObject, (Int, String))]]("/data/countries_idx", 4) // 4 = minimum number of partitions (example value)
val result = countriesIdx.contains(STObject("POINT( 50.681898 10.938838 )"))

Operators

All operators can be executed on spatially partitioned or unpartitioned data. If a spatial partitioning was applied, the operators can skip partitions that, based on the spatial bounds of the respective partition, cannot contain a matching result (e.g., a queried point).
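
Conceptually, this pruning is a bounding-box check (an illustrative sketch, not STARK's code): a partition whose bounds do not intersect the bounds of the query geometry can be skipped without inspecting its contents.

// illustrative: keep only partitions whose bounding box intersects the query's
case class Bounds(minX: Double, minY: Double, maxX: Double, maxY: Double) {
  def intersects(o: Bounds): Boolean =
    !(o.minX > maxX || o.maxX < minX || o.minY > maxY || o.maxY < minY)
}

def candidatePartitions(partitionBounds: Seq[Bounds], query: Bounds): Seq[Int] =
  partitionBounds.zipWithIndex.collect { case (b, i) if b.intersects(query) => i }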

Filter

Supported predicates are:

  • rdd.intersects(STObject): Find all entries that intersect with the given object
  • rdd.contains(STObject): Find all entries that contain the given object
  • rdd.containedBy(STObject): Find all entries that are contained by the given object
  • rdd.withinDistance(STObject, maxDist, distFunc): Find all entries that are within a given maximum distance to the given object according to the given distance function
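
For example, using the intersects and containedBy predicates on the countries RDD from the Usage section:

val countries: RDD[( STObject, (Int, String) )] = ... // see above

// all countries whose geometry intersects the query point
val intersecting = countries.intersects(STObject("POINT( 50.681898 10.938838 )"))

// all countries that are contained by a given bounding polygon
val window = STObject("POLYGON((9 49, 12 49, 12 52, 9 52, 9 49))")
val inWindow = countries.containedBy(window)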

Join

Joins of two spatial RDDs are performed using the join operator:

leftRDD.join(rightRDD, JoinPredicate.CONTAINS)
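
For example, joining a polygon RDD with a point RDD that both follow the pair format from the Usage section (the exact shape of the join result may differ; see also the join issue below):

val polygons: RDD[(STObject, Long)] = ... // e.g. (STObject(wkt), polygonId)
val points: RDD[(STObject, Long)] = ...   // e.g. (STObject(wkt), pointId)

// pairs of payloads where the polygon geometry contains the point geometry
val contained = polygons.join(points, JoinPredicate.CONTAINS)

A spatial partitioning can be applied to both inputs beforehand, just as for the filter operators.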

Clustering

STARK includes a clustering operator that implements DBSCAN. During execution, the operator needs to uniquely identify data objects, so your tuples need a field that identifies each tuple, similar to a primary key. When calling the clustering method, a key-extractor function that extracts this key field from your tuples has to be passed in addition to the usual DBSCAN parameters.

Suppose you have a dataset with the following schema:

date_time: string
id: string
gps_long: double
gps_lat: double
wkt: string

To run the clustering, create an RDD, transform it into the 2-tuple format, and then call the cluster function:

val raw: RDD[(String, String, Double, Double, String)] = ... // see above

val spatialRDD = raw.keyBy(_._5).map{case (k,v) => (STObject(k), v)} // results in RDD[(STObject, (String, String, Double, Double, String))]
// meaning: (stobject, (date_time, id, gps_long, gps_lat, wkt))

val clusters = spatialRDD.cluster(epsilon = 0.5, minPts = 20, { case (g, v) => v._2 }) // where v._2 is the id

If the data is already a 2-tuple whose second element (here a Long) serves as the primary key:

val raw: RDD[(String, Long)] = ...
val spatialRDD = raw.map{ case (g, v) => (STObject(g), v) } // results in (STObject, Long)
val clusters = spatialRDD.cluster(epsilon = 0.5, minPts = 20, { case (g, v) => v }) // where v is the id

k Nearest Neighbors

val qry = STObject("POINT( 50.681898 10.938838 )")
val neighbors = rdd.kNN(qry, k = 8)

stark's People

Contributors

hag0r


stark's Issues

Unresolved dependencies when building Stark

Hi, thanks for this tool!
I downloaded the zip file from GitHub and executed sbt assembly within the Stark directory, but I got the attached Unresolved Dependencies error:

StartBuildingError.txt

It seems that sbt is searching for sbt-assembly and sbt-scoverage with the wrong Scala version (2.12), but I don't know how to solve this issue.

Outputting incorrect data

I've successfully gotten the library to output data after taking in an RDD of points and an RDD of polygons, but after manually checking the results of the join (contains and intersects) operations, the results don't actually seem to be accurate when plotted on a map.

The RDDs are of the format

(STObject(WKTstring), (arr(id).toLong, WKTstring))

The point RDD has 10,000 items, while the polygon RDD has 500,000+. My join command is

polygonsRDDA.join(pointsRDDA, JoinPredicate.CONTAINS)

I'm fairly certain the format is correct, as are the WKT strings, since I'm getting a valid [(polygon_id, WKT), (point_id, WKT)] output RDD with substantial data.

Here is one row of the output:

[7968,POINT (77.2221885273425 28.5089347347766)]
[929587445047033467,POLYGON ((77.24398775026202 28.61936221830547, 77.24380536004901 28.61944234929979, 77.24360687658191 28.61956941895187, 77.24423987790942 28.620445327833295, 77.24442763254046 28.62033703364432, 77.24459392949939 28.620238127186894, 77.2441808693111 28.61965893767774, 77.24398775026202 28.61936221830547))]

Plugging them into a WKT visualizer, you can see that the polygon and the point are in fact far away from each other.

Any help would be appreciated!

Question about data size limits

Hi, first of all thanks for the application! I've been trying it out with different datasets and it works great with the smaller ones, but the application stalls with bigger datasets. My particular case is a dataset of 120 GB with 2,000 million records, and I want to run DBSCAN with an eps of 0.0001. I don't know whether I'm configuring the ppD parameter badly (with a value of 100 it stalls indefinitely, but with smaller values there seems to be progress, even though it still hangs), or whether it simply won't work with such a large dataset and such a small eps.
Is there any chance that I'm configuring it wrong? Thanks in advance!

Question about Cluster operation

Hi, thank you for this great application. I am using it to cluster geospatial data with lon/lat coordinates. I have a question about the unit of the epsilon parameter in your DBSCAN implementation: if I want an eps of 0.5 km, should I calculate epsilon as 0.5 / kms_per_radian? I have tried that and got a memory overflow or exceeded the Kryo serializer buffer size. I think the reason is that the cell size for the BSPartitioner becomes too small due to the input epsilon value.
