WalEx

Simple and reliable Postgres Change Data Capture (CDC) in Elixir.

WalEx allows you to listen to change events on your Postgres tables then send them on to destinations or perform callback-like actions with the data via the DSL. For example:

  • Stream database changes to an event service like EventRelay
  • Send a user a welcome email after they create a new account
  • Augment an existing Postgres-backed application with business logic
  • Send events to third party services (analytics, CRM, webhooks)
  • Update index / invalidate cache whenever a record is changed

You can learn more about CDC and what you can do with it here: Why capture changes?

Credit

This library borrows liberally from realtime from Supabase, which in turn draws heavily on cainophile.

Installation

If available in Hex, the package can be installed by adding walex to your list of dependencies in mix.exs:

def deps do
  [
    {:walex, "~> 3.8.0"}
  ]
end

PostgreSQL Configuration

Logical Replication

WalEx only supports PostgreSQL. To get started, you first need to configure PostgreSQL for logical replication:

ALTER SYSTEM SET wal_level = 'logical';

Docker Compose:

command: [ "postgres", "-c", "wal_level=logical" ]

Publication

When you change the wal_level variable, you'll need to restart your PostgreSQL server. Once you've restarted, go ahead and create a publication for the tables you want to receive changes for:

All tables:

CREATE PUBLICATION events FOR ALL TABLES;

Or just specific tables:

CREATE PUBLICATION events FOR TABLE "user", todo;

Filter based on row conditions (Postgres v15+ only):

CREATE PUBLICATION user_event FOR TABLE "user" WHERE (active IS TRUE);

Replica Identity

WalEx supports all of the settings for REPLICA IDENTITY. Use FULL where you can: the old data is then sent alongside the new data, which makes tracking differences easier. You'll need to set this for each table.

Specific tables:

ALTER TABLE "user" REPLICA IDENTITY FULL;
ALTER TABLE todo REPLICA IDENTITY FULL;

Also, be mindful of replication gotchas.

AWS RDS

Amazon (AWS) RDS Postgres allows you to configure logical replication.

When creating a new Postgres database on RDS, you'll need to set a Parameter Group with the following settings:

rds.logical_replication = 1
max_replication_slots = 5
max_slot_wal_keep_size = 2048

Elixir Configuration

Config

# config.exs

config :my_app, WalEx,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  port: "5432",
  database: "postgres",
  publication: "events",
  subscriptions: ["user", "todo"],
  # optional
  destinations: [
    # WalEx assumes your module names match this pattern: MyApp.Events.User, MyApp.Events.ToDo, etc
    # but you can also specify custom modules like so:
    # modules: [MyApp.CustomModule, MyApp.OtherCustomModule],
    webhooks: ["https://webhook.site/c2f32b47-33ef-425c-9ed2-f369529a0de8"],
    event_relay_topic: "todos"
  ],
  webhook_signing_secret: "9da89f5f8f4717099c698a17c0d3a1869ee227de06c27b18",
  event_relay: [
    host: "localhost",
    port: "50051",
    token:
      "cmpiNmpFSGhtNVhORFVubDFzUW9OR1JqTlFZOVFFcjRwZWMxS2VWRzJIOnY5NkFRQVFjSVp0TWVmc3hpRl8ydVZuaW9FTC0wX3JrZjhXcTE4MS1EbnVLU1p5VF9OZkpBZGs1SlFuQlNNdVg="
  ],
  name: MyApp

It is also possible to configure the database connection with just a URL:

# config.exs

config :my_app, WalEx,
  url: "postgres://username:password@hostname:port/database",
  publication: "events",
  subscriptions: ["user", "todo"],
  name: MyApp

You can also dynamically update the config at runtime:

WalEx.Configs.add_config(MyApp, :subscriptions, ["new_subscriptions_1", "new_subscriptions_2"])
WalEx.Configs.remove_config(MyApp, :subscriptions, "subscriptions")
WalEx.Configs.replace_config(MyApp, :password, "new_password")

Application Supervisor

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {WalEx.Supervisor, Application.get_env(:my_app, WalEx)}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Usage

Event

Returned change data is a list of %WalEx.Event{} structs with changes. Example of an UPDATE event where the name field was changed:

[
  %WalEx.Event{
    name: :user,
    type: :update,
    source: %WalEx.Event.Source{
      name: "WalEx",
      version: "3.8.0",
      db: "todos",
      schema: "public",
      table: "user",
      columns: %{
        id: "integer",
        name: "varchar",
        created_at: "timestamptz"
      }
    },
    new_record: %{
      id: 1234,
      name: "Chase Pursley",
      created_at: #DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>
    },
    # we don't show old_record for update to reduce payload size
    # however, you can see any old values that changed under "changes"
    old_record: nil,
    changes: %{
      name: %{
        new_value: "Chase Pursley",
        old_value: "Chase"
      }
    },
    timestamp: ~U[2023-12-18 15:50:08.329504Z]
  }
]
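
To make the shape of the changes map concrete, here is a minimal sketch of how such a map could be derived from an old and a new record. ChangesSketch and diff/2 are hypothetical names used for illustration only; this is not WalEx's own implementation.

```elixir
defmodule ChangesSketch do
  # Hypothetical helper (not part of WalEx): builds a map shaped like the
  # `changes` field from an old and a new record, keeping only the keys
  # whose values differ.
  def diff(old_record, new_record) do
    new_record
    |> Enum.filter(fn {key, new_value} -> Map.get(old_record, key) != new_value end)
    |> Map.new(fn {key, new_value} ->
      {key, %{old_value: Map.get(old_record, key), new_value: new_value}}
    end)
  end
end

old = %{id: 1234, name: "Chase"}
new = %{id: 1234, name: "Chase Pursley"}

# Only `name` differs, so it is the only key in the result.
IO.inspect(ChangesSketch.diff(old, new))
```

Computing a diff like this needs the old row, which is why REPLICA IDENTITY FULL is the convenient setting.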

Destinations

Event Module

If your app is named MyApp and you have a subscription called :user (which represents a database table), WalEx assumes you have a module called MyApp.Events.User that uses WalEx.Event. You can also define any custom module; just be sure to add it to the modules config under destinations.

Note that each callback receives a list of events. This is because WalEx returns a list of transactions for a particular table when there's a change event. Often this will contain just one result, but it could be many (for example, if you use database triggers to update a column after an insert).

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  # any subscribed event
  on_event(:all, fn events ->
    IO.inspect(events: events)
  end)

  # any user event
  on_event(:user, fn users ->
    IO.inspect(on_event: users)
    # do something with users data
  end)

  # any user insert event
  on_insert(:user, fn users ->
    IO.inspect(on_insert: users)
  end)

  on_update(:user, fn users ->
    IO.inspect(on_update: users)
  end)

  on_delete(:user, fn users ->
    IO.inspect(on_delete: users)
  end)
end

Filters

A common scenario is wanting to "unsubscribe" from specific records (for example, temporarily for a migration or data fix). One way to accomplish this is to have a column with a value like event_subscribe: false. You can then ignore events for those records by adding the key and value to unwatched_records.

Another scenario is that you might not care when only certain fields change. For example, a database trigger might set updated_at after a record is updated, or one or more counters might change that you don't need to react to. In this case, you can ignore the change by adding those fields to unwatched_fields.

Additional filter helpers are available in the WalEx.TransactionFilter module.

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{
    unwatched_records: %{event_subscribe: false},
    unwatched_fields: ~w(event_id updated_at todos_count)a
  }

  on_insert(:user, @filters, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is filtered
  end)
end
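
The two filter types can be sketched roughly as follows. This is an illustration under assumptions, not the logic in WalEx.TransactionFilter: FilterSketch and apply_filters/2 are hypothetical names, events are plain maps with new_record and changes keys, and a record is treated as unwatched if any listed key/value pair matches.

```elixir
defmodule FilterSketch do
  # Hypothetical stand-in for the filtering described above; WalEx's own
  # behaviour may differ in detail.
  def apply_filters(events, filters) do
    unwatched_records = Map.get(filters, :unwatched_records, %{})
    unwatched_fields = Map.get(filters, :unwatched_fields, [])

    Enum.reject(events, fn event ->
      unwatched_record?(event, unwatched_records) or
        only_unwatched_changes?(event, unwatched_fields)
    end)
  end

  # True when the record carries any of the "ignore me" key/value pairs.
  defp unwatched_record?(%{new_record: record}, unwatched) when is_map(record) do
    Enum.any?(unwatched, fn {key, value} -> Map.fetch(record, key) == {:ok, value} end)
  end

  defp unwatched_record?(_event, _unwatched), do: false

  # True when an update touched nothing but ignored fields.
  defp only_unwatched_changes?(%{changes: changes}, fields)
       when is_map(changes) and map_size(changes) > 0 do
    changes |> Map.keys() |> Enum.all?(&(&1 in fields))
  end

  defp only_unwatched_changes?(_event, _fields), do: false
end
```

With filters of %{unwatched_records: %{event_subscribe: false}, unwatched_fields: [:updated_at]}, an event whose only change is updated_at is dropped, while an event that also changes name is kept.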

Functions

You can also provide a list of functions (as atoms) to be applied to each Event (after optional filters are applied). Each function is run as an async Task on each event. The functions must be defined in the current module and take a single event argument. Use with caution!

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{unwatched_records: %{event_subscribe: false}}
  @functions ~w(send_welcome_email add_to_crm clear_cache)a

  on_insert(:user, @filters, @functions, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is first filtered then functions are applied
  end)

  def send_welcome_email(user) do
    # logic for sending welcome email to new user
  end

  def add_to_crm(user) do
    # logic for adding user to crm system
  end

  def clear_cache(user) do
    # logic for clearing user cache
  end
end
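
As a rough picture of what "each function is run as an async Task on each event" can mean, the sketch below starts one Task per event/function pair and waits for the results. FunctionsSketch, DemoHandlers and the 5-second timeout are assumptions made for illustration; WalEx may well start its tasks without awaiting them.

```elixir
defmodule FunctionsSketch do
  # Hypothetical illustration: applies each named function in `module` to
  # each event in its own Task, then collects the results in order.
  def apply_functions(module, functions, events) do
    tasks =
      for event <- events, function <- functions do
        Task.async(fn -> {function, apply(module, function, [event])} end)
      end

    # Assumed timeout of 5 seconds for the whole batch.
    Task.await_many(tasks, 5_000)
  end
end

defmodule DemoHandlers do
  # Stand-ins for real side effects such as sending an email.
  def send_welcome_email(user), do: {:emailed, user.id}
  def clear_cache(user), do: {:cleared, user.id}
end

FunctionsSketch.apply_functions(DemoHandlers, [:send_welcome_email, :clear_cache], [%{id: 1}])
|> IO.inspect()
```

Because the tasks run concurrently, the functions should not depend on each other's side effects or on execution order, which is one reason for the "use with caution" note.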

You can optionally configure WalEx to automatically send events to the following destinations without needing to know Elixir:

Webhooks

Send subscribed events to one or more webhooks. WalEx supports the Standard Webhooks spec via the webhoox library (which can also be used to receive webhooks in Elixir).
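
On the receiving side, a Standard Webhooks signature can be checked with a few lines of plain Elixir. The sketch below follows the scheme as described by the spec (HMAC-SHA256 over "id.timestamp.payload", base64-encoded, prefixed with "v1,"). WebhookSignatureSketch is a hypothetical module, the key is assumed to be the raw secret bytes, and timestamp tolerance checks are left out; in practice the webhoox library handles this for you.

```elixir
defmodule WebhookSignatureSketch do
  # Signs a payload in the Standard Webhooks style:
  # "v1," <> base64(HMAC-SHA256(key, "<id>.<timestamp>.<payload>")).
  # `key` is assumed to be the raw secret bytes.
  def sign(key, id, timestamp, payload) do
    signed_content = "#{id}.#{timestamp}.#{payload}"
    digest = :crypto.mac(:hmac, :sha256, key, signed_content)
    "v1," <> Base.encode64(digest)
  end

  # The signature header may carry several space-separated signatures;
  # the message is accepted if any of them matches.
  # Note: `==` is used for brevity; production code should prefer a
  # constant-time comparison.
  def valid?(key, id, timestamp, payload, signature_header) do
    expected = sign(key, id, timestamp, payload)

    signature_header
    |> String.split(" ", trim: true)
    |> Enum.any?(&(&1 == expected))
  end
end
```

A receiver would read the id, timestamp and signature from the webhook-id, webhook-timestamp and webhook-signature headers and pass the raw request body as the payload.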

EventRelay

If you need something more durable and flexible than webhooks, check out EventRelay.

In EventRelay, you'll need to create a topic matching what's in the WalEx destinations config. So, if your event_relay_topic is called todos (usually this is the database name), then your topic name in EventRelay should be todos. Here's how to do it via grpcurl:

grpcurl -H "Authorization: Bearer {api_key_token}" -plaintext -proto event_relay.proto -d '{"name": "todos"}' localhost:50051 eventrelay.Topics.CreateTopic

Coming Soon

More destinations coming. Pull requests welcome!

Test

You'll need a local Postgres instance running:

MIX_ENV=test mix walex.setup
MIX_ENV=test mix test

Help?

I would love to have your help! If you find a bug, please add a test to your PR that shows the bug and how it was fixed.

Thanks!

walex's Issues

Provided setup example has a minor bug

Copy/pasting the example from the README yielded the following error:

** (Mix) Could not start application XXX: XXX.Application.start(:normal, []) returned an error: shutdown: failed to start child: WalEx.Supervisor
    ** (EXIT) an exception was raised:
        ** (Jason.DecodeError) unexpected byte at position 0: 0x65 ("e")
            (jason 1.3.0) lib/jason.ex:92: Jason.decode!/2
            (walex 0.7.4) lib/walex/supervisor.ex:40: WalEx.Supervisor.init/1
            (stdlib 3.16) supervisor.erl:330: :supervisor.init/1
            (stdlib 3.16) gen_server.erl:423: :gen_server.init_it/2
            (stdlib 3.16) gen_server.erl:390: :gen_server.init_it/6
            (stdlib 3.16) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

The relevant part of my :walex config:

config :walex,
 db_host: "localhost",
 db_user: "postgres",
 db_name: "postgres",
 # omitting some stuff
 max_replication_lag_in_mb: 0,
 publications: ["example"], # this line causes the error
 # more unrelated stuff

Digging into it a bit, it looks like the :publications key is being JSON decoded. The README could be updated to indicate that value should be a JSON-decodable string, or the Jason.decode() could be removed from lib/walex/supervisor.ex:40. In my case, updating the :publications value to publications: "[\"example\"]" avoids the error.

Add Tests

Add tests after refactoring to use Postgrex Replication.

See Supabase realtime for existing test coverage.

How does it work on multiple nodes?

Let's say I have a cluster of 5 different nodes, all running walex. What would happen in that scenario? Would they all trigger when an event was published? Do they round robin them? Something else more clever? Etc.?

Great project, appreciate it.

Event Struct: Combine records and expose Postgres type

I've been kicking around the idea of sending through a combined old_record and record, showing changes and type. But that would be majorly breaking. Something like this:

%WalEx.Event{
    table: :contact,
    type: :update,
    record: %{
      id: %{
        type: "uuid",
        value: "12c48306-769e-4f92-8d9c-0beca1453d29",
        old_value: "12c48306-769e-4f92-8d9c-0beca1453d29",
        changed: false
      },
      phone_number: %{
        type: "varchar",
        value: "555-444-3312",
        old_value: "555-444-3311",
        changed: true
      },
      location_geography: %{
        type: "geography",
        value: "0101000020E6100000303DAB16A21855C0585AA1A3B0E34040",
        old_value: "0101000020E6100000303DAB16A21855C0585AA1A3B0E34040",
        changed: false
      },
      appraisals_count: %{
        type: "int4",
        value: 41,
        old_value: 41,
        changed: false
      },
      updated_at: %{
        type: "timestamptz",
        value: #DateTime<2023-12-05 09:30:44.295628-05:00 -05 Etc/UTC-5>,
        old_value: #DateTime<2023-11-30 09:19:03.827733-05:00 -05 Etc/UTC-5>,
        changed: true
      }
      # ... etc
    },
    commit_timestamp: ~U[2023-12-05 14:30:44.310865Z]
}

This would allow (in theory) for the table and types to be reconstituted elsewhere with the correct postgres types.

Show Old_record on update

Is there any way or option to include old_record when an update is performed?
This would avoid the overhead of an extra query when applying updates to Elasticsearch, ClickHouse, or similar systems.

DSL for handling events

Instead of:

def process(txn) do
  cond do
    insert_event?(:user_account, txn) ->
      {:ok, user_account} = event(:user_account, txn)
      IO.inspect(user_account_insert_event: user_account)
      # do something with user_account data

    update_event?(:user_account, txn) ->
      {:ok, user_account} = event(:user_account, txn)
      IO.inspect(user_account_update_event: user_account)

    # you can also specify the relation
    delete_event?("public.user_account", txn) ->
      {:ok, user_account} = event(:user_account, txn)
      IO.inspect(user_account_delete_event: user_account)

    true ->
      nil
  end
end

A DSL like this would be neat:

on_update(:user_account) do
  # code here
end

Columns: Allowed whitelist

Allow an optional whitelist of allowed (or disallowed) columns to be published (for security/sensitive data and performance/payload reasons). Perhaps also a list of which column changes to listen to, since a user might not care if the values of certain columns change. This could be added to the subscriptions config.

DSL not working

Problem: using the DSL as shown in the README gives a compilation error (v3.6.0).

Reason: the macro definition is wrong.

The code in WalEx.Event defines its macros like this:

defmodule MacroModule do
  defmacro __using__(_) do
    quote do
      defmacro define_hello(do: block) do
        quote do
          def hello() do
            unquote(block).("hello world")
          end
        end
      end
    end
  end
end

defmodule Test do
  use MacroModule

  define_hello(do: fn x -> IO.puts(x) end)
end

Test.hello()

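This code gives an error, because a macro defined inside the calling module is not available while that module's body is being compiled: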

     error: undefined function define_hello/1 (there is no such import)
     
       define_hello(do: fn x -> IO.puts(x) end)
        ^

Per the README, you probably intended something like this:

defmodule MacroModule do
  defmacro __using__(_) do
    quote do
      import MacroModule
    end
  end

  defmacro define_hello(do: block) do
    quote do
      def hello() do
        unquote(block).("hello world")
      end
    end
  end
end

defmodule Test do
  use MacroModule

  define_hello(do: fn x -> IO.puts(x) end)
end

Test.hello()

Error: :erlang.binary_to_float("2222222")

Thank you for this awesome library!

Here is an error I get when I try to connect to my DB:

[Screenshot: Screen Shot 2022-07-10 at 17 22 59]

My workaround is to use the following instead of to_float():

record |> Float.parse() |> elem(0)
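
The underlying difference is that :erlang.binary_to_float/1 only accepts text with a decimal point, while Float.parse/1 also accepts integer-looking input. A minimal sketch of a tolerant conversion follows; FloatSketch and to_float/1 are hypothetical names, not WalEx functions.

```elixir
defmodule FloatSketch do
  # Hypothetical helper: parses a numeric string into a float whether or
  # not it contains a decimal point. Returns :error for non-numeric input.
  def to_float(value) when is_binary(value) do
    case Float.parse(value) do
      {float, _rest} -> {:ok, float}
      :error -> :error
    end
  end
end

IO.inspect(FloatSketch.to_float("2222222"))
# → {:ok, 2222222.0}
```

Unlike elem(0) on the Float.parse/1 result, this variant does not crash when the input is not numeric at all.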

Set up Horde (for running on multiple nodes)

#41

Similar to Nebulex

defmodule MyApp.Cache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Horde,
    horde: [
      members: :auto,
      process_redistribution: :passive
      # any other Horde options ...
    ]
end

But perhaps just in regular config instead of a module.

recovery after temporary DB connection close

Sometimes the DB connection is closed.
Other processes restart and reconnect shortly after the temporary connection issue is resolved,
but WalEx just stops working.

Can you give me some ideas on how to fix this and how to implement them?
(e.g. reconnecting with exponential backoff)
@cpursley
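
For reference, the delay schedule for exponential backoff can be sketched as a pure function. BackoffSketch, the 1-second base and the 30-second cap are assumptions chosen for illustration; how WalEx would hook this into its replication connection is not shown here.

```elixir
defmodule BackoffSketch do
  # Delay in milliseconds before reconnect attempt `attempt` (0-based):
  # starts at `base`, doubles on every attempt, and is capped at `max`.
  def delay(attempt, base \\ 1_000, max \\ 30_000) when is_integer(attempt) and attempt >= 0 do
    min(base * Integer.pow(2, attempt), max)
  end
end

# Delays for the first six attempts.
IO.inspect(Enum.map(0..5, &BackoffSketch.delay/1))
# → [1000, 2000, 4000, 8000, 16000, 30000]
```

A process that owns the connection could schedule its next attempt with Process.send_after/3 using this delay and reset the attempt counter once a connection succeeds.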

Change subscriptions and db connection on runtime

Hi! Thanks for the great library. I already got a very small example app running and it's working really well.

I'd like to build an HTTP interface for this so I can pack it into a Docker container. The API should allow changing the list of tables that WalEx is subscribed to, and ultimately it should also allow changing the DB URL while the Docker container is running.

Is there any possibility this could work?

I'm quite new to elixir, so sorry if my question is dumb.

Great Idea!

Thanks for building this. Love this idea. Would love to relay all changes into S3 using Kinesis or something

It does not work on every server boot

In my dev environment, WalEx does not work on every server boot. On a given boot it either works 100% of the time or not at all. My guess is some sort of race condition. The processes seem to be alive and look the same in both scenarios.

dev.exs

config :tutorial, Tutorial.Repo,
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  database: "phx_tutorials_dev",
  stacktrace: true,
  show_sensitive_data_on_connection_error: true,
  pool_size: 10

config :tutorial, WalEx,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  port: "5432",
  database: "phx_tutorials_dev",
  publication: "events",
  subscriptions: [:companies],
  modules: [Tutorial.CompanyEvent],
  name: Tutorial

I run Elixir 1.15.7 (compiled with Erlang/OTP 26)

I run Postgres 15 (Postgres.app). I ran these statements and restarted the server:

ALTER SYSTEM SET wal_level = 'logical';
CREATE PUBLICATION events FOR ALL TABLES;
ALTER TABLE companies REPLICA IDENTITY FULL;

[ask] how to determine max_wal_senders

I'm getting the error [25360]:FATAL: number of requested standby connections exceeds max_wal_senders (currently 25) while replicating my production DB.
My DB is currently on RDS and has 2 replicas.
How do I determine max_wal_senders to avoid this error?

Ability to take and apply a list of functions

The function would take the event data as the arg. Functions could be applied asynchronously.

on_insert(:user, [:clear_cache, :send_email, :send_webhook],
  fn {:ok, user} -> IO.inspect(on_insert: user)
end)

def clear_cache(user) do
  # cache clearing logic here
end

# etc

This is probably a bad idea for a variety of reasons, but might be cool.

Python

Thanks for this project! Seeing it mentioned on HN piqued my interest. Small question, can this be used by a Python app, or would the interfacing language have to be Elixir?
