
automi

A data stream processing API for Go (alpha)



Automi is an API for processing streams of data using idiomatic Go. Using Automi, programs can process streams of data by composing stages of operations that are applied to each element of the stream.

Concept

Figure: Automi streaming concepts


The Automi API expresses a stream with four primitives:

  • An emitter: an in-memory, network, or file resource that can emit elements for streaming
  • The stream: represents a conduit within which data elements are streamed
  • Stream operations: code which can be attached to the stream to process streamed elements
  • A collector: an in-memory, network, or file resource that can collect streamed data.

Automi streams use Go channels internally to route data. This means Automi streams naturally support features such as buffering, back-pressure queuing, and concurrency safety.
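For instance, the four primitives compose naturally around a channel emitter. The following is a minimal, hedged sketch (assuming the module path github.com/vladimirvivien/automi; only constructs shown elsewhere in this README are used) that emits from a Go channel, applies one operation, and collects into stdout:

package main

import (
	"fmt"
	"os"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// Emitter: a Go channel; channel semantics give the stream its
	// buffering and back-pressure behavior.
	ch := make(chan string)
	go func() {
		defer close(ch) // closing the channel ends the stream
		for _, w := range []string{"hello", "automi", "stream"} {
			ch <- w
		}
	}()

	// Stream, operation, and collector.
	strm := stream.New(ch)
	strm.Map(func(s string) string { return s + "\n" })
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
	}
}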

Using Automi

Now, let us explore some examples to see how easy it is to use Automi to stream and process data.

See all examples in the ./example directory.

Example: streaming from a slice into stdout

This first example shows how easy it is to compose and express stream operations with Automi. In this example, rune values are emitted from a slice and streamed individually. The Filter operator method filters out unwanted rune values, and the Sort operator sorts the remaining items. Lastly, a collector collects the result into an io.Writer, which pipes it to stdout.

func main() {
	strm := stream.New([]rune("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))

	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item) 
	}).Batch().Sort() 
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See the full source code.

How it works
  1. Create the stream with an emitter source. Automi supports several types of sources including channels, io.Reader, slices, etc. (see list of emitters below). Each element in the slice will be streamed individually.
strm := stream.New([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw`))
  2. Apply user-provided or built-in stream operations as shown below:
strm.Filter(func(item rune) bool {
    return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
    return string(item)
}).Batch().Sort()
  3. Collect the result. In this example, the result is collected into an io.Writer which further streams the data into standard output:
strm.Into(collectors.Writer(os.Stdout))
  4. Lastly, open the stream once it is properly composed:
if err := <-strm.Open(); err != nil {
    fmt.Println(err)
    return
}  

Example: streaming from an io.Reader into collector function

The next example shows how to use Automi to stream data from an io.Reader, emitting buffered string values from an in-memory source in 50-byte chunks. The data is processed with the Map and Filter operator methods, and the result is sent to a user-provided collector function which prints it.

func main() {
	data := `"request", "/i/a", "00:11:51:AA", "accepted"
"response", "/i/a/", "00:11:51:AA", "served"
"response", "/i/a", "00:BB:22:DD", "served"...`

	reader := strings.NewReader(data)

	// create stream from a buffered io.Reader emitter,
	// emitting 50-byte chunks.
	stream := stream.New(emitters.Reader(reader).BufferSize(50))
	stream.Map(func(chunk []byte) string {
		return string(chunk)
	})
	stream.Filter(func(e string) bool {
		return (strings.Contains(e, `"response"`))
	})
	stream.Into(collectors.Func(func(data interface{}) error {
		e := data.(string)
		fmt.Println(e)
		return nil
	}))

	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See complete example here.

Example: streaming using CSV files

The following example streams data from a CSV source file. Each row is mapped to a custom type, filtered, and mapped to a slice of strings, which is then collected into another CSV file.

type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
    // creates a stream using a CSV emitter
    // emits each row as []string
    stream := stream.New("./data.txt")

    // Map each CSV row, []string, to type scientist
    stream.Map(func(cs []string) scientist {
        yr, _ := strconv.Atoi(cs[3])
        return scientist{
            FirstName: cs[1],
            LastName:  cs[0],
            Title:     cs[2],
            BornYear:  yr,
        }
    })
    stream.Filter(func(cs scientist) bool {
        return (cs.BornYear > 1930)
    })
    stream.Map(func(cs scientist) []string {
        return []string{cs.FirstName, cs.LastName, cs.Title}
    })
    stream.Into("./result.txt")

    // open the stream
    if err := <-stream.Open(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    fmt.Println("wrote result to file result.txt")
}

See complete example here.

Example: streaming HTTP requests and responses

The following example shows how to use Automi to stream and process data over HTTP. This HTTP server program streams data from the request body, encodes it using base64, and streams the result into the HTTP response:

func main() {

	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			// status 200 is written implicitly on the first write,
			// leaving the error path free to report a 500

			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
				log.Printf("Stream error: %s", err)
			}
		},
	)

	log.Println("Server listening on :4040")
	log.Fatal(http.ListenAndServe(":4040", nil))
}

See complete example here.

Example: streaming a gRPC service payload

The following example shows how to use Automi to stream data items from a gRPC streaming service. The gRPC client below sets up an Automi emitter that emits time values streamed from a gRPC time service:

// setup an Automi emitter function to stream from the gRPC service
func emitStreamFrom(client pb.TimeServiceClient) <-chan []byte {
	source := make(chan []byte)
	timeStream, err := client.GetTimeStream(context.Background(), &pb.TimeRequest{Interval: 3000})
	...
	go func(stream pb.TimeService_GetTimeStreamClient, srcCh chan []byte) {
		defer close(srcCh)
		for {
			t, err := stream.Recv()
			if err != nil {
				return // stop emitting on EOF or stream error
			}
			srcCh <- t.Value
		}
	}(timeStream, source)

	return source
}

func main() {
	...
	client := pb.NewTimeServiceClient(conn)
	// create automi stream
	stream := stream.New(emitStreamFrom(client))
	stream.Map(func(item []byte) time.Time {
		secs := int64(binary.BigEndian.Uint64(item))
		return time.Unix(secs, 0)
	})
	stream.Into(collectors.Func(func(item interface{}) error {
		t := item.(time.Time)
		fmt.Println(t)
		return nil
	}))

	// open the stream
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

See complete example here.

More Examples

Examples - View a long list of examples that cover all aspects of using Automi.

Automi components

Automi comes with a set of built-in components to get you started with stream processing, including the following. Hedged usage sketches follow the Emitters and Collectors lists below.

Emitters

  • Channel
  • CSV
  • io.Reader
  • io.Scanner
  • Slice
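
Each of these emitters can back a stream via stream.New. The sketch below is a minimal illustration, restricted to the constructor forms already shown in this README (module path assumed to be github.com/vladimirvivien/automi); the Scanner emitter's constructor is not shown here:

package main

import (
	"strings"

	"github.com/vladimirvivien/automi/emitters"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// Each call creates a stream backed by a different emitter type,
	// using only forms demonstrated in the examples above.
	_ = stream.New([]rune("abc"))                            // slice emitter
	_ = stream.New(make(chan []byte))                        // channel emitter
	_ = stream.New(emitters.Reader(strings.NewReader("hi"))) // io.Reader emitter
	_ = stream.New("./data.csv")                             // CSV emitter from a file path
}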

Operators

  • Stream.Filter
  • Stream.Map
  • Stream.FlatMap
  • Stream.Reduce
  • Stream.GroupByKey
  • Stream.GroupByName
  • Stream.GroupByPos
  • Stream.Sort
  • Stream.SortByKey
  • Stream.SortByName
  • Stream.SortByPos
  • Stream.SortWith
  • Stream.Sum
  • Stream.SumByKey
  • Stream.SumByName
  • Stream.SumByPos
  • Stream.SumAllKeys

Collectors

  • CSV
  • Func
  • Null
  • Slice
  • Writer
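
Putting the three component families together, here is a hedged end-to-end sketch that pairs a slice emitter with a few operators and the Writer collector (again assuming the github.com/vladimirvivien/automi module path):

package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// Emitter: a slice of words, streamed one element at a time.
	strm := stream.New([]string{"delta", "alpha", "echo", "charlie", "bravo"})

	// Operators: drop short words, uppercase the rest,
	// then batch and sort the remainder.
	strm.Filter(func(s string) bool { return len(s) > 4 }).
		Map(func(s string) string { return strings.ToUpper(s) + " " }).
		Batch().Sort()

	// Collector: write the sorted batch to stdout.
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
	}
}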

Licence

Apache 2.0


automi's Issues

Concurrency

Is it possible to set concurrency for each stage? If not, that would be a useful feature, especially with respect to I/O operations.

Dodo bird dead?

This looks very nice. The abstractions are really useful, as well as the plugin model.

So did this die due to a technical dead end, or just other things tempting you?

ReactiveX

Is the plan to implement some/most of the functions from ReactiveX? Will you add support for emitting multiple items, as Storm does with tuples? How will you handle errors?

Thresholding

Not sure if this is on the roadmap, but thresholding would be an awesome addition. I would be willing to help implement it when the time comes.

Stream.GroupBy Tasks

Add/complete more functionality for GroupBy:

  • Add GroupByInt tests with more scenarios
  • Add support for GroupBy(field-name) / tests
  • Add support for GroupBy(function{})
  • More tests for additional enhancements

nifty overlap!

hi! just wanted to drop a line to note that i'd written something similar to this a while back - https://github.com/sdboyer/transducers-go

i ended up abandoning it, and i suspect that you probably have all the patterns explored in there present in here. but, given the similarities, i figured i'd just drop a link 😸

Tag 0.01

Apply an API stabilization tag to master.

Create a Null Sink

As the name implies, this sink simply drops incoming stream data. Useful for testing.

Reorg source code layout

Target layout should be

- api
  + tuple/
  + context/
  - types.go
  - operations.go
+ stream/
- sources/
  + file/
  + db/
  ...
- sinks/
  + file/
  + db/
  ...

Add a slice sink node

A slice sink is a sink node that stores its content in a []interface{}. The slice can safely be accessed once method sink.Open returns.

Create a ChanSource

Create ChanSource which would be a generic processor that uses a channel as a source. This will help in implementing #2.

CSV example

This looks really useful. I have a project at the moment where I need to pull a few CSV files over HTTP, filter out some crap rows, map the data into my structs, and lump them into the DB.

Basically ETL stuff like your example in the readme does.

Are the example and the data that drives it available anywhere?

Thanks again for this ....

DB Sink

Create a sink that uses a database to save data.

Implement Auxiliary Plan

AuxPlan would process the auxChan stream with a full plan instead of a simple endpoint, allowing users to create a flow around auxiliary streams.

Remove all dependencies

Reduce the dependency surface of the core API to zero. The plan is to keep the core API super lightweight. Sources and sinks are to be placed in their own repos.

Stream.SumBy()

SumBy(K) should aggregate the sum of keyed or indexed values V where V[K] = []numeric. The operator should collect results in a map[K]float64.
