Giter Site home page Giter Site logo

storey's Introduction

Storey

CI

Storey is an asynchronous streaming library, for real time event processing and feature extraction.

In This Document

API Walkthrough

A Storey flow consist of steps linked together by the build_flow function, each doing it's designated work.

Supported Steps

Input Steps

  • Source -
  • AsyncSource -
  • ReadCSV -

Processing Steps

  • Filter -
  • Map -
  • FlatMap -
  • MapWithState -
  • Batch(max_events, timeout) - Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
  • Choice -
  • JoinWithV3IOTable -
  • JoinWithHttp -
  • AggregateByKey(aggregations,cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None) - This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
  • QueryAggregationByKey(aggregations,cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None) - Similar to to AggregateByKey, but this step is for serving only and does not aggregate the event.
  • Persist(cache) - Persists the data in cache to its associated storage by key.

Output Steps

  • Complete -
  • Reduce -
  • WriteToV3IOStream -

Usage Examples

Using Aggregates

The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.

from storey import build_flow, Source, Cache, V3ioDriver, AggregateByKey, FieldAggregator, Persist
from storey.dtypes import SlidingWindows

v3io_web_api = 'https://webapi.change-me.com'
v3io_acceess_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
cache = Cache('/bigdata/my_features', V3ioDriver(v3io_web_api, v3io_acceess_key))

def enrich(event, state):
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    event.body['time_since_activity'] = (event.time - state['first_activity']).seconds
    state['last_event'] = event.time
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state

controller = build_flow([
    Source(),
    MapWithState(cache, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h','2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h','2h', '24h'], '10m')),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail'))],
                   cache),
    Persist(cache),
    WriteToV3IOStream(V3ioDriver(v3io_web_api, v3io_acceess_key), 'features_stream')
]).run()

We can also create a serving function, which sole purpose is to read data from the feature store and emit it further

controller = build_flow([
    Source(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h','2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h','2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail'))],
                           cache)
]).run()

storey's People

Contributors

taliguaz avatar tebeka avatar dinal avatar

Watchers

James Cloos avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.