
commanded / eventstore

Event store using PostgreSQL for persistence

License: MIT License

Elixir 96.31% PLpgSQL 2.19% HTML 1.49%
cqrs eventstore elixir event-sourcing postgresql database cqrs-es commanded

eventstore's Introduction

Commanded

Use Commanded to build your own Elixir applications following the CQRS/ES pattern.

Provides support for:

  • Command registration and dispatch.
  • Hosting and delegation to aggregates.
  • Event handling.
  • Long running process managers.

Commanded provides a solid technical foundation for you to build on. It allows you to focus on modelling your domain, the most important part of your app, creating a better application at a faster pace.

You can use Commanded with one of a number of supported event stores for persistence, including this PostgreSQL-based EventStore library.

Please refer to the CHANGELOG for features, bug fixes, and any upgrade advice included for each release.

Requires Erlang/OTP v21.0 and Elixir v1.11 or later.


Sponsors

Alembic


MIT License

Join the chat at https://gitter.im/commanded/Lobby


This README and the following guides follow the master branch, which may not be the currently published version.

Read the documentation for the latest published version of Commanded on Hex.

Overview


Used in production?

Yes, see the companies using Commanded.

Example application

Conduit is an open source, example Phoenix 1.3 web application implementing the CQRS/ES pattern in Elixir. It was built to demonstrate the implementation of Commanded in an Elixir application for the Building Conduit book.

Learn Commanded in 20 minutes

Watch Bernardo Amorim introduce CQRS and event sourcing at Code Beam SF 2018, including a tutorial on how to implement an Elixir application using these concepts with Commanded.

Contributing

Pull requests that contribute new or improved features, or extend the documentation, are most welcome.

Please follow the existing coding conventions, or refer to the Elixir style guide.

You should include unit tests to cover any changes. Run mix test to execute the test suite.

Contributors

Commanded exists thanks to the following people who have contributed.

Need help?

Please open an issue if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the official Elixir Slack.

eventstore's People

Contributors

astery, bamorim, cdegroot, christianjgreen, datafoo, derekkraan, dvic, esse, ftes, jamescheuk91, jsmestad, kaikuchn, kazw, kianmeng, maedhr, mryawe, nicholasjhenry, olemchls, ryoung786, scudelletti, silvadanilo, slashdotdash, sroze, stuartc, tcoopman, vasspilka, victorolinasc, voughtdq, yamilquery, yordis


eventstore's Issues

Allow consumer to specify an event and payload serializer

Serialization of an event's payload and headers is handled by the consumer of the eventstore.

The serializer should be configured so that it can be used by the eventstore to return deserialized events to any subscriber or reader.
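A minimal sketch of what a consumer-provided serializer could look like, assuming it is configured via a serializer key in the EventStore.Storage config (as shown elsewhere in these issues) and exposes serialize/1 and deserialize/2 callbacks; the callback names, the :type option, and Poison as the JSON library are assumptions:

defmodule MyApp.JsonSerializer do
  @moduledoc "Serialize event data and metadata to JSON and back."

  # Serialize any term to a JSON binary before it is written to the database.
  def serialize(term), do: Poison.encode!(term)

  # Deserialize a JSON binary, optionally into the struct named by the
  # fully qualified module name given in opts[:type] (e.g. "Elixir.MyApp.AnEvent").
  def deserialize(binary, opts \\ []) do
    case Keyword.get(opts, :type) do
      nil -> Poison.decode!(binary)
      type -> Poison.decode!(binary, as: struct(String.to_existing_atom(type)))
    end
  end
end

It could then be configured with config :eventstore, EventStore.Storage, serializer: MyApp.JsonSerializer.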

Setting custom metadata on events

I'm using EventStore with Commanded. I want to be able to set custom metadata on an event (the user who created it, etc.). Is there any way to do this?

I've had a bit of a look and it appears that Aggregate.persist_events doesn't pass any metadata to Mapper.map_to_event_data.

Thanks
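When using eventstore directly (rather than through Commanded's aggregates), metadata can be attached per event on the event data passed to append_to_stream. A minimal sketch, assuming the %EventStore.EventData{} struct accepts a metadata map and that MyApp.AccountOpened is a hypothetical event module:

event = %EventStore.EventData{
  event_type: "Elixir.MyApp.AccountOpened",
  data: %MyApp.AccountOpened{account_id: account_id},
  # Arbitrary key/value metadata recorded alongside the event payload.
  metadata: %{user_id: user_id, ip_address: "127.0.0.1"}
}

:ok = EventStore.append_to_stream(stream_uuid, expected_version, [event])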

Ack handled events in subscribers

A subscription should manually acknowledge received events.

The existing single and all stream subscriptions ack events once they have been sent to their subscriber. This doesn't guarantee that they have either been received or successfully processed.

Requiring the subscriber to confirm receipt ensures that no events are missed.

defmodule EventStore.Subscriber do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok, [])
  end

  def init(:ok) do
    {:ok, []}
  end

  def handle_info({:events, events, subscription}, state) do
    # confirm receipt of events
    send(subscription, {:ack, List.last(events).event_id})

    {:noreply, state ++ events}
  end
end

No longer runs on Elixir 1.4

The latest version uses the child_spec functions, which are a feature of Elixir 1.5. This causes the following error:

** (Mix) Could not start application eventstore: EventStore.Application.start(:normal, []) returned an error: an exception was raised:
    ** (UndefinedFunctionError) function Supervisor.child_spec/2 is undefined or private
        (elixir) Supervisor.child_spec({Registry, [keys: :unique, name: EventStore.Subscriptions.Subscription]}, [id: :event_store_subscriptions])
        (eventstore) lib/event_store/supervisor.ex:15: EventStore.Supervisor.init/1
        (stdlib) supervisor.erl:294: :supervisor.init/1
        (stdlib) gen_server.erl:365: :gen_server.init_it/2
        (stdlib) gen_server.erl:333: :gen_server.init_it/6
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Create command fails to set up the database correctly.

I am only having trouble creating a fresh database on versions above v0.13.0.

On the latest master branch (Release v0.13.2), when I run:

MIX_ENV=test mix event_store.drop
MIX_ENV=test mix event_store.create
mix test

The output is:

=INFO REPORT==== 12-Nov-2017::21:44:40 ===
    application: logger
    exited: stopped
    type: temporary
** (Mix) Could not start application eventstore: EventStore.Application.start(:normal, []) returned an error: shutdown: failed to start child: EventStore.Publisher
    ** (EXIT) an exception was raised:
        ** (FunctionClauseError) no function clause matching in EventStore.Storage.QueryLatestEventNumber.handle_response/1
            (eventstore) lib/event_store/storage/query_latest_event_number.ex:11: EventStore.Storage.QueryLatestEventNumber.handle_response({:error, %Postgrex.Error{connection_id: 59103, message: nil, postgres: %{code: :undefined_table, file: "parse_relation.c", line: "1160", message: "relation \"event_counter\" does not exist", pg_code: "42P01", position: "26", routine: "parserOpenTable", severity: "ERROR", unknown: "ERROR"}}})
            (eventstore) lib/event_store/publisher.ex:26: EventStore.Publisher.start_link/1
            (stdlib) supervisor.erl:365: :supervisor.do_start_child/2
            (stdlib) supervisor.erl:348: :supervisor.start_children/3
            (stdlib) supervisor.erl:314: :supervisor.init_children/2
            (stdlib) gen_server.erl:365: :gen_server.init_it/2
            (stdlib) gen_server.erl:333: :gen_server.init_it/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

As you can see from the error message, the expected table does not exist. When I check the Postgres instance, there are no tables created in the new database. The mix event_store.create task does not output any errors or warnings.

I get the same behaviour when checking out v0.13.1 and v0.13.0. Any releases before that, such as v0.12.1, work as expected and set up the database correctly, and their test suites pass.

My PostgreSQL version is 9.6.2.

Allow PostgreSQL port number to be defined in config

Extend eventstore config to include port setting:

config :eventstore, EventStore.Storage,
  username: "postgres",
  password: "postgres",
  database: "eventstore_dev",
  hostname: "localhost",
  port: 5432,
  pool_size: 10

Support running on a cluster of nodes

EventStore is currently restricted to run on a single node only.

Support for running on a cluster would allow:

  • Availability - allow our application to continue running when a node fails.
  • Scalability - add nodes to distribute load.
  • Deployment - no downtime deployment via rolling upgrades.

Use:

  • Swarm to distribute stream processes.

Allow `event_store.create` to initialize existing databases

I've started evaluating event_store for production usage over here.

One of the things I've noticed is that the event_store.create mix task refuses to run the initialization commands on an existing database.

We don't want to create another database solely for the event store, so we ended up running Storage.Initializer.run!/1 manually. It would be convenient to have an option to run the initialization even if the database has already been created.

Would this be an interesting addition? I would have a go at it if that's the case.
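For reference, a sketch of the manual workaround described above, following the same Postgrex connection pattern used elsewhere in these issues (whether run!/1 takes only a connection argument is an assumption):

# Initialize the event store schema inside an existing database.
config = Application.get_env(:eventstore, EventStore.Storage)

{:ok, conn} = Postgrex.start_link(config)

EventStore.Storage.Initializer.run!(conn)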

Support multiple event stores

Allow multiple event stores to be used in an application by providing an EventStore macro to use in your own module, as per Ecto's repo.

Define your event store module:

defmodule MyApp.EventStore do
  use EventStore, 
    otp_app: :my_app

  # Optional `init/1` function to modify config at runtime.
  def init(config) do
    {:ok, config}
  end
end

Configure the event store:

config :my_app, MyApp.EventStore,
  serializer: EventStore.TermSerializer,
  username: "postgres",
  password: "postgres",
  database: "eventstore_dev",
  hostname: "localhost",
  pool_size: 10,
  pool_overflow: 5

config :my_app, event_stores: [MyApp.EventStore]

Usage:

:ok = MyApp.EventStore.append_to_stream(stream_uuid, expected_version, events)

Append to stream is limited to 7,281 events

Events are appended to a stream using a single INSERT statement. This is to ensure they are inserted with contiguous event ids and for performance reasons. Each event uses nine parameters, and there are an additional three parameters for event id, stream id and stream version. PostgreSQL can support a maximum of 65,535 parameters in a single query. Therefore you are limited to 7,281 events in a single INSERT statement (7,281 * 9 + 3 = 65,532).

Attempting to insert too many events results in the following error:

Postgrex.Protocol (#PID<0.17104.79>) disconnected: ** (RuntimeError) postgresql protocol can not handle 104268 parameters, the maximum is 65535

The events should be inserted in batches (e.g. 1,000 events at a time) within a single database transaction to ensure they are contiguous. This would allow an unlimited number of events to be atomically appended to a single stream in one request.
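Until then, a caller-side workaround is to chunk the events before appending; note that each chunk becomes its own atomic append rather than one all-or-nothing transaction (a sketch, not the proposed internal fix):

# Append a large event list in batches of 1,000 to stay well under
# PostgreSQL's 65,535 parameter limit, tracking the expected version
# so the optimistic concurrency check still holds per batch.
events
|> Enum.chunk_every(1_000)
|> Enum.reduce(expected_version, fn batch, version ->
  :ok = EventStore.append_to_stream(stream_uuid, version, batch)
  version + length(batch)
end)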

Could read_all_events_forward miss events?

The read_all_events_forward function will miss events on concurrent inserts to the events table. The values for event_id are assigned from the sequence on insert, so an insert with a higher event_id might be committed and become visible to other transactions before an insert of an event with a lower event_id.

tx 1 INSERT INTO events(...) -- event_id = 1000
tx 2 INSERT INTO events(...) -- event_id = 1001
tx 2 COMMIT
tx 3 SELECT * FROM events WHERE event_id >= 1000 -- returns event with event_id 1001
tx 1 COMMIT
tx 4 SELECT * FROM events WHERE event_id >= 1002 -- returns no events; event 1000 is never seen

Json/Term Serializer

Hi, I tried to use the JSON serializer, but the data in Postgres is still stored as BYTEA (binary), and I would like to have it stored as JSON.

I am also trying to use the serializer: EventStore.TermSerializer config, but the test suite then fails with approximately 6 errors.

Stream all events forward

Extend the read stream forward API to support returning a stream.

Proposal

EventStore.stream_all_forward(start_event_id, read_batch_size)

The optional read_batch_size argument determines how many events are read from the database at a time. Defaults to 1,000 events.
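A usage sketch of the proposed API, assuming it returns a lazy Elixir Stream that fetches events from the database in batches as it is consumed:

# Process every event in the store, 1,000 at a time, without loading
# the full history into memory.
EventStore.stream_all_forward(0, 1_000)
|> Stream.map(fn recorded_event -> recorded_event.data end)
|> Enum.each(&IO.inspect/1)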

Event store not able to use Ecto adapters

** (Mix) Could not start application eventstore: EventStore.Application.start(:normal, []) returned an error: shutdown: failed to start child: EventStore.Writer
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) function Ecto.Adapters.SQL.Sandbox.start_link/2 is undefined or private
            (ecto) Ecto.Adapters.SQL.Sandbox.start_link(Postgrex.Protocol, [types: true, hostname: "localhost", username: "pauli", adapter: Ecto.Adapters.Postgres, pool: Ecto.Adapters.SQL.Sandbox, database: "eventstore_test", extensions: [{Postgrex.Extensions.Calendar, []}]])
            (eventstore) lib/event_store/writer.ex:21: EventStore.Writer.init/1
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Is this by design, or am I doing something wrong?

Subscribe to a stream from a specified start position

Allow subscriptions to a single stream, or all streams, to optionally specify a given start position.

  • :origin - subscribe to events from the start of the stream (identical to using 0). This is the current behaviour and will remain the default.
  • :current - subscribe to events from the current version.
  • version - provide an exact stream version to subscribe from.

Subscribe to a single stream

subscribe_to_stream(stream_uuid, subscription_name, subscriber, :origin)  # same as using 0
subscribe_to_stream(stream_uuid, subscription_name, subscriber, :current)

stream_version = 1234
subscribe_to_stream(stream_uuid, subscription_name, subscriber, stream_version)

Subscribe to all streams

subscribe_to_all_streams(subscription_name, subscriber, :origin)  # same as using 0
subscribe_to_all_streams(subscription_name, subscriber, :current)

last_seen_event_id = 1234
subscribe_to_all_streams(subscription_name, subscriber, last_seen_event_id)

Allow optimistic concurrency check on write to be optional

Appending events to a stream requires an expected version. This is used for optimistic concurrency checking to ensure writers have not missed an event appended by another process.

:ok = EventStore.append_to_stream(stream_uuid, expected_version, events)

This check should be made optional to allow writers to append events without having first read them from the stream.

Proposal

The expected_version argument should be extended to allow the following options:

  • :any_version - no concurrency checking; allow any stream version (including no stream).
  • :no_stream - ensure the stream does not exist.
  • :stream_exists - ensure the stream exists.
  • an integer value - optimistic concurrency check against expected version (current behaviour).

Example usage

:ok = EventStore.append_to_stream(stream_uuid, :any_version, events)

Allow snapshots to be deleted

The EventStore provides EventStore.record_snapshot and EventStore.read_snapshot functions, but it is not possible to delete a snapshot once recorded.

The Event Store API should be extended with an EventStore.delete_snapshot function.
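A sketch of how the proposed function might sit alongside the existing API, assuming it takes the snapshot's source_uuid:

# Existing API:
{:ok, _snapshot} = EventStore.read_snapshot(source_uuid)

# Proposed addition:
:ok = EventStore.delete_snapshot(source_uuid)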

Disabling `:info` level logging

In our app we are attaching a Logger backend to transport logs to an online logging service (Timber). Since eventstore has a Logger.info call that runs after every event is recorded, it produces a lot of log noise (events are expected at a very short interval over long periods of time).

Is it possible to convert this into a :debug log, or to make the log level configurable?

Or is there any way Logger can be turned off for certain apps?

defp handle_response({:ok, %Postgrex.Result{num_rows: num_rows, rows: rows}}, events) do
  event_numbers = List.flatten(rows)

  _ = Logger.info(fn ->
    "Appended #{num_rows} event(s) to stream \"#{stream_uuid(events)}\" (event numbers: #{Enum.join(event_numbers, ", ")})"
  end)

  {:ok, event_numbers}
end

Publisher only notifies first pending event batch

Events are appended to streams concurrently, and successful writes are sent to the EventStore.Publisher process. They are then published to subscribers in the correct order (by event id).

It's possible for events to be received by the publisher out of order. This is handled by storing the pending events and later publishing them in order, once the missing events have been received and published.

An issue has been discovered where only the first batch of pending events is published. This persists until another event is appended to any stream.

Database schema migration task

To support database schema migrations made between releases, it would be helpful to provide a mix task to migrate an Event Store database:

mix event_store.migrate

The behaviour would be similar to Ecto's Ecto.Migrator.

  1. Define a migration SQL script (e.g. ALTER TABLE events ADD causation_id text;).
  2. Record execution of each migration script to ensure it executes only once.

schema_migrations table name conflicts with ecto table

The schema_migrations table name introduced in v0.10.0 conflicts with the ecto table schema_migrations, yet it has different columns.

This means you are unable to use the same database for eventstore and your phoenix app.

Is there some way we could change the table name to not conflict, or make it configurable?

Thanks

Doc: what is a stream version number?

I looked in the code a bit for docs, and I see the version number documented a little, but the concept is not obvious, and I still don't understand it or how to use it. I only see it being set to 0 in examples.

Example Serializer using Poison does not work

When using the example serializer shown in the README, I get an error when it tries to deserialize the event:

** (UndefinedFunctionError) function :ExampleEvent.__struct__/0 
is undefined (module :ExampleEvent is not available)

I think there is a problem with this code:

  type -> type |> String.to_existing_atom |> struct

type is a string containing ExampleEvent, which is converted to the atom :ExampleEvent. But struct/2 can't build a struct from that atom, since :ExampleEvent is not the ExampleEvent module (that would be :"Elixir.ExampleEvent"). Or am I getting something wrong?
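One possible fix, sketched below, is to build the module atom with Module.concat/1 (which handles the implicit Elixir prefix) and ensure the module is loaded before building the struct; the helper name is hypothetical:

defp to_struct(type) when is_binary(type) do
  # "ExampleEvent" -> ExampleEvent (i.e. :"Elixir.ExampleEvent"), rather than
  # the bare atom :ExampleEvent, which has no __struct__/0.
  module = Module.concat([type])

  # Make sure the module is compiled and loaded, otherwise struct/1 raises
  # UndefinedFunctionError as seen above.
  true = Code.ensure_loaded?(module)

  struct(module)
end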

Use :binary_id for correlation_id and causation_id

Is there any reason why these two id fields are saved as type text in Postgres?
It's not the end of the world, but it is causing me issues because I'm using these IDs as primary keys in Ecto schemas, and it feels weird to use :string there when I know that they are UUIDs.

Buffer events between publisher and subscriber

Prevent a subscriber to a single or all streams from being overloaded with published events. A limited size buffer should be used by the subscription.

Each subscriber will ack received events (#18), so the subscription knows how far behind it is. This can be used as a form of back pressure.

A subscription can catch up by going to storage after a spike in published events that it cannot keep up with.

The subscription will buffer events until the subscriber is ready to receive them, or an overflow occurs, at which point it will move into a catch-up mode, querying and replaying events from storage until caught up.

Storage reset broken in version 0.10.1?

I've been using EventStore.Storage.Initializer.reset!/1 to reset the EventStore database between test runs. For version 0.9.0, this worked fine, but after upgrading, the reset triggers a DBConnection error.

IEx output below from a fresh mix app with only {:eventstore, "~> 0.10.1"} in deps and no further code added, except for default EventStore config from Getting Started in config.exs. Steps taken are the same as in the test helper used in the application where I first encountered the error.

To be sure: I'm entirely open to other suggestions for truncating / resetting the event store between test runs if that is easier than fixing this! I thought it would be good to open this issue anyway in case it is a sign of something more severe.

IEx output:

iex(1)> :ok = Application.stop(:eventstore)
:ok

14:21:19.568 [info]  Application eventstore exited: :stopped
iex(2)> eventstore_config = Application.get_env(:eventstore, EventStore.Storage)
[serializer: EventStore.TermSerializer, username: "postgres",
 password: "postgres", database: "eventstore_reset_dev", hostname: "localhost",
 pool_size: 10, pool_overflow: 5]
iex(3)> {:ok, conn} = Postgrex.start_link(eventstore_config)
{:ok, #PID<0.208.0>}
iex(4)> EventStore.Storage.Initializer.reset!(conn)
** (EXIT from #PID<0.202.0>) evaluator process exited with reason: an exception was raised:
    ** (FunctionClauseError) no function clause matching in DBConnection.Connection.handle_call/3
        (db_connection) lib/db_connection/connection.ex:192: DBConnection.Connection.handle_call({:checkout, #Reference<0.2456580215.3043753987.104870>, true}, {#PID<0.202.0>, #Reference<0.2456580215.3043753987.104871>}, %{after_connect: nil, after_connect_timeout: 15000, after_timeout: :backoff, backoff: %DBConnection.Backoff{max: 30000, min: 1000, state: {1000, 10000, {:rand, {%{jump: #Function<16.15449617/1 in :rand.mk_alg/1>, max: 288230376151711743, next: #Function<15.15449617/1 in :rand.mk_alg/1>, type: :exsplus}, [248613990448091245 | 111711591027812860]}}}, type: :rand_exp}, broker: nil, client: nil, idle: :passive, idle_time: 0, idle_timeout: 1000, lock: nil, mod: Postgrex.Protocol, opts: [types: Postgrex.DefaultTypes, serializer: EventStore.TermSerializer, username: "postgres", password: "postgres", database: "eventstore_reset_dev", hostname: "localhost", pool_size: 10, pool_overflow: 5], queue: {[], []}, regulator: nil, state: %Postgrex.Protocol{buffer: "", connection_id: 90169, connection_key: 1883835500, null: nil, parameters: #Reference<0.2456580215.3043753987.104864>, peer: {{127, 0, 0, 1}, 5432}, postgres: :idle, queries: #Reference<0.2456580215.3043885059.104861>, sock: {:gen_tcp, #Port<0.5348>}, timeout: 15000, transactions: :naive, types: {Postgrex.DefaultTypes, #Reference<0.2456580215.3043885060.104801>}}, timer: nil})
        (connection) lib/connection.ex:488: Connection.handle_call/3
        (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
        (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Use a UUID field for the `event_id` column

To track the correlation and causation identifiers for an event it's necessary to identify each event using a UUID. Postgres natively supports UUID data types.

The proposed change is to:

  • Rename the existing event_id field (bigint) to event_number and add a unique index to the field.
  • Add a new event_id field (UUID) as the primary key.

This will allow the existing correlation_id and causation_id fields to be populated from an event_id as required. This depends on issue #51 (pending pull request #57).

It will also be necessary to provide a SQL migration script to alter any existing databases with these changes.
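A rough sketch of the migration, issued through Postgrex; constraint and index names, and the use of pgcrypto's gen_random_uuid() to backfill existing rows, are assumptions and the real script may differ:

{:ok, conn} = Postgrex.start_link(Application.get_env(:eventstore, EventStore.Storage))

statements = [
  # needed for gen_random_uuid() on older PostgreSQL versions (assumption)
  "CREATE EXTENSION IF NOT EXISTS pgcrypto",
  "ALTER TABLE events RENAME COLUMN event_id TO event_number",
  "ALTER TABLE events ADD COLUMN event_id uuid NOT NULL DEFAULT gen_random_uuid()",
  # the old bigint column keeps its primary key after the rename (assumed constraint name)
  "ALTER TABLE events DROP CONSTRAINT events_pkey",
  "ALTER TABLE events ADD PRIMARY KEY (event_id)",
  "CREATE UNIQUE INDEX events_event_number_index ON events (event_number)"
]

Enum.each(statements, &Postgrex.query!(conn, &1, []))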

Ability to configure poolboy options

At the moment, the Postgres worker pool size defaults to 10. It would be nice to be able to change it.

I'd like to propose two keys: pool_size and pool_max_overflow.
Totally open to different names, and more than happy to do a PR for this.
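The resulting configuration might look like the following; the key names follow the proposal above, while the values shown are purely illustrative:

config :eventstore, EventStore.Storage,
  username: "postgres",
  password: "postgres",
  database: "eventstore_dev",
  hostname: "localhost",
  pool_size: 20,          # number of Postgres connections held open
  pool_max_overflow: 10   # extra connections allowed under load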

Backend agnostic

Hi!

I think that it could be interesting to separate the storage backend and make it configurable.
I actually like PostgreSQL but it would be nice to be able to use something like Apache Kafka, which is built with partitioning in mind, or for less demanding scenarios, NoSQL databases.

Thanks.

Provide better feedback when database creation fails

Before reporting the error, many thanks for sharing the eventstore!

$ mix event_store.create
The EventStore database has been created.
** (Postgrex.Error) ERROR (syntax_error): syntax error at or near "NOT"
(db_connection) lib/db_connection.ex:440: DBConnection.query!/4
(elixir) lib/enum.ex:608: Enum."-each/2-lists^foreach/1-0-"/2
(elixir) lib/enum.ex:608: Enum.each/2
lib/mix/tasks/event_store.create.ex:29: Mix.Tasks.EventStore.Create.run/1
(mix) lib/mix/cli.ex:58: Mix.CLI.run_task/2

wrong_expected_version

I'm getting a

no case clause matching: {{:error, :wrong_expected_version}, %EventStore.Streams.Stream{serializer: Commanded.Serialization.JsonSerializer, stream_id: 38, stream_uuid: "f4b16d5f-d731-4d89-bd60-071d79ccd50c", stream_version: 0}}

Any idea what might be causing this? I upgraded to v0.12.0 not long ago. Might this be related?

Thanks

Make event_store Mix task logic available independently of Mix

Distillery releases do not have Mix or the mix command-line tool available. To better support this deployment option, it would be useful to have the functionality of the following tasks available without any Mix dependencies:

  • event_store.create
  • event_store.drop
  • event_store.init
  • event_store.migrate

Support `events.causation_id`

In addition to correlation_id for tracing a thread of execution, in a number of production systems we've found it helpful to also have a causation_id: the ID of the message that directly triggered the creation of the events.

To implement this, it is usually a matter of not only copying the correlation_id from the inbound message, but also populating the causation_id from the ID of the message itself.
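A sketch of where each identifier would come from when mapping an inbound command to an event, assuming the event data struct gains a causation_id field alongside correlation_id (the command and event modules are hypothetical):

event = %EventStore.EventData{
  event_type: "Elixir.MyApp.AccountOpened",
  data: %MyApp.AccountOpened{account_id: command.account_id},
  correlation_id: command.correlation_id,  # copied from the inbound message
  causation_id: command.id,                # the ID of the message that caused this event
  metadata: %{}
}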

Writer per event stream

Currently Event Store uses a single writer process (EventStore.Writer) to append events to PostgreSQL. The writer assigns the monotonically incrementing event id during insert.

To support running on a cluster of nodes (#53), we need to support multiple writers, distributed amongst the cluster: one writer per event stream.

This means that event id generation must also move to PostgreSQL. It's possible to use a function to assign a value in PostgreSQL (e.g. event_id) from a value in another table during insert. This is to achieve gapless event ids (which are not guaranteed when using a sequence).

In addition, the subscription pub/sub will need to be reworked to support multiple publishers (writers). Persisted events will need to be sent to subscribers ordered by event_id, but may be received by the publisher out of order.
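A sketch of how gapless id assignment could work against the event_counter table already present in the schema; the helper below is hypothetical and would need to run in the same database transaction as the events insert:

# Reserve a contiguous block of `count` event ids. The UPDATE takes a row lock
# on the single counter row, serializing concurrent writers, and is rolled back
# together with the insert on failure, so no gaps are left behind (unlike a
# sequence, whose increments are never rolled back).
defp reserve_event_ids(conn, count) do
  %Postgrex.Result{rows: [[last_id]]} =
    Postgrex.query!(
      conn,
      "UPDATE event_counter SET event_id = event_id + $1 RETURNING event_id",
      [count]
    )

  # the reserved ids are (last_id - count + 1)..last_id
  (last_id - count + 1)..last_id
end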


Single writer process (current implementation)

Single EventStore.Writer process to:

  1. Assign the incrementing event id.
  2. Append events to PostgreSQL database.
  3. Publish events to subscriptions.

[Diagram: single writer process]

Multiple writer processes (proposal)

Each EventStore.Streams.Stream process will append its events to PostgreSQL database. The event id will be assigned by the database. Published events will be sent to subscribers sorted by event id. This will require a pending event buffer as events may be received by the subscriptions process out of order due to multiple writer processes.

[Diagram: multiple writer processes]

Publisher doesn't appear to resume between restarts

I can't quite seem to work out the best way to set up a Subscriber that receives events after a catch up.

I have a GenServer with EventStore.subscribe_to_all_streams("jobs_subscriber", self), and with a bit of logging I can see previous events being notified as the server starts up.

However when I append events, the Publisher adds them to pending since it expects the first event to have been published:

[debug] attempting to create stream a1e1ac15-f799-4ce5-95a6-b5f33762a35f
[debug] created stream a1e1ac15-f799-4ce5-95a6-b5f33762a35f with id 39
[info] appended 1 events to stream id 39
[debug] Publisher.handle_cast/2 [:notify_events]
  expected_event_id: 1
  initial_event_id: 39
  last_event_id: 39
iex(9)> [debug] Publisher.handle_cast/2 [:notify_pending_events]
  last_published_event_id: 0

I can see the AllStreamsSubscription module does a lot more than the example in the README. I mimicked what I saw in the tests:

    subscription =
      AllStreamsSubscription.new
      |> AllStreamsSubscription.subscribe("$all", to_string(__MODULE__), self)
      |> AllStreamsSubscription.catch_up

But when I append, the publisher still adds it to pending. How should I go about setting up a subscriber or the publisher?
