
streamline


Transform and handle your data, line by line.

go get go.bobheadxi.dev/streamline

Overview

streamline offers a variety of primitives to make working with data line by line a breeze:

  • streamline.Stream offers the ability to add hooks that handle an io.Reader line-by-line with (*Stream).Stream, (*Stream).StreamBytes, and other utilities.
  • pipeline.Pipeline offers a way to build pipelines that transform the data in a streamline.Stream, such as cleaning, filtering, mapping, or sampling data.
  • pipe.NewStream offers a way to create a buffered pipe between a writer and a Stream.
    • streamexec.Start uses this to attach a Stream to an exec.Cmd to work with command output.

When working with data streams in Go, you typically get an io.Reader, which is great for arbitrary data. In many cases, though, especially when scripting, it's common to end up with data and outputs that are structured line by line, or to want to handle data line by line, for example to send to a structured logging library. You can set up a bufio.Reader or bufio.Scanner to do this, but for cases like exec.Cmd you will also need boilerplate to configure the command and set up pipes, and for additional functionality like transforming, filtering, or sampling output you will need to write your own handlers. streamline aims to provide succinct ways to do all of the above and more.

Add prefixes to command output

With bufio.Scanner:

func PrefixOutput(cmd *exec.Cmd) error {
    reader, writer := io.Pipe()
    cmd.Stdout = writer
    cmd.Stderr = writer
    if err := cmd.Start(); err != nil {
        return err
    }
    errC := make(chan error)
    go func() {
        err := cmd.Wait()
        writer.Close()
        errC <- err
    }()
    s := bufio.NewScanner(reader)
    for s.Scan() {
        println("PREFIX: ", s.Text())
    }
    if err := s.Err(); err != nil {
        return err
    }
    return <-errC
}

With streamline/streamexec:

func PrefixOutput(cmd *exec.Cmd) error {
    stream, err := streamexec.Start(cmd)
    if err != nil {
        return err
    }
    return stream.Stream(func(line string) {
        println("PREFIX: ", line)
    })
}

Process JSON on the fly

With bufio.Scanner:

func GetMessages(r io.Reader) error {
    s := bufio.NewScanner(r)
    for s.Scan() {
        var result bytes.Buffer
        cmd := exec.Command("jq", ".msg")
        cmd.Stdin = bytes.NewReader(s.Bytes())
        cmd.Stdout = &result
        if err := cmd.Run(); err != nil {
            return err
        }
        print(result.String())
    }
    return s.Err()
}

With streamline:

func GetMessages(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(jq.Pipeline(".msg")).
        Stream(func(line string) {
            println(line)
        })
}

Sample noisy output

With bufio.Scanner:

func PrintEvery10th(r io.Reader) error {
    s := bufio.NewScanner(r)
    var count int
    for s.Scan() {
        count++
        if count%10 != 0 {
            continue
        }
        println(s.Text())
    }
    return s.Err()
}

With streamline:

func PrintEvery10th(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(pipeline.Sample(10)).
        Stream(func(line string) {
            println(line)
        })
}

Transform specific lines

This particular example is a somewhat realistic one: GCP Cloud SQL cannot accept pg_dump output that contains certain EXTENSION-related statements, so to dump a PostgreSQL database with pg_dump and upload the dump to a bucket for import into Cloud SQL, one must pre-process the dump to remove the offending statements.

With bufio.Scanner:

var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    s := bufio.NewScanner(pgdump)
    for s.Scan() {
        line := s.Bytes()
        if bytes.Contains(line, unwanted) {
            // comment out this line
            line = append([]byte("-- "), line...)
        }
        // restore the newline that the scanner strips
        if _, err := dst.Write(append(line, '\n')); err != nil {
            return err
        }
    }
    return s.Err()
}

With streamline:

var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    _, err := streamline.New(pgdump).
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            if bytes.Contains(line, unwanted) {
                // comment out this line
                return append([]byte("-- "), line...)
            }
            return line
        })).
        WriteTo(dst)
    return err
}

Background

Some of the ideas in this package started in sourcegraph/run, a project that set out to build utilities that make it easier to write bash-esque scripts in Go, namely doing things you would often do in shell scripts, such as grepping and iterating over lines. streamline generalizes the ideas used in sourcegraph/run for working with command output so that they work on arbitrary inputs, and sourcegraph/run now uses streamline internally.


streamline's Issues

bufio: buffer full

Hey @bobheadxi, while working with release tooling that uses sourcegraph/run, I stumbled across a bufio: buffer full error.

Here is a test case to reproduce the error:

// stream_test.go
func TestPOF(t *testing.T) {
	newStream := func() *streamline.Stream {
		veryLongLine := "long"
		for i := 0; i < 1000; i++ {
			veryLongLine += "loooooooong"
		}
		return streamline.New(strings.NewReader(veryLongLine))
	}

	s := newStream()
	err := s.Stream(func(line string) {
	})
	if err != nil {
		t.Log(err.Error())
		t.Fail()
	}
}

While the example here is a bit exaggerated, I got the same error while streaming the response of a curl call to an API that sends back JSON, so my original case is rather legitimate.
