grailbio / bigslice

A serverless cluster computing system for the Go programming language

Home Page: https://bigslice.io/

License: Apache License 2.0

Go 100.00%
bigdata cluster computing etl go golang machinelearning mapreduce

bigslice's People

Contributors

anirban6908, awiss, cosnicolaou, dependabot[bot], dvrkps, grailbio-bot, jcharum, josh-newman, mariusae, psampaz

bigslice's Issues

Support heterogeneous clusters

Support clusters composed of different (EC2) instance types. This is useful when dealing with occasional instance scarcity. We've seen conditions in which it is difficult to launch an m4.4xlarge, but it's possible to launch an m4.2xlarge.

Perhaps we could allow users to specify an ordered list of instance types to try.

Proposal: memory reclamation

Today Bigslice does not implement explicit memory management of task outputs and state. Instead it serializes task output to disk and relies on the operating system's page caching system to keep the output in memory. Memory associated with intermediate state is managed by a set of heuristics (e.g., canary sizes, spill thresholds, etc.)

We propose to introduce explicit memory management into Bigslice. This will accomplish two objectives: (1) it will help us keep un-serialized task output in memory, so that follow-on tasks can be run without an extra decode; (2) operations like cogroup and reduce can be made more efficient as they can use more memory, relying on actual runtime memory footprints instead of heuristics.

How can we reclaim memory?

Since Bigslice calls into user code, it cannot control memory allocation in a fine-grained way. (Contrast with, e.g., a query processing engine with limited data types and operations.)

Instead, we can treat the Bigslice program as a black box that allocates and frees memory. We can monitor memory usage and apply a simple control regime to reclaim memory and avoid OOMing.

We have at least three mechanisms for reducing memory footprint in such a black box: spilling outputs to disk, pausing task processing, and aborting task processing.

Abstractly, a controller could define a set of watermarks and apply reclamation actions with increasing aggressiveness until memory utilization is driven below a low watermark. One such scheme defines two watermarks: a low watermark, and a high watermark. If the process exceeds the high watermark, it pauses task processing and begins spilling outputs to disk. Outputs are spilled in order of their recency. Once memory utilization decreases to below the low watermark, spilling stops. If the low watermark is not reached after all available outputs have been spilled, the worker can begin to abort tasks until memory utilization dips below the low watermark. If the low watermark has not been reached still, then either (1) there is a space leak in Bigslice itself, or (2) user code has allocated memory associated with global references that cannot be cleared by Bigslice. These conditions can both be treated as OOM conditions.
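The two-watermark scheme can be sketched as a single control round over a simplified model of a worker. Everything here is an assumption made for illustration: the worker struct, its fields, and the choice to resume processing once the low watermark is reached are not part of Bigslice.

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical model of a worker's memory state.
type worker struct {
	used    int   // bytes currently in use
	outputs []int // sizes of spillable outputs, already in spill order
	tasks   []int // memory held by running tasks, in abort order
	paused  bool
}

var errOOM = errors.New("low watermark not reached after spilling and aborting")

// control applies one round of the two-watermark scheme and reports how
// many outputs were spilled and how many tasks were aborted.
func control(w *worker, low, high int) (spilled, aborted int, err error) {
	if w.used <= high {
		return 0, 0, nil // below the high watermark: nothing to do
	}
	w.paused = true
	// Spill outputs until utilization drops below the low watermark.
	for w.used >= low && len(w.outputs) > 0 {
		w.used -= w.outputs[0]
		w.outputs = w.outputs[1:]
		spilled++
	}
	// If spilling was not enough, abort tasks.
	for w.used >= low && len(w.tasks) > 0 {
		w.used -= w.tasks[0]
		w.tasks = w.tasks[1:]
		aborted++
	}
	if w.used >= low {
		return spilled, aborted, errOOM // treated as an OOM condition
	}
	w.paused = false // assumption: processing resumes below the low watermark
	return spilled, aborted, nil
}

func main() {
	w := &worker{used: 100, outputs: []int{10}, tasks: []int{20, 30}}
	spilled, aborted, err := control(w, 50, 90)
	fmt.Println(spilled, aborted, w.used, err)
}
```

The aborted count returned here is the kind of signal the second controller described below could use to adjust the load factor.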

A second controller could adjust the job's load factor based on the rate of aborted tasks, so that future tasks are less likely to be aborted.

APIs for reclamation

Bigslice could support reclamation through an API that lets various components register reclaimable objects in a central registry.

type Reclaimable interface {
	// Reclaim reclaims the resource represented by
	// this object.
	Reclaim() error
	
	// This type could include methods to indicate priority,
	// cost, etc.
}

// Register registers a reclaimable object.
func (*Reclaimer) Register(r Reclaimable)

// Reclaim reclaims the next object in the reclamation 
// queue. Returns true when a reclamation was performed,
// or an error if the reclamation failed.
func (*Reclaimer) Reclaim() (ok bool, err error)
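To make the proposed API concrete, here is a minimal sketch of a Reclaimer backed by a FIFO queue. The queue discipline, the mutex, and the spillable example type are assumptions; the proposal leaves ordering (priority, cost, etc.) open.

```go
package main

import (
	"fmt"
	"sync"
)

// Reclaimable mirrors the interface proposed above.
type Reclaimable interface {
	Reclaim() error
}

// Reclaimer is a sketch of a central registry using a simple FIFO
// queue; a real implementation might order by priority or cost.
type Reclaimer struct {
	mu    sync.Mutex
	queue []Reclaimable
}

// Register adds a reclaimable object to the back of the queue.
func (r *Reclaimer) Register(x Reclaimable) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.queue = append(r.queue, x)
}

// Reclaim reclaims the next object in the queue. It returns false with
// a nil error when the queue is empty.
func (r *Reclaimer) Reclaim() (ok bool, err error) {
	r.mu.Lock()
	if len(r.queue) == 0 {
		r.mu.Unlock()
		return false, nil
	}
	next := r.queue[0]
	r.queue = r.queue[1:]
	r.mu.Unlock()
	// Call user code outside the lock so it may register new objects.
	if err := next.Reclaim(); err != nil {
		return false, err
	}
	return true, nil
}

// spillable is a hypothetical reclaimable task output.
type spillable struct {
	name    string
	spilled bool
}

func (s *spillable) Reclaim() error {
	s.spilled = true
	return nil
}

func main() {
	var r Reclaimer
	r.Register(&spillable{name: "task-0"})
	r.Register(&spillable{name: "task-1"})
	for {
		ok, err := r.Reclaim()
		if err != nil || !ok {
			break
		}
		fmt.Println("reclaimed one object")
	}
}
```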

Prior work

An earlier change implemented memory reclamation for combiners.

bigslice run: command-line arguments parsing bug

When I run the example from the docs:

GO111MODULE=on bigslice run shake.go -local

it returns:

bigslice: exec: "-local": executable file not found in $PATH

I added some logging to trace the problem, and it turns out that bigslicecmd.Run receives only [-local] as args. There seems to be a bug in cmd/bigslice/run.go.

Proposal: streaming, incremental computing

This issue describes a design for adding streaming, incremental computing to Bigslice.

Bigslice provides a useful model for performing batch computing, but many uses of Bigslice could also benefit from streaming and incremental computing: some datasets are unbounded (e.g., the output of a web crawler), and others could benefit from using incremental computing as a checkpoint mechanism (for very large datasets). Incremental and streaming computing also provide a mechanism to overlap compute with I/O: for example, data processing can begin while sequencer outputs are still being uploaded. Streaming computing is also useful for monitoring purposes: for example, to highlight QC concerns at a very early stage.

The primitives in Bigslice were designed with incremental computation in mind; we aim to add incremental and streaming computation to Bigslice with few changes to its core.

The main idea is to endow Bigslice tasks with temporal sharding: as well as sharding by data ("space"), we also shard by discrete intervals of time ("epochs"). By so doing, we can effectively partition unbounded inputs in a user-controlled manner (e.g., windowing in fixed-time intervals, or windowing by data sizes), while providing incremental and streaming computing semantics on top of Bigslice with minimal changes to its core runtime semantics.

Computing tasks with time sharding requires few changes to the core APIs in Bigslice. We modify Slice to account for temporal sharding directly, and Reader is modified to provide a checkpointing mechanism, discussed further below.

type Slice interface {
	// (The rest of interface Slice is identical.)

	// NumEpoch returns the number of epochs of data provided
	// by this Slice operation.
	NumEpoch() int
	
	// Reader returns a Reader for a shard and epoch of this Slice. The
	// reader itself computes the shard's values on demand. The caller
	// must provide Readers for all of this shard's dependencies,
	// constructed according to the dependency type (see Dep). The
	// dependencies provided are for the same shard and epoch.
	Reader(shard, epoch int, deps []sliceio.Reader) sliceio.Reader
}

With these modifications, we can subtly alter the execution model so that incremental, streaming computing is treated as a superset of Bigslice's current batch-oriented model. When a Reader is instantiated from a Slice, it is provided with readers to its dependencies that represent the same epoch. Incremental computation requires no additional changes to the operators; streaming computing requires that some operators emit data only for those records that would be emitted for that epoch. For example, a streaming join emits records only for those keys present in the current epoch, while an incremental join must emit all records. Likewise, a windowed reduce must store enough state to maintain a window of values for which values accumulated in the window are emitted.

We now discuss the implications of this model for the Bigslice runtime. First, since some operators are stateful, they must be able to checkpoint their state. We provide this by allowing a sliceio.Reader to implement an interface that lets it persist state to an underlying storage mechanism managed by Bigslice:

// Checkpointer can be implemented by a stateful sliceio.Reader if
// it wishes for the runtime to maintain its state.
type Checkpointer interface {
	// Save checkpoints the state of this reader to the provided
	// Storage.
	Save(Storage) error

	// Restore restores the state of this reader from the provided
	// storage, as checkpointed by Save.
	Restore(Storage) error
}

For example, a streaming join may use this interface to persist a mapio table to perform streaming joins. Checkpoints are managed by epoch: they are always restored before processing an epoch, and saved on completion.
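As an illustration of the save-and-restore cycle, the sketch below implements the two Checkpointer methods for a small stateful counter. The proposal does not define Storage, so the Put/Get interface, the in-memory memStorage, and the keyCounter type are all assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Storage is a hypothetical key-value interface; the proposal leaves
// its definition open.
type Storage interface {
	Put(key string, value []byte) error
	Get(key string) ([]byte, error)
}

// memStorage is an in-memory Storage used only for this example.
type memStorage map[string][]byte

func (m memStorage) Put(key string, value []byte) error { m[key] = value; return nil }
func (m memStorage) Get(key string) ([]byte, error)     { return m[key], nil }

// keyCounter is a hypothetical stateful operator that counts records
// per key across epochs.
type keyCounter struct {
	counts map[string]int
}

// Save checkpoints the counts to the provided Storage.
func (c *keyCounter) Save(s Storage) error {
	b, err := json.Marshal(c.counts)
	if err != nil {
		return err
	}
	return s.Put("counts", b)
}

// Restore loads counts checkpointed by Save; with no checkpoint
// present it starts from an empty state.
func (c *keyCounter) Restore(s Storage) error {
	b, err := s.Get("counts")
	if err != nil {
		return err
	}
	c.counts = map[string]int{}
	if len(b) == 0 {
		return nil
	}
	if err := json.Unmarshal(b, &c.counts); err != nil {
		return err
	}
	if c.counts == nil { // a checkpointed JSON null resets the map
		c.counts = map[string]int{}
	}
	return nil
}

func main() {
	store := memStorage{}
	epoch0 := &keyCounter{counts: map[string]int{"a": 2}}
	if err := epoch0.Save(store); err != nil { // saved on completion of epoch 0
		panic(err)
	}
	epoch1 := &keyCounter{}
	if err := epoch1.Restore(store); err != nil { // restored before epoch 1
		panic(err)
	}
	epoch1.counts["a"]++
	fmt.Println(epoch1.counts["a"])
}
```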

Time sharding changes the meaning of a task's output. It is now indexed on (task, epoch, shard) instead of just (task, shard). The task state is also changed to include its current epoch (starting at 0). A task at epoch E indicates that outputs for epochs e<E are available; the output for epoch E is available only if the task state is TaskOk.

Finally, task scheduling must change to accommodate time sharding. The basics of Bigslice's current task scheduling do not change: tasks maintain a single state, and their outputs are tracked by the executor. (Though the outputs are indexed by epoch as well, as described above.)

When evaluating a task graph, we also maintain a target epoch E: the role of task evaluation is to get the frontier tasks to an epoch e>=E. For simplicity, we assume that graph evaluation is monotonic in epochs: we don't go back in time, though there's nothing in the model that would prohibit this, and it could be implemented later. It is also possible to have overlapping evaluations of multiple epochs; this is supported by simply running multiple evaluations concurrently with different target epochs. Thus, task evaluation proceeds as it does today, but a node is considered ready only if its dependency nodes are at epoch e>=E. Evaluation of a task is performed step-wise by epoch: a task at epoch e is evaluated to epoch e+1. This is also a restriction that can be removed later.
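The readiness rule and step-wise evaluation can be sketched over a toy task graph. This is a simplification under stated assumptions: the task struct is hypothetical, task states such as TaskOk are ignored, and "evaluating" a task just advances its epoch counter.

```go
package main

import "fmt"

// task is a hypothetical, simplified task: epoch is the task's current
// epoch, and deps are the tasks it depends on.
type task struct {
	name  string
	epoch int
	deps  []*task
}

// ready reports whether all of t's dependencies are at an epoch e >= target.
func ready(t *task, target int) bool {
	for _, d := range t.deps {
		if d.epoch < target {
			return false
		}
	}
	return true
}

// evaluate brings t (and, first, its dependencies) to the target epoch,
// advancing each task one epoch at a time. It returns the number of
// single-epoch steps performed. Epochs never move backwards.
func evaluate(t *task, target int) (steps int) {
	for _, d := range t.deps {
		steps += evaluate(d, target)
	}
	// All dependencies are now at the target epoch, so t is ready.
	for t.epoch < target {
		t.epoch++ // step-wise: epoch e is evaluated to e+1
		steps++
	}
	return steps
}

func main() {
	source := &task{name: "source"}
	mapped := &task{name: "map", deps: []*task{source}}
	fmt.Println(ready(mapped, 2)) // source is still at epoch 0
	steps := evaluate(mapped, 2)
	fmt.Println(steps, mapped.epoch, ready(mapped, 2))
}
```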

Note that this model also allows seamless integration of batch computing with streaming computing: A batch computing Slice would set NumEpoch() to 1; its dataset would be computed once and be seamlessly integrated with streaming Slices.

Address pain points from latest onboarding

We had a few recent new Bigslice users. This is an umbrella issue that covers several small things we're going to do to try to address some of the pain and confusion they experienced.

  • Make the requirement for fat binaries when we have driver-worker architecture mismatch more prominent.
  • Add OS X to our bigslice and bigmachine CI (just to make sure if folks' first touch is on OS X local execution, it all will work). #98 for Bigslice; grailbio/bigmachine#50 for Bigmachine, as part of migration to GitHub Actions.
  • Make clear that we only currently support x86/Linux workers. #106
  • Add documentation of usage of Err() to Scan(), so that it’s harder to miss. #101
  • Create a codewalk that makes this (and other parts of a starter program) more clear upon first encounter.
  • Add an example test of a simple user-created slice operation. #109

Metrics should follow data-flow

In the current implementation of metrics (#18), metrics are collected and aggregated from all tasks involved at the completion of a session.Run. This has a few drawbacks. First, all tasks may not be available at the end of session.Run (e.g., due to failed machines). Second, it ties stats collection to evaluation: we may, in the future, persist task outputs to, e.g., S3, and clear task outputs from individual machines.

The solution to this is clear: we can flow Scopes along with task outputs; the executor can then aggregate them as the data flows.

a question on sharding..

I have a large input set in a DynamoDB table that I'd like to dump to a sharded set of output files that support lookups (grailbio/recordio is the underlying format). I'd like to mod-shard the original keys so that, given a key, I know which file to look the key up in. I have a set of writer funcs set up to match the number of input shards, and ideally I'd like bigslice to invoke the writer func for shard n with all and only the mod-sharded values of my input keys. Is there any way to do this? I don't see any guaranteed way of doing so. My alternative, of course, is to have each writer func be able to write all of the sharded output files, but that seems contrary to the writer func model.

I should say that Reshuffle doesn't appear to work as I would expect it to, which would be to drive all keys with the same value to the same shard.

Thanks!

Support cluster shrinking

For workloads that intersperse Bigslice computations with other long-running computations, there could be significant cost savings by shrinking or eliminating worker machines in between the Bigslice computations.

It's possible to work around this today by spawning different Bigslice computations, but that is clunky. You have to deal with spawning processes and possibly serializing results.

This may be made much simpler depending on an implementation of #99. If use of a backing store were a prerequisite for shrinking, we could shut down machines with impunity.

Support for Rocks (SGE) backend

I am interested in how this could be extended to support other HPC backends, in particular a Rocks-based system, a common bioinformatics HPC platform. I have very briefly looked at (some of) the code. Do you have documentation / guidelines / pointers-to-relevant-source about how to implement something like this? I would be interested in working on this kind of extension.

Thanks,
Glen

PS> Whoa! Sorry, I forgot to say how awesome this is! :-)

Provide user-controlled sharding

As outlined in #16, it's often useful to extend fine-grained control of sharding to the user. It can be solved by wrapping integers with an identity hash function, but that seems less than ideal. It might be useful to provide this functionality as part of bigslice.Reshuffle.
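To show what user-controlled sharding means in practice, the sketch below computes a shard index directly from an integer key by taking it modulo the shard count. This is plain Go and does not use any Bigslice API; modShard is a hypothetical helper, and how such a function would be passed to bigslice.Reshuffle is left open.

```go
package main

import "fmt"

// modShard maps an integer key to a shard index in [0, nshard) by
// taking the key modulo nshard. Negative keys are shifted into range.
// nshard must be positive.
func modShard(key int64, nshard int) int {
	m := key % int64(nshard)
	if m < 0 {
		m += int64(nshard)
	}
	return int(m)
}

func main() {
	const nshard = 4
	// Group some keys by the shard (and hence output file) they map to.
	shards := make([][]int64, nshard)
	for _, key := range []int64{0, 1, 5, 8, 11, -1} {
		s := modShard(key, nshard)
		shards[s] = append(shards[s], key)
	}
	for i, keys := range shards {
		fmt.Println("shard", i, keys)
	}
}
```

With a function like this, a lookup for a given key only needs to consult the single output file for modShard(key, nshard).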

Support different instance types per computation

Support different instance types per computation. It's sometimes the case that users want different instance types for different invocations. For example, one computation may require instances with GPUs. It would be useful to be able to specify instance type per computation.

We could potentially do this by allowing a customization of Func with a different configuration, plumbed through Bigmachine.

Support shared backing store

Bigslice workers currently store their task outputs locally. These stored outputs may then be read by other workers when needed by direct connections between machines.

When machines are especially flaky, e.g. high spot market contention in EC2, progress on a computation can grind to a halt, as machines are lost frequently enough that a large portion of time is spent recomputing lost results.

Workers could instead write to a more durable shared backing store. If workers are lost, their results would remain available. This would allow computations to always make forward progress at the cost of extra (read: slow) data transfer.

There is already a nod to implementation in the code. There's work to be done to plumb it through.

Amazon FSX for Lustre may be a good option, as it's basically designed for this sort of use case:

The open source Lustre file system is designed for applications that require fast storage – where you want your storage to keep up with your compute. Lustre was built to quickly and cost effectively process the fastest-growing data sets in the world, and it’s the most widely used file system for the 500 fastest computers in the world. It provides sub-millisecond latencies, up to hundreds of gigabytes per second of throughput, and millions of IOPS.

We could also implement something like asynchronous copy to a shared backing store, first preferring worker-worker transfer but falling back to the shared backing store if the machine is no longer available.
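The read path for that last option might look like the following sketch: try the worker first, and fall back to the shared store only if the worker read fails. The fetchFunc type, readOutput, and errLost are hypothetical names; real reads would go over the network rather than through closures.

```go
package main

import (
	"errors"
	"fmt"
)

// errLost stands in for the error seen when a worker machine is gone.
var errLost = errors.New("machine lost")

// fetchFunc is a hypothetical reader of a task's output from one source.
type fetchFunc func(task string) ([]byte, error)

// readOutput prefers a direct worker-to-worker read and falls back to
// the shared backing store if the worker read fails. It also reports
// which source served the data.
func readOutput(task string, fromWorker, fromStore fetchFunc) (data []byte, source string, err error) {
	if data, err = fromWorker(task); err == nil {
		return data, "worker", nil
	}
	workerErr := err
	if data, err = fromStore(task); err == nil {
		return data, "store", nil
	}
	return nil, "", fmt.Errorf("worker: %v; store: %w", workerErr, err)
}

func main() {
	lostWorker := func(string) ([]byte, error) { return nil, errLost }
	store := func(task string) ([]byte, error) { return []byte("output of " + task), nil }
	data, source, err := readOutput("task-7", lostWorker, store)
	fmt.Println(string(data), source, err)
}
```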

It would be good to benchmark various approaches.

Checksum mismatch on github.com/grailbio/testutil@v0.0.1

Steps to reproduce:

$ git clone https://github.com/grailbio/bigslice
$ cd bigslice
$ go test

Output of go test:

go: downloading github.com/grailbio/testutil v0.0.1
verifying github.com/grailbio/testutil@v0.0.1: checksum mismatch
	downloaded: h1:s6IeIZsZHQZXcUnmEKqz22cSn05QsTH5AwHnrxMRKEs=
	go.sum:     h1:RzGxJO5krJooQGu7pOOgA7RdrwF9L+PTGEIuO3O/M0g=

SECURITY ERROR
This download does NOT match an earlier download recorded in go.sum.
The bits may have been replaced on the origin server, or an attacker may
have intercepted the download attempt.

For more information, see 'go help module-auth'.

Go version:

go version go1.13 linux/amd64

go env:

GO111MODULE=""
GOARCH="amd64"
GOBIN=""
GOCACHE="/home/ps/.cache/go-build"
GOENV="/home/ps/.config/go/env"
GOEXE=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/home/ps/go"
GOPRIVATE=""
GOPROXY="https://proxy.golang.org,direct"
GOROOT="/usr/local/go"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/usr/local/go/pkg/tool/linux_amd64"
GCCGO="gccgo"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="1"
GOMOD="/home/ps/Code/bigslice/go.mod"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -pthread -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build504681712=/tmp/go-build -gno-record-gcc-switches"

exec: add some form of log.Flush.Sync mechanism

As called out in PR #21, it appears that the last few lines of log output can be lost when an error is reported to the master. The exact reason is not clear, but adding a sleep to worker.Init helps. Investigate this and provide a more robust mechanism to ensure that log lines are not lost.

Filter and counts, maybe consider mapreduce counters

I've attempted to use Filter in two real-world examples now, but for one I had to compromise on my stats reporting, and for the other I had to back out of using it. I need to report the number of original inputs and the number remaining after filtering, which doesn't seem to be easily possible. I can use Scan or some other operation that supports side effects, but then I need to make the side effect work across multiple machines, which is annoying and expensive for something as simple as a count. Google's mapreduce offered counters, which could be used for a task like this; that mechanism, though, seems overly general and was extensively abused, so I'm not necessarily advocating for it. Given bigslice's ability to carry results through the graph via the slices themselves, maybe it makes sense to add stats to the core slice structure to report on progress through the graph, the simplest being the number of invocations per operation, but with more detailed or extensive ones also being possible.

Is there a way to distinguish the master from workers?

I have a use-case where I need to run logic on the master before I kick off the bigslice session, which then goes and runs the workers. The specific use-case is that I have a single binary that must capture and dump a state file (in this case Kafka offsets) before the bigslice session, for correctness.

Is there a simple way to distinguish the master from the workers? I've looked through the various bigslice packages, and haven't found anything. Ideally I'd have something like this:

package main

func main() {
    if bigslice.IsMaster() {
        // Do master-only stuff, before running the bigslice session
    }

    fn := bigslice.Func(...)
    exec.Must(...)
}

ReaderFunc for multiple files..

I would expect a ReaderFunc created with 2 shards, one for each of two files, to read the two files concurrently. However, it seems that they are read sequentially, at least when run in local mode.

slice = bigslice.ReaderFunc(len(files), NewReaderFunc(files))
....

where NewReaderFunc looks like:

func NewReaderFunc(filenames []string) interface{} {
	type state struct {}
	return func(shard int, state *state, entities []wikientities.Entity) (n int, err error) {
                ....
		if state.Scanner == nil {
		....
			fmt.Printf("processing: shard: %v file: %v\n", shard, filenames[shard])

		}
.....

is this expected?

Proposal: row streaming

This issue describes a design for implementing row-level streaming in Bigslice.

Bigslice streams all of its data during processing in micro-batches. For example, a bigslice.Map operator is given a vector of values over which to apply the operation, and fills a downstream vector of processed values.

Operators like bigslice.Flatmap, which can emit multiple rows for each input row, implement internal buffering if the user-returned data exceeds the requested chunk sizes. Likewise, bigslice.Cogroup, which implements data joining, treats joined data as a single row—i.e., each row contains within it all rows that match the join criteria.

This is limiting: namely, we cannot perform joins where the number of rows for a key exceeds what can fit in memory. (This is exacerbated by the fact that Bigslice itself operates on batches of rows.) We also cannot perform flat-map operations where the result of a computation for a single row does not fit in memory.

In other words, streaming in Bigslice is only “up to” rows.

We propose to repair this by:

  1. providing a generalized data processing function that permits streaming;
  2. changing the underlying data chunking to allow for streaming.

Generalized streaming operator

We propose a generalized streaming operator: Each. Each is invoked for each row of a slice. Its inputs are the slice columns. The last (optional) argument of the function passed to Each is an “emit” function that produces rows downstream. For example, the following acts as a combined map and filter: it filters out odd values, and adds the column “even” to each emitted row.

bigslice.Each(slice, func(value int, emit func(string, int)) {
	if value%2 == 0 {
		emit("even", value)
	}
})

Functions passed to Each can also accept streams. These are functions that act as scanners to underlying streams of values.
For example, if we co-grouped the above slice, we could access the stream of values as follows, emitting their sum, producing a slice of ints.

bigslice.Each(slice, func(key string, values func(*int) bool, emit func(int)) {
	var sum, datum int
	for values(&datum) {
		sum += datum
	}
	emit(sum)
})

Note that this is a streaming equivalent of

bigslice.Map(slice, func(key string, values []int) int {
	var sum int
	for _, datum := range values {
		sum += datum
	}
	return sum
})
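The scanner-style stream arguments used above (func(*int) bool) can be illustrated in plain Go without any Bigslice API. In this sketch, scanInts is a hypothetical helper that exposes a slice as such a scanner, and sum consumes it one value at a time, so the consumer never needs the whole group in memory at once.

```go
package main

import "fmt"

// scanInts returns a scanner over vals: each call stores the next value
// through the pointer and returns true, or returns false when the
// stream is exhausted. In a real system the values could come from
// disk or the network instead of a slice.
func scanInts(vals []int) func(*int) bool {
	i := 0
	return func(p *int) bool {
		if i >= len(vals) {
			return false
		}
		*p = vals[i]
		i++
		return true
	}
}

// sum consumes a stream of ints in the same style as the Each example.
func sum(values func(*int) bool) int {
	var total, datum int
	for values(&datum) {
		total += datum
	}
	return total
}

func main() {
	fmt.Println(sum(scanInts([]int{1, 2, 3})))
}
```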

For complex co-groups, each co-grouped slice corresponds to a single streaming reader. For example:

slice1 := // Slice<int, string, string>
slice2 := // Slice<int, float64, int>
slice := bigslice.Cogroup(slice1, slice2)
slice = bigslice.Each(slice, func(key int, first func(*string, *string) bool, second func(*float64, *int) bool, emit func(…)) { … })

Streaming readers, data layout

The above API can be implemented using the current reader APIs, but the implementation would have to buffer (and perhaps spill to disk). This can be addressed by introducing nested readers: a sliceio.Reader would be amended to support sub-streams:

type Reader interface {
    // ...
    Child(index int) Reader
}

The new Child method would return a reader to an indexed sub-stream. The physical layout of output files would remain as chunks of rows, but would be prefixed by the stream index. Any stream reference must occur after the full sub-stream has been materialized. This allows a reader to, for example, spill a sub-stream to disk, and re-read it on demand.
