
walrus's Introduction

walrus



Source Code: https://github.com/supabase/walrus


Write Ahead Log Realtime Unified Security (WALRUS) is a utility for managing realtime subscriptions to tables and applying row level security rules to those subscriptions.

The subscription stream is based on logical replication slots.

Managing Subscriptions

User subscriptions are managed through a table

create table realtime.subscription (
    id bigint generated always as identity primary key,
    subscription_id uuid not null,
    entity regclass not null,
    filters realtime.user_defined_filter[] not null default '{}',
    claims jsonb not null,
    claims_role regrole not null generated always as (realtime.to_regrole(claims ->> 'role')) stored,
    created_at timestamp not null default timezone('utc', now()),

    unique (subscription_id, entity, filters)
);

where realtime.user_defined_filter is

create type realtime.user_defined_filter as (
    column_name text,
    op realtime.equality_op,
    value text
);

and realtime.equality_op is a subset of the PostgREST operators. Specifically:

create type realtime.equality_op as enum(
    'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
);

For example, to subscribe to a table named public.notes where the id is 6 as the authenticated role:

insert into realtime.subscription(subscription_id, entity, filters, claims)
values ('832bd278-dac7-4bef-96be-e21c8a0023c4', 'public.notes', array[('id', 'eq', '6')]::realtime.user_defined_filter[], '{"role": "authenticated"}');

Reading WAL

This package exposes one public SQL function, realtime.apply_rls(jsonb). It processes the output of a wal2json-decoded logical replication slot and returns:

  • wal: (jsonb) The WAL record as JSONB, in the format shown below
  • is_rls_enabled: (bool) If the entity (table) the WAL record represents has row level security enabled
  • subscription_ids: (uuid[]) An array of subscription ids that should be notified about the WAL record
  • errors: (text[]) An array of errors

The jsonb WAL record is in the following format for inserts.

{
    "type": "INSERT",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 1,
        "user_id": 1,
        "details": "mow the lawn"
    }
}

updates:

{
    "type": "UPDATE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 2,
        "user_id": 1,
        "details": "mow the lawn"
    },
    "old_record": {
        "id": 1
    }
}

deletes:

{
    "type": "DELETE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "old_record": {
        "id": 1
    }
}

Important Notes:

  • Row level security is not applied to delete statements
  • The key/value pairs in the old_record field are limited to the table's replica identity columns for the record being updated/deleted. To include all values in old_record, set the table's replica identity to full
  • When a delete occurs, the contents of old_record will be broadcast to all subscribers to that table so ensure that each table's replica identity only contains information that is safe to expose publicly
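For example, to broadcast every column value in old_record (assuming a hypothetical table named public.todos; only do this when every column is safe to expose publicly):

```sql
-- Include all columns in old_record for updates/deletes on this table:
alter table public.todos replica identity full;

-- Revert to the default (primary key columns only):
alter table public.todos replica identity default;
```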

Error States

Error 400: Bad Request, no primary key

If a WAL record for a table that does not have a primary key is passed through realtime.apply_rls, an error is returned

Ex:

(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    },                               -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 400: Bad Request, no primary key'] -- errors
)::realtime.wal_rls;

Error 401: Unauthorized

If a WAL record is passed through realtime.apply_rls and the subscription's claims_role does not have permission to select the primary key columns of that table, an Unauthorized error is returned with no WAL data.

Ex:

(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    },                               -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 401: Unauthorized'] -- errors
)::realtime.wal_rls;
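One way to resolve this error, sketched here against a hypothetical public.notes table, is to grant the subscribing role select permission on at least the primary key column:

```sql
-- Hypothetical fix: let the "authenticated" role select the primary key
-- column so realtime.apply_rls can check row visibility for that role.
grant select (id) on public.notes to authenticated;
```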

Error 413: Payload Too Large

When the size of the wal2json record exceeds max_record_bytes the record and old_record objects are filtered to include only fields with a value size <= 64 bytes. The errors output array is set to contain the string "Error 413: Payload Too Large".

Ex:

(
    {..., "record": {"id": 1}, "old_record": {"id": 1}}, -- wal
    true,                                  -- is_rls_enabled
    [...],                                 -- subscription_ids,
    array['Error 413: Payload Too Large']  -- errors
)::realtime.wal_rls;

How it Works

Each WAL record is passed into realtime.apply_rls(jsonb) which:

  • impersonates each subscribed user by setting the appropriate role and request.jwt.claims that RLS policies depend on
  • queries for the row using its primary key values
  • applies the subscription's filters to check if the WAL record is filtered out
  • filters out all columns that are not visible to the user's role
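The impersonation and visibility-check steps can be sketched roughly as follows. This is a simplified illustration under assumed table/claim values, not the actual body of realtime.apply_rls:

```sql
begin;
-- impersonate the subscriber's role
set local role authenticated;
-- expose the subscriber's JWT claims to RLS policies
set local request.jwt.claims to '{"role": "authenticated"}';
-- visibility check: does a row with the WAL record's primary key
-- survive this role's RLS policies? (table and pk are illustrative)
select exists (select 1 from public.todos where id = 1);
rollback;
```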

Usage

Given a wal2json replication slot named realtime:

select * from pg_create_logical_replication_slot('realtime', 'wal2json');

A complete list of wal2json configuration options can be found in the wal2json documentation.

The stream can be polled with:

select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    pg_logical_slot_get_changes(
        'realtime', null, null,
        'include-pk', '1',
        'include-transaction', 'false',
        'include-timestamp', 'true',
        'include-type-oids', 'true',
        'write-in-chunks', 'true',
        'format-version', '2',
        'actions', 'insert,update,delete',
        'filter-tables', 'realtime.*'
    ),
    lateral (
        select
            x.wal,
            x.is_rls_enabled,
            x.subscription_ids,
            x.errors
        from
            realtime.apply_rls(data::jsonb) x(wal, is_rls_enabled, subscription_ids, errors)
    ) xyz
where
    xyz.subscription_ids[1] is not null

Or, if the stream should be filtered according to a publication:

with pub as (
    select
        concat_ws(
            ',',
            case when bool_or(pubinsert) then 'insert' else null end,
            case when bool_or(pubupdate) then 'update' else null end,
            case when bool_or(pubdelete) then 'delete' else null end
        ) as w2j_actions,
        coalesce(
            string_agg(
                realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
                ','
            ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
            ''
        ) w2j_add_tables
    from
        pg_publication pp
        left join pg_publication_tables ppt
            on pp.pubname = ppt.pubname
    where
        pp.pubname = 'supabase_realtime'
    group by
        pp.pubname
    limit 1
),
w2j as (
    select
        x.*, pub.w2j_add_tables
    from
         pub,
         pg_logical_slot_get_changes(
            'realtime', null, null,
            'include-pk', '1',
            'include-transaction', 'false',
            'include-type-oids', 'true',
            'include-timestamp', 'true',
            'write-in-chunks', 'true',
            'format-version', '2',
            'actions', pub.w2j_actions,
            'add-tables', pub.w2j_add_tables
        ) x
)
select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    w2j,
    realtime.apply_rls(
        wal := w2j.data::jsonb,
        max_record_bytes := 1048576
    ) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
    w2j.w2j_add_tables <> ''
    and xyz.subscription_ids[1] is not null

Configuration

max_record_bytes

max_record_bytes (default 1 MiB): Controls the maximum size of a WAL record that will be emitted with complete record and old_record data. When the size of the wal2json record exceeds max_record_bytes the record and old_record objects are filtered to include only fields with a value size <= 64 bytes. The errors output array is set to contain the string "Error 413: Payload Too Large".

Ex:

realtime.apply_rls(wal := w2j.data::jsonb, max_record_bytes := 1024*1024) x(wal, is_rls_enabled, subscription_ids, errors)

Installation

The project is SQL only and can be installed by executing the contents of sql/walrus--0.1.sql in a database instance.

Tests

Requires

  • Postgres 13+
  • wal2json >= 53b548a29ebd6119323b6eb2f6013d7c5fe807ec

On a Mac:

Install postgres

brew install postgres

Install wal2json

git clone https://github.com/eulerto/wal2json.git
cd wal2json
git reset --hard 53b548a
make
make install

Run the tests from the repo root:

./bin/installcheck

RFC Process

To open a request for comments (RFC), open a GitHub issue against this repo and select the RFC template.

walrus's People

Contributors

0xflotus, dshukertjr, egor-romanov, olirice, w3b6x9, y-yagi


walrus's Issues

Append additional errors to errors array

errors is currently of type text[], but no errors are ever appended to the array in walrus--0.1.sql. Is the plan to incorporate all errors into this errors array, or should we change the type of errors to text and rename errors to error?

Enable filters for delete events

Feature request

Is your feature request related to a problem? Please describe.

I want to be able to filter delete events based on primary key(s) and, when replica identity is set to full, filter based on all columns available in the database table.

Describe the solution you'd like

For DELETE events:

  • when I want to filter by a column that is not available in old_columns, then the subscriber is ignored.
  • when replica identity is not full, only filters referencing the pk column(s) are respected.
  • when replica identity is full, filters referencing any column in the table are respected.

Additional context

Default use a temporary replication slot

Summary

By default, walrus should use a temporary replication slot, but allow users to optionally use a permanent replication slot.

Rationale

The wal log can easily get backed up with a permanent replication slot. A temp slot cleans itself upon disconnect.
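A temporary slot can be created by passing temporary := true:

```sql
-- Dropped automatically when the creating session disconnects:
select * from pg_create_logical_replication_slot(
    'realtime', 'wal2json', temporary := true
);
```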

Drawbacks

People may expect a consistent wal log across client connect / disconnects, which a temporary slot does not provide. Permanent slots enable this guarantee.

Alternatives

Using a postgres wal config to limit the wal size: #12

Support for `in` operator

Hi, I am eagerly waiting for this functionality for my project. Just curious if support for the in operator is on the roadmap. It would be super handy if in were supported.

Commit Timestamp is not in Zulu Time Format

Describe the bug

The commit timestamp is not in ISO 8601 Zulu time format. It currently has the following example format:

2022-01-07T02:24:31.114082+00:00

Expected example format

2022-01-06T05:49:57Z

[LOW PRIORITY] Format Commit Timestamp

wal2json is passing the commit_timestamp in the following format:

"2021-10-20 22:11:14.828798+00"

It would be better if it were consistent with timestamptz found in the change data which is in ISO 8601 format:

"2021-10-20T22:11:14.828798+00"

Listening to a bpchar column returns only the first character

Describe the bug

When listening to a column of type bpchar, only the first character is returned.

To Reproduce
Steps to reproduce the behavior:

  1. CREATE TABLE test ( id char(3) primary key );
  2. create publication some_pub for table test;
  3. INSERT INTO test (id) VALUES ('abc');
  4. Run polling query
  5. For the record I'm getting back an id of just a when I'm expecting abc.

Expected behavior

I'm expecting to get back the entire record instead of just the first character.

Additional context

Error when column type is quoted custom enum

Describe the bug

I cannot listen to a table that has a column of type custom enum with double quotes.

To Reproduce

Steps to reproduce the behavior:

  1. CREATE TYPE "MOOD" AS ENUM ('sad', 'ok', 'happy');
  2. create table test (
         id bigint primary key,
         mood "MOOD"
     );
  3. Insert a row
  4. Poll for changes

Expected behavior

I get back database changes without error.

Actual behavior

ERROR:  invalid input syntax for type json
DETAIL:  Token "MOOD" is invalid.
CONTEXT:  JSON data, line 1: ...:"bigint","value":1},{"name":"mood","type":""MOOD...

Additional context

wal2json issue: eulerto/wal2json#224

Realtime events not emitted when used with `eq` filter and the row no longer meets the `eq` condition due to update

Bug report

Describe the bug

Original issue: supabase/supabase-flutter#319

When listening to row level changes using the eq filter, update or delete events are not emitted when the column referenced by the eq filter changes such that the row no longer satisfies the filter.

To Reproduce

  1. Create a table with at least one additional column (I set it to type boolean and called it col)
  2. Add some mock data
  3. Use the SDK to retrieve data with an .eq filter: col=eq.true
  4. Let's say there are 2 rows where col is true. These two are delivered to the device and an event is registered every time something changes in these rows.
  5. If a row A that previously had col = true gets set to col = false, there is no event emitted.

Expected behavior

I have a feeling that this behavior is the intended behavior, or there are some known limitations, but would it be better if there was one last event emitted in the above example when the col was updated to false so that client can react to the fact that the data no longer meets the eq condition?

Raises exception with regtypes that need to be quoted

Confirm

  • quoted regtypes pass through the query
  • quoted regtypes can be used in filters

Note:

select '1'::int -- valid
select '1'::"int" -- invalid

but

select '1'::VoterType   -- invalid
select '1'::"VoterType" -- valid

Incomplete change output for tables in schemas other than 'public'

Describe the bug

When making changes to tables in schemas other than public I get null values for fields such as columns and record.

To Reproduce
Steps to reproduce the behavior:

  1. Listen to cdc.subscription
  2. Make an insert
  3. Get
{
    "columns": null,
    "commit_timestamp": "2021-10-27T03:40:03.047725+00:00",
    "record": null,
    "schema": "cdc",
    "table": "subscription",
    "type": "INSERT"
}

Expected behavior

Expected to get data back for columns and record

Additional context

Did we make a conscious decision to return null for certain fields for schemas other than public? I think it'll be better if it's up to devs to decide which schemas they want to listen to, public or otherwise.

Clear the WAL when its size surpasses some threshold

One possible solution is to drop the realtime replication slot (it's always inactive) whenever its lag size or WAL size surpasses some limit. Another is to tweak Postgres configs like max_slot_wal_keep_size to limit the max size of WAL files for the slot.
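Both approaches are sketched below; the 1GB threshold is illustrative:

```sql
-- Option 1: let Postgres invalidate the slot once it retains too much WAL
-- (available in Postgres 13+):
alter system set max_slot_wal_keep_size = '1GB';
select pg_reload_conf();

-- Option 2: drop the (inactive) slot outright when its lag gets too large:
select pg_drop_replication_slot('realtime');
```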

Request: Error states return jsonb object with type (INSERT/UPDATE/DELETE)

Summary

Clients are listening to specific event types (i.e. INSERT/UPDATE/DELETE), but when there's an Error 400: Bad Request, no primary key or Error 401: Unauthorized the payload is:

(
    null,                            -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 400: Bad Request, no primary key'] -- errors
)::realtime.wal_rls;

Realtime has no way of knowing what type of database change it was so there's no way to properly broadcast to clients.

Request

Return payload:

(
    {"type": "INSERT"},        -- wal
    true,                            -- is_rls_enabled
    [...],                           -- subscription_ids,
    array['Error 400: Bad Request, no primary key'] -- errors
)::realtime.wal_rls;

WALRUS Disregards Publication

Describe the bug

WALRUS emits changes despite the absence of publications. In other words, even when there are no existing publications (i.e. the supabase_realtime publication doesn't exist), WALRUS will still send changes when polled. We must rely on filter-tables/add-tables to filter out schemas/tables, but users are currently able to dynamically alter/drop tables in the supabase_realtime publication via the Supabase Dashboard.

To Reproduce

  1. drop publication if exists supabase_realtime; (assuming no other publications were created)
  2. insert into table
  3. query

Expected behavior

WALRUS should adhere to the tables that were added to the supabase_realtime publication.

Additional context

wal2json doesn't use publications: eulerto/wal2json#164

Reject subscriptions when filter value has the incorrect column type

Describe the bug

Developers sometimes set incorrect values to filters they want to listen to. For example, I've seen developers try and listen to:

users:id=eq.undefined

and other values when they were not of the column type.

This results in Realtime RLS crashing because of realtime.subscription_check_filters(), specifically when it performs realtime.cast(val text, type_ regtype).

Expected behavior

Prevent the filter record from being inserted into realtime.subscription table which will force Realtime RLS server to reject subscriptions for filters with incorrect value types. This will keep Realtime RLS server running smoothly.
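A hypothetical sketch of such a validation: attempt the cast at subscription time so a bad value raises a clear error immediately instead of crashing the poller later. The function name and message are illustrative, not part of walrus:

```sql
-- Hypothetical helper: raise a clear error when a filter value cannot be
-- cast to the target column's type.
create or replace function realtime.validate_filter_value(val text, type_ regtype)
returns void
language plpgsql
as $$
begin
    -- attempt the cast; %L quotes the value as a literal, %s the type name
    execute format('select %L::%s', val, type_);
exception when others then
    raise exception 'invalid filter value "%" for type %', val, type_;
end;
$$;
```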
