
wallaroolabs / wally


Distributed Stream Processing

Home Page: https://www.wallaroolabs.com

License: Apache License 2.0

Pony 60.40% Python 16.57% Shell 4.71% HCL 0.42% Makefile 7.63% Elixir 3.37% JavaScript 5.78% HTML 0.05% C 0.88% Dockerfile 0.13% SCSS 0.05%
stream-processing stream-processor stream-processing-engine pony-language python linux golang go framework api

wally's Introduction


What is Wally?

Wally is a fast stream-processing framework that makes it easy to react to data in real-time. By eliminating infrastructure complexity, Wally makes going from prototype to production simpler than ever.

When we set out to build Wally, we had several high-level goals in mind:

  • Create a dependable and resilient distributed computing framework
  • Take care of the complexities of distributed computing "plumbing," allowing developers to focus on their business logic
  • Provide high-performance & low-latency data processing
  • Be portable and deploy easily (i.e., run on-prem or any cloud)
  • Manage in-memory state for the application
  • Allow applications to scale as needed, even when they are live and up-and-running

Getting Started

Wally can be installed via our handy Wallaroo Up command. Check out our installation page to learn more.

APIs

Wally's primary API is written in Pony, and Wally applications are written against this Pony API.

Usage

Once you've installed Wally, take a look at some of our examples. A great place to start is our word_count or market_spread example in Pony.

"""
Word Count App
"""
use "assert"
use "buffered"
use "collections"
use "net"
use "serialise"
use "wallaroo_labs/bytes"
use "wallaroo"
use "wallaroo_labs/logging"
use "wallaroo_labs/mort"
use "wallaroo_labs/time"
use "wallaroo/core/common"
use "wallaroo/core/metrics"
use "wallaroo/core/sink/tcp_sink"
use "wallaroo/core/source"
use "wallaroo/core/source/tcp_source"
use "wallaroo/core/state"
use "wallaroo/core/topology"

actor Main
  new create(env: Env) =>
    Log.set_defaults()
    try
      let pipeline = recover val
        let lines = Wallaroo.source[String]("Word Count",
          TCPSourceConfig[String].from_options(StringFrameHandler,
                TCPSourceConfigCLIParser("Word Count", env.args)?, 1))

        lines
          .to[String](Split)
          .key_by(ExtractWord)
          .to[RunningTotal](AddCount)
          .to_sink(TCPSinkConfig[RunningTotal].from_options(
            RunningTotalEncoder, TCPSinkConfigCLIParser(env.args)?(0)?))
      end
      Wallaroo.build_application(env, "Word Count", pipeline)
    else
      env.err.print("Couldn't build topology")
    end

primitive Split is StatelessComputation[String, String]
  fun name(): String => "Split"

  fun apply(s: String): Array[String] val =>
    let punctuation = """ !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ """
    let words = recover trn Array[String] end
    for line in s.split("\n").values() do
      let cleaned =
        recover val line.clone().>lower().>lstrip(punctuation)
          .>rstrip(punctuation) end
      for word in cleaned.split(punctuation).values() do
        words.push(word)
      end
    end
    consume words

class val RunningTotal
  let word: String
  let count: U64

  new val create(w: String, c: U64) =>
    word = w
    count = c

class WordTotal is State
  var count: U64

  new create(c: U64) =>
    count = c

primitive AddCount is StateComputation[String, RunningTotal, WordTotal]
  fun name(): String => "Add Count"

  fun apply(word: String, state: WordTotal): RunningTotal =>
    state.count = state.count + 1
    RunningTotal(word, state.count)

  fun initial_state(): WordTotal =>
    WordTotal(0)

primitive StringFrameHandler is FramedSourceHandler[String]
  fun header_length(): USize =>
    4

  fun payload_length(data: Array[U8] iso): USize ? =>
    Bytes.to_u32(data(0)?, data(1)?, data(2)?, data(3)?).usize()

  fun decode(data: Array[U8] val): String =>
    String.from_array(data)

primitive ExtractWord
  fun apply(input: String): Key =>
    input

primitive RunningTotalEncoder
  fun apply(t: RunningTotal, wb: Writer = Writer): Array[ByteSeq] val =>
    let result =
      recover val
        String().>append(t.word).>append(", ").>append(t.count.string())
          .>append("\n")
      end
    wb.write(result)

    wb.done()
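The StringFrameHandler above reads a 4-byte header followed by a payload of that length. A minimal Python sender sketch for feeding the TCP source might look like this (assuming the header is a big-endian unsigned 32-bit length, which is what the Bytes.to_u32 byte order suggests; the function names here are illustrative):

```python
import socket
import struct

def frame(payload: bytes) -> bytes:
    """Prefix a payload with a 4-byte big-endian length header,
    matching what StringFrameHandler's header_length/payload_length expect."""
    return struct.pack(">I", len(payload)) + payload

def send_lines(host: str, port: int, lines: list) -> None:
    """Send each line as one framed message to the Wally TCP source."""
    with socket.create_connection((host, port)) as sock:
        for line in lines:
            sock.sendall(frame(line.encode("utf-8")))
```

In practice Giles plays this sender role, but the same framing applies to any client you write.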

Documentation

Are you the sort who just wants to get going? Dive right into our documentation then! It will get you up and running with Wally.

Wally currently exists as a mono-repo. All of Wally's source code is located in this repo. See repo directory structure for more information.

You can also take a look at our FAQ.

Need Help?

Trying to figure out how to get started? Drop us a line.

Contributing

We welcome contributions. Please see our Contribution Guide.

For your pull request to be accepted, you will need to accept our Contributor License Agreement.

License

Wally is licensed under the Apache version 2 license.

wally's People

Contributors

aturley, caj-larsson, camonz, christianwitts, cristaloleg, dipinhora, enilsen16, gitbook-bot, goldsteink, jonbrwn, jtfmumm, lispmeister, nisanharamati, nitbix, numbermumbler, ppat, pvmsikrsna, pzel, rachelblasucci, remh, seantallen, slfritchie, voxadam, wallaroolabs-wally


wally's Issues

Plug together types in a general way

  • Currently, when inputs are connected to outputs, the type is hardcoded as I32, but we need to allow more generality. The current approach won't work because the input and output types are opaque once steps have been registered.
  • The core of the problem is probably in the way the topology is initialized in leader-tcp.pony.

Session Recovery

[Sessions are reestablished, using message windows and acking, but there is currently a seg fault issue possibly due to attempts to deserialize incomplete or corrupted data]

Per TCP session (sender-receiver pair):

  • Keep application-level sequence numbers with monotonically increasing ids.
  • Ack on N reads from the other side; ack periodically.
  • If the session is dropped, recover by getting the last processed id from the receiver. It should be at least equal in value to our last ack'd id.
  • Assuming this is just a network failure and not a node failure, the other side picks up and there's no lost state and no lost data.
  • Can be done via application-level framing (ack messages or message windows) or via protocol-level framing (ack byte windows).
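The scheme above can be sketched in Python (all names are illustrative bookkeeping, not Wally's actual API):

```python
class SessionEndpoint:
    """Per-session recovery bookkeeping: monotonically increasing
    application-level sequence ids, periodic acks every N reads, and
    replay from the receiver's last processed id after a reconnect."""

    def __init__(self, ack_every: int = 100):
        self.next_id = 0            # next sequence id to assign (sender side)
        self.last_processed = -1    # highest id processed (receiver side)
        self.last_acked = -1        # highest id the peer has acked to us
        self.reads_since_ack = 0
        self.ack_every = ack_every
        self.unacked = {}           # id -> message, kept for replay

    def send(self, msg):
        msg_id = self.next_id
        self.next_id += 1
        self.unacked[msg_id] = msg  # hold until acked, in case of replay
        return (msg_id, msg)

    def on_receive(self, msg_id, msg):
        self.last_processed = max(self.last_processed, msg_id)
        self.reads_since_ack += 1
        if self.reads_since_ack >= self.ack_every:
            self.reads_since_ack = 0
            return ("ack", self.last_processed)   # ack on N reads
        return None

    def on_ack(self, acked_id):
        # The receiver's last processed should be >= our last acked id.
        assert acked_id >= self.last_acked
        self.last_acked = acked_id
        for i in [k for k in self.unacked if k <= acked_id]:
            del self.unacked[i]

    def replay_after_reconnect(self, receiver_last_processed):
        # On session drop: resend everything past what the receiver processed.
        return [self.unacked[i] for i in sorted(self.unacked)
                if i > receiver_last_processed]
```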

Needs #25
Needs #33

Needs #110

Technically #33 is optional, but we've sequenced it that way.

Word Count in Pony

  • Takes sentences from Giles.
    • Splits them on word boundaries (whitespace)
    • Counts words
    • Outputs a streaming count of words to the Giles sink
  • Should be able to shard/partition words to different actors based on key ranges (word first letter)
  • Map -> 1-to-many output
  • Reduce -> many-to-1 input
  • Shard by lambda (key selector / ??) [user-defined partitioning function]
  • Counter data structure is an unsorted map
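The first-letter sharding could be sketched as a user-defined partitioning function (shard_for is a hypothetical helper, not part of any API here):

```python
def shard_for(word: str, n_shards: int) -> int:
    """Route a word to a shard by its first letter, splitting 'a'..'z'
    into contiguous key ranges. Non-alphabetic words fall into shard 0."""
    first = word[:1].lower()
    if not ("a" <= first <= "z"):
        return 0
    letters_per_shard = 26 / n_shards
    return int((ord(first) - ord("a")) / letters_per_shard)
```

With 2 shards this sends 'a'..'m' to shard 0 and 'n'..'z' to shard 1.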

Needs #25
Needs #72
Needs #74

Market Spread: Reference Data

The current business rule is that we reject any trade whose symbol has no corresponding value in market data. We should be able to load "start of day" reference values for every symbol such that they are loaded before any data starts running.


The client defines the starting data set. When Buffy initializes the topology, it feeds the appropriate partitioners maps of initialization data: read a file into a map in client app code, pass the map to the Partition constructor, and the Partition uses the map to pass the correct constructor data to the Step actors it creates.

Bootstrapping actors' initial state: an Actor Creator can be provided with a map of initial values per actor (keyed on whatever distinguishes partitions). There should be an option to choose between spinning up all actors in the initial values map immediately or on the fly (as messages arrive for each actor).

Initial symbol values should be loaded from a file on topology initialization.
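A rough Python sketch of this bootstrapping flow (load_initial_symbols, Partition, and the assumed "SYMBOL,price" line format are all illustrative, not the real client code):

```python
def load_initial_symbols(lines):
    """Parse 'SYMBOL,price' lines into a map of start-of-day reference
    values. In client app code the lines would come from a file, e.g.
    load_initial_symbols(open("symbols.csv"))."""
    initial = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        symbol, price = line.split(",")
        initial[symbol] = float(price)
    return initial

class Partition:
    """A partition seeded with per-key initial data, which it hands to each
    step it creates. Steps here spin up on the fly as messages arrive; the
    alternative is to spin up every keyed step immediately."""
    def __init__(self, initial_data):
        self.initial_data = initial_data
        self.steps = {}

    def step_for(self, symbol):
        if symbol not in self.steps:
            # Pass the correct constructor data to the new step.
            self.steps[symbol] = {"last_value": self.initial_data.get(symbol)}
        return self.steps[symbol]
```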

Needs #41

Metrics from Userland: Double and Divide Stats

[The Buffy side of this issue is complete. It has been tested with a mock Metrics Receiver to make sure TCP messages are being sent properly. Once the Metrics Receiver is ready, this should plug right in.]

  • Metrics Collector ->
    • Outgoing Component (to Metrics Receiver) - 2 days
    • Incoming Component (from actors in node) - 1 day
  • Throughput of stream at sink, node 1 to node 2, source
    • Computation latency
    • Non-computation latency
  • Ref: https://docs.google.com/document/d/1W3SoYVeF6gCNkwQgVvXMdpYheUyD-tENacGf_2_Pt_M/edit#
  • Metrics collector per node -> metrics receiver -> monitoring hub
  • The Collector is responsible for aggregating metrics from all actors in a node and forwarding them to the Metrics Receiver node

Metrics receiver actor should already be done. We need to integrate with it.

Depends on Giles integration with double and divide.
Mostly requires double and divide app proxy work.

Needs #27
Needs #28
Needs #29
Needs #25

Dagon: Allow for more than 1 canary node

Canary nodes are nodes that are told to start once the cluster is ready. Dagon currently allows only a single one. For market spread and other apps that use more than one Giles, we will need to be able to start several.

Needs #31

Wesley test of Word Count

Initially runs against Python Word Count version. Should be able to be switched over to testing Pony Word Count version with no changes once that is available.

Control messages vs. Buffy messages

  • Decide whether we should separate control message decoding/encoding from Buffy message decoding/encoding. They currently reside together in tcp-messages.pony.
  • Control messages should be in their own subproject.
  • If yes, do it!

Add MapStep type

Add a new Step type that expects the computation to return a sequence, which it then sends to the output one at a time as distinct messages.
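A rough sketch of the behavior (map_step, computation, and output are illustrative stand-ins, not the actual Step interface):

```python
def map_step(computation, output):
    """A MapStep: the computation returns a sequence, and the step
    forwards each element to the output as a distinct message."""
    def on_message(msg):
        for out in computation(msg):
            output(out)
    return on_message

# e.g. a Split-like computation fanning one line out into many word messages:
sent = []
step = map_step(lambda line: line.split(), sent.append)
step("to be or not to be")
```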

Metrics from Userland: Metrics Receiver

Add a metrics receiver actor; we can start a metrics receiver node via distributed startup. The metrics receiver dumps to standard out.

Not yet connected to the divide and double application.
Needs #21

D&D Metrics UI

UI to display Buffy Metrics

Displays:
Overall: (sum(actors))
Node(s): (sum(actors-in-node))

Needs #35

Make dagon config play nicely with Docker

Dagon takes config files by name and expects them to be in config/NAME.ini.

This makes testing ad hoc changes with Docker containers painful. If we could pass full paths instead, this would be much easier to use.

Wesley Market Spread: Test 2

  • Load initial data
  • Market stream
  • Trades stream

Trades are partitioned into two sets: one that gets rejected, and another that gets accepted

Verify that all trades in Accept set are accepted and all trades in Reject set are rejected.

Input: Input data + Market state for symbol at the time of input (to determine accept/reject value)

Expected Output: symbol + accept/reject status

Needs #44

Giles Integration with Divide and Double: TCP Transport

Switch Giles to using TCP for transport.

  • Open issue: what is the protocol between nodes.
  • Will need further adjustment to integrate into Giles Source in Buffy when we get to that.
  • Talks to Buffy via OSC protocol.
  • Sends everything as a string

Giles receiver doesn't exit on SIGTERM

It will exit upon receiving SIGTERM if it was previously connected to a Giles sender that has exited. If it is connected to a still-running Buffy node, then it doesn't shut down.

Issue would be at least in part in the WithoutDagonCoordinator.finished() method.

Double and Divide App

  • 2 nodes. 1 has an actor that doubles. 1 has an actor that divides in half:
Node A: Internal Data Generator -> Doubler -> Proxy
Node B: Proxy -> Divider -> Outstream
  • Proxy Objects for internode actor links as well as ordinary intranode actor links

Needs #21

Spike unit tests

Nothing in lib/spike has unit tests. If we get to alpha.8 and the code is stable, then we can feel OK that putting unit tests around the logic isn't wasted time. Until then, it's quite possible it all changes.

EXPERTS: @SeanTAllen

Wesley: Market Spread: Test 1

To be done against Python buffy that lives on the python-buffy branch.

Needs to be updated to work with Giles feeding in files (1 for market data, 1 for trade data). An update to the Python code will be needed to make it handle 2 sources and to output data to Giles.

Use the Pony wesley framework to test that, for each trade data message that enters the system, a corresponding accept or reject is received by the giles-receiver.

Single node Buffy clusters don't work

If you set up a cluster that is

giles => leader => giles

you get:

StepManager: Could not forward message

Buffy requires at least 1 worker to function. Running with just a leader should be acceptable, though.

Distributed Startup

Compile everything together:

  • Startup as either leader or worker.
  • 1 leader; some number of workers.
  • Leader sends command and control messages on a known port to workers, telling them what actors to start up.
  • Hardcode startup of actors (send a message with a "type" and we can match and start an actor of that type).
  • Use OSC protocol.

Market Spread: Logic

  • Data can be generated using existing tools.
  • 2 input streams: 1 market data, 1 trade data.
  • 1 output stream: accept/reject each order (no output for market data).
    • Same rules as the Python version.
  • Market data: capture the latest value per symbol.
  • Trade data: do the magic calculation to see if it is in range.
    • Reject if out of range, accept if in range.
  • Reject if the symbol has no market data value.
  • Should have the same stats and monitoring hub integration as word count/double divide.
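A simplified Python sketch of this logic. The "magic calculation" is reduced here to a bid/ask in-range check, which may differ from the Python version's actual rule; all field names are illustrative:

```python
class MarketSpread:
    """Market data updates the latest bid/ask per symbol (no output);
    each trade is accepted only if its price falls within the current
    spread, and rejected if its symbol has no market data value."""

    def __init__(self, initial_market=None):
        self.market = dict(initial_market or {})  # symbol -> (bid, ask)

    def on_market_data(self, symbol, bid, ask):
        # Capture the latest value per symbol; market data produces no output.
        self.market[symbol] = (bid, ask)

    def on_trade(self, symbol, price):
        if symbol not in self.market:
            # Reject if the symbol has no market data value.
            return (symbol, "reject")
        bid, ask = self.market[symbol]
        in_range = bid <= price <= ask
        return (symbol, "accept" if in_range else "reject")
```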

Needs #73
Needs #75

Add ability to spike an outgoing proxy

We only need to spike the incoming side, since we are simulating a dropped connection and it doesn't matter which side drops it.

Blocked by the Segmentation Fault: 11 problem, so:
Needs #40

Get CI working with pony stable

We are using pony stable to manage dependencies now. Need to update our CI and general build processes to use it.

For this, each pony app should have a

  • bundle.json of any non standard library dependencies that have to be fetched from github. (stable fetch)

When building, rather than doing ponyc --args, we do stable env ponyc --args.

  • needs to work for local and CI builds.
  • needs to work with cross compilation

Integrate Spike into Network Boundaries

Spiking a session:

  • Wrap a spike notifier around a real TCP notifier. Should be deterministic.
  • Buffer and delay on the sending and receiving sides (output, input). We wait X amount of time/data in spike and then hand off to the real notifier. Seed/configure the amount of data to wait for before sending.
  • Drop a session and force it to go through session recovery: close the socket; don't pass send/receive data through.
  • Drop a session and kill the node: close the socket; don't pass send/receive data through; keep state so as not to allow any additional pass-through.
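The buffer-then-hand-off behavior can be sketched in Python (the notifier interface here is illustrative, not the real TCP notifier API):

```python
class RecordingNotifier:
    """Stand-in for the real TCP notifier; records what it receives."""
    def __init__(self):
        self.got = []
    def received(self, data: bytes):
        self.got.append(data)

class SpikeNotifier:
    """Wraps a real notifier with a 'spike' layer that buffers received
    data until a configured threshold, then hands off. Deterministic:
    the same configuration always produces the same spike."""

    def __init__(self, inner, bytes_to_hold: int):
        self.inner = inner              # the real notifier being wrapped
        self.bytes_to_hold = bytes_to_hold
        self.buffer = b""
        self.handed_off = False

    def received(self, data: bytes):
        if self.handed_off:
            self.inner.received(data)   # spike is over; pass straight through
            return
        self.buffer += data             # buffer/delay on the receiving side
        if len(self.buffer) >= self.bytes_to_hold:
            self.handed_off = True      # hand off to the real notifier
            flushed, self.buffer = self.buffer, b""
            self.inner.received(flushed)
```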

Needs #21

Giles needs unit tests

At minimum, test that data generation and message formatting work correctly. I.e., given data X, we get the correct output message format, and given input message Y, we store it into received correctly.
