Giter Site home page Giter Site logo

vonnegut's Introduction

Erleans

CircleCIcodecov

Erleans is a framework for building distributed applications in Erlang and Elixir based on Microsoft Orleans.

Requirements

Rebar3 3.13.0 or above or Elixir 1.9+. Easiest way to get the latest Rebar3:

$ rebar3 local upgrade
...
$ export PATH=~/.cache/rebar3/bin:$PATH

Components

Grains

Stateful grains are backed by persistent storage and referenced by a primary key set by the grain. An activation of a grain is a single Erlang process in on an Erlang node (silo) in an Erlang cluster. Activation placement is handled by Erleans and communication is over standard Erlang distribution. If a grain is sent a message and does not have a current activation one is spawned.

Grain state is persisted through a database provider with an always increasing change id or etag. If the change id or etag has been by another activation the activation attempting to save state will stop.

Activations are registered through lasp_pg.

Stateless Grains

Stateless grains have no restriction on the number of activations and do not persist state to a database.

Stateless grain activations are pooled through sbroker while being counted by a gproc resource counter. This allows for the use of sbroker to select an activation if available and to create a new activation if none were available immediately and the number currently activated is less than the max allowed.

Reminders (TODO)

Timers that are associated with a grain, meaning if a grain is not active but a reminder for that grain ticks the grain is activated at that time and the reminder is delivered.

Observers (TODO)

Processes can subscribe to grains to receive notifications for grain specific events. If a grain supports observers a group is created through lasp_pg that observers are added to and to which notifications are sent.

Providers

Interface that must be implemented for any persistent store to be used for grains.

Streams have a provider type as well for providing a pluggable stream layer.

Differences from gen_server

No starting or linking, a grain is activated when it is sent a request if an activation is not currently running.

Grain Placement

  • prefer_local: If an activation does not exist this causes the new activation to be on the same node making the request.
  • random: Picks a random node to create any new activation of a grain.
  • stateless: Stateless grains are always local. If no local activation to the request exists one is created up to a default maximum value.
  • {stateless, Max :: integer()}: Allows for up to Max number of activations for a grain to exist per node. A new activation, up until Max exist on the node, will be created for a request if an existing activation is not currently busy.

Erlang Example

The grain implementation test_grain is found in test/:

-module(test_grain).

-behaviour(erleans_grain).

...

placement() ->
    prefer_local.

provider() ->
    in_memory.

deactivated_counter(Ref) ->
    erleans_grain:call(Ref, deactivated_counter).

activated_counter(Ref) ->
    erleans_grain:call(Ref, activated_counter).

node(Ref) ->
    erleans_grain:call(Ref, node).

state(_) ->
    #{activated_counter => 0,
      deactivated_counter => 0}.

activate(_, State=#{activated_counter := Counter}) ->
    {ok, State#{activated_counter => Counter+1}, #{}}.
$ rebar3 as test shell
...
> Grain1 = erleans:get_grain(test_grain, <<"grain1">>).
> test_grain:activated_counter(Grain1).
{ok, 1}

Elixir Example

defmodule ErleansElixirExample do
  use Erleans.Grain,
    placement: :prefer_local,
    provider: :postgres,
    state: %{:counter => 0}

  def get(ref) do
    :erleans_grain.call(ref, :get)
  end

  def increment(ref) do
    :erleans_grain.cast(ref, :increment)
  end

  def handle_call(:get, from, state = %{:counter => counter}) do
    {:ok, state, [{:reply, from, counter}]}
  end

  def handle_cast(:increment, state = %{:counter => counter}) do
    new_state = %{state | :counter => counter + 1}
    {:ok, new_state, [:save_state]}
  end
end
$ mix deps.get
$ mix compile
$ iex --sname a@localhost -S mix

iex(a@localhost)1> ref = Erleans.get_grain(ErleansElixirExample, "somename")
...
iex(a@localhost)2> ErleansElixirExample.get(ref)
0
iex(a@localhost)3> ErleansElixirExample.increment(ref)
:ok
iex(a@localhost)4> ErleansElixirExample.get(ref)
1

Failure Semantics

Contributing

Running Tests

Because the tests rely on certain configurations of apps it is easiest to run the tests with the alias test:

$ epmd -daemon
$ rebar3 test

Note that the distributed tests will only work on a node with hostname fanon. This will be easily avoidable in the next release of OTP but until need to figure out another work around. Running rebar3 ci will run all the tests but dist tests.

vonnegut's People

Contributors

evanmcc avatar getong avatar tsloughter avatar varnerac avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

vonnegut's Issues

Error codes

Implement use of error codes related to the support kafka features.

add operational tooling

It'd be nice to add a module that can be accessed from the console or an external cli program that can do simple operational things. Current ideas:

  • delete topic
  • describe topic
  • restart topic
  • lists running topics
  • regenerate topic index
  • freeze topic data and move it aside so ingest can continue and old data can be recovered manually
  • validate topic data files
  • easily tail a topic

grafana dashboard

There are prometheus metrics being exported. We need a dashboard for viewing them.

metadata mix up

In #58, I didn't understand how multi-chain topic creation was supposed to work, and messed up the ensure_topic api. When we get a return from ensure_topic we need to check the chain included in the metadata response and make sure that it's the current chain. Otherwise we need to reconnect. This will simplify the somewhat tangled logic currently in place, since I didn't realize that the chain manager could return another chain than the one that we're on, or create the chain non-locally.

terminate after

Right now active segments only hibernate after a period of no messages, we should terminate them after a longer period of no messages to close the open files.

Likely change the hibernate to a much lower timeout and have terminate at a couple minutes.

write a PropEr test exercising topic file length wrapping

In the cleanup test, we do:

    application:set_env(vonnegut, segment_bytes, 86),
    application:set_env(vonnegut, index_max_bytes, 18),
    application:set_env(vonnegut, index_interval_bytes, 24),

this has shaken out some bugs elsewhere. It would be nice to have a property based test that mutates these values, then writes some moderate number of messages (large enough to wrap the topic into several files, but not enough to take forever), then reads various ranges to make sure that all of the messages are being read correctly.

For proper random range checking, it might be nice to add an upper offset limit in addition to the maximum bytes to be read, but if that's not possible in the protocol, we can keep message lengths around to calculate maximum bytes.

Timestamp?

Kafka 0.10 added a timestamp to records. Could at least be useful to provide for metrics.

remove log_segment processes

The only process for a topic log segment should be the active log segment.

We need another, but still efficient way, of finding all existing log segments that match a search (all >= some offset) and looking up offsets inside them.

reconnect on 131/129

Currently we don't do this, and in the situation where kubernetes restarts with the nodes switched while the pools are attempting to reconnect, we can end up talking to the wrong node and stuck. The client should restart all pools and start over when it receives these errors, so the client program can proceed without a restart.

Fetch with limit request

We should support a fetch that does 2 index lookups (offset and offset+limit) to find the section of a file to return.

I think this should work both from the Erlang interface, fetch_with_limit/3, and be an extension to the kafka client interface. Maybe a -define(FETCH_WITH_LIMIT_REQUEST, 1001). and instead of MinBytes replace it with a limit.

Data integrity

in #100 I added a "data validation" task, but I'm breaking it out into a bigger issue here (#23 is also related) to capture my current thinking on data consistency and recovery from disk corruption or loss. I think that we should skip data validation for now, but have some specific recommendations at the end.

Some things to think about:

  • chain tail has corrupted data for an offset, but other chain parts don't, which blocks reads of that offset, but could be corrected.
  • non-tail node has corrupted data, which doesn't block reads, but silently reduces replication for long term storage.
  • a node comes up clean and must recover (we need to do this ASAP, but I think that it's not bad to write with existing primitives, as soon as we add a 'verify CRC mode' to the client).
  • nodes may have different index and log size settings, making bulk comparison difficult or impossible. that is, we can't simply md5 the whole file, which is fast, but must scan and compare per-value (and possibly across the whole chain) which can make things slow.
  • nodes may be at different stages in garbage collection, making bulk comparison difficult or impossible, same thing as before. Strongly consistent metadata could help here.
  • it's not clear to me where the right place to check the crc is. checking on the server is expensive and centralized, but checking at the client means that the error is detected far away from the source, which makes fixing it sometimes unclear.

Plan of action?

Things to do:

  • clean node recovery
  • single offset repair
  • AAE

clean node recovery is a little bit of a project, but is fairly pressing if we ever plan to grow chains. it might be possible to sidestep by never growing chains and draining them to bigger chains instead, but for disaster recovery, we should have this. I think that fully specifying the project is outside of the scope of this issue, but the primary issues are: detecting the need, how to report to clients while we're repairing, and how we decide on what the proper value to use for repair is.

simple single offset repair would be nice to have for a "read-repair" type issue, but I think an initial version could be designed to be triggered manually. we also need to decide on how to chose the right value, as above.

AAE is a big feature but important over the long run. standard merkle-tree implementations seems work well enough. keeping them from thrashing the cache or CPU too badly is the primary concern here, I think.

I don't know how to prioritize these. recovery can be done semi-manually by copying files and then letting write repair catch up the chain. single offset repair similarly, and it's not clear whether there is demand for AAE. does the data that we care about live long enough? or are people mostly running out their retention window before data corruption is likely?

Check vg_log_segments:last_in_index/3

I don't know if this is actually working... I was just playing around in the shell and it was always return {error, badarg}, which would mean these always just pass through to {0,0}.

               case file:pread(Index, {eof, 6}, 6) of
                    {ok, <<Offset:24/signed, Position:24/signed>>} ->
                        {Offset, Position};
                    _ ->
                        {0, 0}
                end

verify that multi-fetch properly treats error and non-error cases

It's possible for partial failures to happen on multifetch. I'm fairly certain that the existing code converts this to total failure, but we should accept what we get without error to reduce re-fetching. This will need some subtle testing, so punting on it briefly.

Design document

Add a doc/design.md document detailing how we plan to implement:

  • topic lookup for reads and writes
  • replication
  • chain assignment (which nodes host what links in a chain)
  • chain consensus
  • chain monitoring
  • guarantees
  • configuration

Send acks to previous node

On some interval each node must send an ack with the latest id it has received and written to the previous node in the chain. Needs to also implement the api for receiving the acks which will remove from the pending writes ets table for the topic.

node roles need to be enforced

Currently, fetch and produce requests don't check if the current node is the head or the tail of the chain before responding. To maintain the the invariants of chain replication, non-head nodes must reject writes, and non-tail nodes must reject reads.

For our use-case the read invariant can be relaxed by passing the tail's last acked ID back up the chain, then anything can follow the log from any node, albeit at a delay. This adds some complexity to the read pathway, which currently is just a sendfile of the rest of the file. We'll need to calculate what's valid to send if we decide to do this.

Checksum <<MagicByte:8, Attributes:8, Key/Value>>

The checksum sent from the client should be of <<MagicByte:8, Attributes:8, Key/Value>> not just the value. This is a little odd for the direct Erlang client to send, so maybe we have the server be able to tell if it is getting a full checksum or just a checksum of the record value.

current sendfile fetch is racy

Since logfile reads and writes are happening on different processes, we need to make sure that we aren't sending any partially written results. Instead of using filelib:file_size/1, we should use the index to calculate the correct number of bytes to send.

Topic migration

Adding chains when using one of the strategies in #104 means support for migrating a topic to another chain will be necessary.

consider AAE

For long-retention files, we should add functionality to either the cleaner or an AAE process that compares the checksums of older log files and makes sure that they match, and updates corrupted files to match.

Verify checksum

The server does not verify that a checksum from the client for a record matches the record. We should compare with one generated on the server and error if they don't match.

Keep checksum from produce request

I just remembered an important point @slfritchie made to me when we spoke :). We need to be checksumming the messages on the client side and using that for the storage.

Right now the Erlang disterl client does not checksum when sending and partially because of this we also do not keep the checksum from the kafkaesque client around on the server but instead reproduce it on the server later.

The disterl client needs to add it as a requirement to be included and the decoded produce request in vg_protocol needs to keep it around when returning the record set. Only the id should have to be calculated by the server process.

Track node role and next in line

Each topic process needs to know who to replicate their writes to and each server process needs to know the role it has when receiving requests for reads or writes.

Partitions and chain layout

Currently partitions are basically not used. Everything is hard-coded to only have 1 partition, 0.

With the way topics are laid out across chains through sorting it occurred to me there is no good way to support partitions as simply suffixes on the topic, since they would all be bunched together on the same chain.

Is there a way to support partitions in such a way that they will be spread apart? If not, we likely should make partitions confined to the kafka protocol and remove all other current usage.

Node recovery

A node recovering from a crash will no longer have the pending writes table, it will have to get the latest written id from the next node in the chain and repopulate its pending writes table and resend the the writes.

Cache index

Currently each lookup in the index requires reading in the index from disk to do the search (even those running in parallel do not share). Caching the read-only index for a topic to share between requests and releasing it after some period of not being accessed would be a better option.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.