sysadminmike / couch-to-postgres

Node library to stream CouchDB changes into PostgreSQL

License: BSD 2-Clause "Simplified" License

couchdb postgresql synchronization change-data-capture cdc cloudant

couch-to-postgres's Introduction

couch-to-postgres / pgcouch / couchpg / couchgres / postcouch

Node library to stream CouchDB changes into PostgreSQL, with a simple client example. Based on https://github.com/orchestrate-io/orchestrate-couchdb.

With a few extra bits added, it allows not only SELECT queries on the data but also UPDATE/INSERT/DELETE on your couchdb docs from within Postgres. It is also possible to use your couch views as tables.

Basically it allows Postgres to use couchdb as its datastore - sort of like a Foreign Data Wrapper (https://wiki.postgresql.org/wiki/Foreign_data_wrappers, e.g. couchdb_fdw) - but with a near-realtime copy of the records held in Postgres.

For example:

Add a doc to a couch

  curl -X PUT http://192.168.3.21:5984/example/1234567 -d '{"myvar":"foo"}'
  {"ok":true,"id":"1234567","rev":"1-d3747a58baa817834a21ceeaf3084c41"}      

See it in postgres:

 postgresdb=> SELECT id, doc FROM example WHERE id='1234567';

     id    |                                       doc                                        
  ---------+----------------------------------------------------------------------------------
   1234567 | {"_id": "1234567", "_rev": "1-d3747a58baa817834a21ceeaf3084c41", "myvar": "foo"}
  (1 row)

Update a doc using postgres:

  postgresdb=> UPDATE example 
  postgresdb-> SET doc=json_object_set_key(doc::json, 'myvar'::text, 'bar'::text)::jsonb, from_pg=true 
  postgresdb-> WHERE id='1234567';
  DEBUG:  pgsql-http: queried http://192.168.3.21:5984/example/1234567
  CONTEXT:  SQL statement "SELECT headers FROM http_post('http://192.168.3.21:5984/' || TG_TABLE_NAME || '/' || NEW.id::text, '', NEW.doc::text, 'application/json'::text)"
  PL/pgSQL function couchdb_put() line 9 at SQL statement
  UPDATE 0

The couchdb_put function needs some more work.

See it in couch:

  	curl -X GET http://192.168.3.21:5984/example/1234567 
    {"_id":"1234567","_rev":"2-b9f4c54fc36bdeb78c31590920c9751b","myvar":"bar"}

And in postgres:

  postgresdb=> SELECT id, doc FROM example WHERE id='1234567';
     id    |                                       doc                                        
  ---------+----------------------------------------------------------------------------------
   1234567 | {"_id": "1234567", "_rev": "2-b9f4c54fc36bdeb78c31590920c9751b", "myvar": "bar"}
  (1 row)

Add a doc using postgres

  postgresdb=> INSERT INTO example (id, doc, from_pg) VALUES ('7654321', json_object('{_id,myvar}','{7654321, 100}')::jsonb, true);
  DEBUG:  pgsql-http: queried http://192.168.3.21:5984/example/7654321
  CONTEXT:  SQL statement "SELECT headers FROM http_post('http://192.168.3.21:5984/' || TG_TABLE_NAME || '/' || NEW.id::text, '', NEW.doc::text, 'application/json'::text)"
  PL/pgSQL function couchdb_put() line 9 at SQL statement
  INSERT 0 0

See it in couch

  curl -X GET http://192.168.3.21:5984/example/7654321 
  {"_id":"7654321","_rev":"1-08343cb32bb0903348c0903e574cfbd0","myvar":"100"}

Update the doc created by postgres, using couch:

  curl -X PUT http://192.168.3.21:5984/example/7654321 -d '{"_id":"7654321","_rev":"1-08343cb32bb0903348c0903e574cfbd0","myvar":"50"}'
  {"ok":true,"id":"7654321","rev":"2-5057c4942c6b92f8a9e2c3e5a75fd0b9"

See it in postgres

  SELECT id, doc FROM example WHERE id='7654321';
     id    |                                       doc                                        
  ---------+---------------------------------------------------------------------------------
   7654321 | {"_id": "7654321", "_rev": "2-5057c4942c6b92f8a9e2c3e5a75fd0b9", "myvar": "50"}
  (1 row)

Add some more docs

  INSERT INTO example (id, doc, from_pg) VALUES ('test1', json_object('{_id,myvar}','{test1, 100}')::jsonb, true);
  INSERT INTO example (id, doc, from_pg) VALUES ('test2', json_object('{_id,myvar}','{test2, 50}')::jsonb, true);

or

  curl -X PUT http://192.168.3.21:5984/example/test3 -d '{"_id":"test3", "myvar":"100"}'
  curl -X PUT http://192.168.3.21:5984/example/test4 -d '{"_id":"test4", "myvar":"50"}'
  curl -X PUT http://192.168.3.21:5984/example/test5 -d '{"_id":"test5", "myvar":"70"}'
  curl -X PUT http://192.168.3.21:5984/example/test6 -d '{"_id":"test6", "myvar":"20"}'
  curl -X PUT http://192.168.3.21:5984/example/test7 -d '{"_id":"test7", "myvar":"10"}'

Do a query on the docs

  SELECT id, doc->'myvar' AS myvar FROM example 
  WHERE id LIKE 'test%' AND CAST(doc->>'myvar' AS numeric) > 50
  ORDER BY myvar
  
    id   | myvar 
  -------+-------
   test3 | "100"
   test1 | "100"
   test5 | "70"
  (3 rows)

Update some of the docs

 UPDATE example 
 SET doc=json_object_set_key(
        doc::json, 'myvar'::text, (CAST(doc->>'myvar'::text AS numeric) + 50)::text
     )::jsonb,
     from_pg=true  
 WHERE id LIKE 'test%' AND CAST(doc->>'myvar' AS numeric) < 60

Perform the same query:

SELECT id, doc->'myvar' AS myvar FROM example 
WHERE id LIKE 'test%' AND CAST(doc->>'myvar' AS numeric) > 50
ORDER BY myvar

   id   | myvar 
 -------+-------
  test4 | "100"
  test2 | "100"
  test3 | "100"
  test1 | "100"
  test7 | "60"
  test5 | "70"
  test6 | "70"
 (7 rows)

Initially I didn't spot that the above order was wrong, so you need to be careful.

  SELECT id, CAST(doc->>'myvar' AS numeric) as myvar FROM example 
  WHERE id LIKE 'test%' AND CAST(doc->>'myvar' AS numeric) > 50
  ORDER BY myvar, doc->>'_id'

   id   | myvar 
 -------+-------
  test7 | "60"
  test5 | "70"
  test6 | "70"
  test1 | "100"
  test2 | "100"
  test3 | "100"
  test4 | "100"
 (7 rows)

Order is now correct.

And finally in couch

 curl -s -X POST '192.168.3.21:5984/example/_temp_view?include_docs=false' -H 'Content-Type: application/json' \
 -d '{"map":"function(doc) { emit(doc._id, doc.myvar) };"}'  
 {"total_rows":7,"offset":0,"rows":[
 {"id":"test1","key":"test1","value":"100"},
 {"id":"test2","key":"test2","value":"100"},
 {"id":"test3","key":"test3","value":"100"},
 {"id":"test4","key":"test4","value":"100"},
 {"id":"test5","key":"test5","value":"70"},
 {"id":"test6","key":"test6","value":"70"},
 {"id":"test7","key":"test7","value":"60"}
 ]}

It is also possible to use a couchdb view as a table:

The couch design doc:

{
  "_id": "_design/mw_views",
  "language": "javascript",
  "views": {
      "by_feedName": {
      "map": "function(doc) { emit(doc.feedName,null); }",
      "reduce": "_count"
    },
    "by_tags": {
      "map": "function(doc) { for(var i in doc.tags) { emit (doc.tags[i],null);  } }",
      "reduce": "_count"
    }
 }
}


WITH by_feedname_reduced AS (
  SELECT * FROM json_to_recordset(
    (
     SELECT  (content::json->>'rows')::json  
     FROM http_get('http://192.168.3.23:5984/articles/_design/mw_views/_view/by_feedName?group=true'))
    ) AS x (key text, value text)
)

SELECT * FROM by_feedname_reduced WHERE value::numeric > 6000 ORDER BY key 

This takes under a second to run but the initial build of the view takes about 20 mins for a fresh copy of the couchdb.

The equivalent query using the data in Postgres:

WITH tbl AS (
    SELECT DISTINCT doc->>'feedName' as key, COUNT(doc->>'feedName') AS value 
    FROM articles
    GROUP BY doc->>'feedName'
)
SELECT key, value FROM tbl WHERE value > 6000 ORDER BY key;

This takes over 4 seconds.

Testing with my articles database from birdreader - https://github.com/glynnbird/birdreader

curl -X GET http://localhost:5984/articles
{"db_name":"articles","doc_count":63759,"doc_del_count":2,"update_seq":92467,"purge_seq":0,"compact_running":false,"disk_size":151752824,"data_size":121586165,"instance_start_time":"1418686121041424","disk_format_version":6,"committed_update_seq":92467}


SELECT DISTINCT jsonb_object_keys(doc) AS myfields
FROM articles ORDER BY myfields

This queries all of the documents and retrieves the set of fields used across the couch documents.

On another couch database, with a 'type' field for the different doc types stored in the same database (about 70k docs):

SELECT DISTINCT doc->>'type' as doctype, count(doc->>'type')
FROM mytable GROUP BY doctype ORDER BY doctype 

Takes under a second.

SELECT DISTINCT doc->>'type' as doctype, jsonb_object_keys(doc) AS myfields
FROM mytable
ORDER BY doctype , myfields;

With no indexes the above query takes just over 10 secs; I have made no indexes or other adjustments to the default FreeBSD postgresql94-server-9.4.r1 port.
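
If they were needed, a couple of ordinary Postgres 9.4 indexes on the jsonb column would likely help queries like these; this is standard Postgres rather than anything specific to this project, and the index names are just placeholders:

  -- Expression index for queries that filter or group on doc->>'type':
  CREATE INDEX mytable_doc_type_idx ON mytable ((doc->>'type'));

  -- General-purpose GIN index on the whole document, useful for jsonb containment queries:
  CREATE INDEX mytable_doc_gin_idx ON mytable USING GIN (doc jsonb_path_ops);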


Example setup and postgres configuration

git clone git@github.com:sysadminmike/couch-to-postgres.git

Get needed modules:

npm install 

Edit ./bin/index.js to suit your settings:

  var settings = {
    couchdb: {
      url: 'http://192.168.3.21:5984',
      pgtable: 'example',
      database: 'example'
    }
  };

  pgclient = new pg.Client("postgres://mike@localhost/pgdatabase");

Before starting it up, create the since_checkpoints table:

CREATE TABLE since_checkpoints
(
  pgtable text NOT NULL,
  since numeric DEFAULT 0,
  enabled boolean DEFAULT false, --not used in the simple client example
  CONSTRAINT since_checkpoint_pkey PRIMARY KEY (pgtable)
)

This table is used to store the checkpoint for the database(s) being synced, somewhat akin to couchdb's _replicator database.
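
For illustration only (these queries are not part of the project, just a way to poke at the table, assuming the 'example' pgtable used below):

  -- Show where the 'example' feed is up to in the couchdb _changes sequence:
  SELECT pgtable, since, enabled FROM since_checkpoints WHERE pgtable = 'example';

  -- Resetting since to 0 would make the client re-read the whole _changes feed on its next start:
  UPDATE since_checkpoints SET since = 0 WHERE pgtable = 'example';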

Create the table to store the couch docs:

CREATE TABLE example
(
  id text NOT NULL,
  doc jsonb,
  CONSTRAINT example_pkey PRIMARY KEY (id)
)

Start watching changes

./bin/index.js

It will add a record to the since_checkpoints table and begin syncing.

At this point you can perform SELECT queries on the docs within Postgres, as in the examples above. This should be fine to use against a production couchdb, since it makes no changes to it and performs the same tasks as the Elasticsearch river plugin. With a bit of copy/pasting it is possible to use SQL to create simple scripts or one-liners to run in a shell with curl, as sketched below.
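
As a rough sketch of that idea (my own illustration, reusing the couchdb URL from the examples above), a SELECT can emit curl one-liners ready to paste into a shell:

  -- Generate one curl PUT command per doc; single quotes inside SQL string literals are doubled.
  SELECT 'curl -X PUT http://192.168.3.21:5984/example/' || id
         || ' -d ''' || doc::text || ''''
  FROM example
  WHERE id LIKE 'test%';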

Also take a look at /bin/daemon.js and https://github.com/sysadminmike/couch-to-postgres/blob/master/daemon-README.md


To handle UPDATE/INSERT/DELETE, more configuration is required. Note this is still experimental, so I wouldn't point this at any production data.

First install the postgres extension pgsql-http at https://github.com/pramsey/pgsql-http

See the note about the pgsql-http module install below if you are not sure how to install a Postgres extension. Note that pgsql-http has just been updated to handle PUT and DELETE requests; I have not yet had a chance to test anything on this page with the new version, but will try to shortly. I think any reference to http_post will need updating, as the new version specifies:

http_post(uri VARCHAR, content VARCHAR, content_type VARCHAR)

But on this page I am using the old one:

http_post(url VARCHAR, params VARCHAR, data VARCHAR, contenttype VARCHAR DEFAULT NULL)

So please bear this in mind if setting this up.
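
Purely as a hedged illustration of the difference (I have not tested the new version), a call like the ones used on this page would simply drop the empty params argument:

  -- Old signature used throughout this page (url, params, data, content_type):
  SELECT status FROM http_post('http://192.168.3.21:5984/example/1234567', '',
                               '{"myvar":"foo"}', 'application/json'::text);

  -- Newer signature (uri, content, content_type) - an untested guess at the equivalent call:
  SELECT status FROM http_post('http://192.168.3.21:5984/example/1234567',
                               '{"myvar":"foo"}', 'application/json'::text);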

Then add it in the database you want to use:

CREATE EXTENSION http

If you haven't already done so:

CREATE TABLE since_checkpoints   
(
  pgtable text NOT NULL,
  since numeric DEFAULT 0,
  enabled boolean DEFAULT false,
  CONSTRAINT since_checkpoint_pkey PRIMARY KEY (pgtable)
);

Add function to put data into couchdb:

CREATE OR REPLACE FUNCTION couchdb_put() RETURNS trigger AS $BODY$
DECLARE
    RES RECORD;
BEGIN
 IF (NEW.from_pg) IS NULL THEN
   RETURN NEW;
 ELSE 
   
   SELECT status FROM http_post('http://192.168.3.21:5984/' || TG_TABLE_NAME || '/' || NEW.id::text, '', NEW.doc::text, 'application/json'::text) INTO RES;    

   --Need to check RES for response code
   --RAISE EXCEPTION 'Result: %', RES;
   RETURN null;
 END IF;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;

Add function to modify fields inside the PostgreSQL JSON datatype - from: http://stackoverflow.com/questions/18209625/how-do-i-modify-fields-inside-the-new-postgresql-json-datatype

CREATE OR REPLACE FUNCTION json_object_set_key(json json, key_to_set text, value_to_set anyelement)
  RETURNS json AS
$BODY$
SELECT COALESCE(
  (SELECT ('{' || string_agg(to_json("key") || ':' || "value", ',') || '}')
     FROM (SELECT *
             FROM json_each("json")
            WHERE "key" <> "key_to_set"
            UNION ALL
           SELECT "key_to_set", to_json("value_to_set")) AS "fields"),
  '{}'
)::json
$BODY$
  LANGUAGE sql IMMUTABLE STRICT;

Create table to hold the docs

CREATE TABLE example
(
  id text NOT NULL,
  doc jsonb,
  from_pg boolean, -- for trigger nothing stored here
  CONSTRAINT example_pkey PRIMARY KEY (id)
);

Create a trigger to stop data being inserted into the table from SQL and send it off to couch instead:

CREATE TRIGGER add_doc_to_couch 
BEFORE INSERT OR UPDATE 
ON example FOR EACH ROW EXECUTE PROCEDURE couchdb_put();

Note: all INSERT and UPDATE queries in postgres must set "from_pg=true", or postgres will write the data straight into the table and not send it to couch.

I plan to reverse this logic and have the library set the flag itself, so it will be possible to issue inserts/updates without this field.
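
A rough sketch of what that reversed logic might look like (my illustration only, not the project's code, using a hypothetical from_couch column that only the sync library would set):

  CREATE OR REPLACE FUNCTION couchdb_put() RETURNS trigger AS $BODY$
  DECLARE
      RES RECORD;
  BEGIN
    IF NEW.from_couch THEN
      -- Row written by the sync library from the _changes feed: store it directly.
      RETURN NEW;
    ELSE
      -- Row written by an ordinary SQL statement: forward it to couch instead.
      SELECT status FROM http_post('http://192.168.3.21:5984/' || TG_TABLE_NAME || '/' || NEW.id::text,
                                   '', NEW.doc::text, 'application/json'::text) INTO RES;
      RETURN NULL;
    END IF;
  END;
  $BODY$
  LANGUAGE plpgsql VOLATILE;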

You can now start the node client and give it a test.


A few variables to tune in ./lib/index.js (these need to move to config options):

In checkpoint_changes function:

ckwait = 3 * 1000;  

This is how often the stream is checkpointed when the stream is active. I would adjust this depending on how busy your couchdb is. When the stream is idle this increases to 10 secs.

In the startFollowing function there is:

  // The inactivity timer is for time between changes, or time between the
  // initial connection and the first change. Therefore it goes here.
  stream.inactivity_ms = 30000;

Maybe use NOTIFY and have the node client LISTEN for a message when postgres calls couchdb_put() for the first time (can you do a timer in postgres? Otherwise node will get notified about every update, when it only needs a wake-up after idle time).
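
A minimal sketch of the NOTIFY half of that idea (not implemented in the project; the channel name is made up):

  -- Inside couchdb_put(), after the http_post call, tell any listener a change is on its way:
  PERFORM pg_notify('couch_activity', TG_TABLE_NAME || ':' || NEW.id::text);

The node client would then issue LISTEN couch_activity and could drop back to a short poll interval as soon as a notification arrives.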


Performance compared to the PHP dumping script

On a test with a couchdb of about 150 MB with 65k docs, the node library completes working through _changes in about 17 minutes to add all the docs to an empty table, and then keeps it in sync.

The couch-to-postgres-php-dumper script - https://github.com/sysadminmike/couch-to-postgres-php-dump takes about 28 minutes for the initial sync and 11 secs for a resync.

Replicating the same db from one jail running couch to another couch jail on the same machine as a baseline takes just over 8 minutes:

{"session_id":"661411f2137c64efc940f55b802dc35b","start_time":"Tue, 16 Dec 2014 17:00:05 GMT","end_time":"Tue, 16 Dec 2014 17:08:10 GMT","start_last_seq":0,"end_last_seq":92862,"recorded_seq":92862,"missing_checked":63840,"missing_found":63840,"docs_read":63840,"docs_written":63840,"doc_write_failures":0}

Looking at top, I think postgres is waiting on the disk most of the time rather than the process being CPU bound. The single PHP process calling curl for each doc was hitting the CPU hard and couldn't be used as a solution for huge databases, nor can it deal with more than one db at once.

On further testing with some dodgy postgres conf settings:

fsync = off
synchronous_commit = off

As postgres is not the primary datastore it's OK if the data dies, considering a full rebuild now takes under 2 mins:

mike:~/postgres-couch/couch-to-postgres-test % time ./bin/index.js
articlespg: {"db_name":"articlespg","doc_count":63838,"doc_del_count":2,"update_seq":63840,"purge_seq":0,"compact_running":false,"disk_size":242487416,"data_size":205414817,"instance_start_time":"1418749205916149","disk_format_version":6,"committed_update_seq":63840}
Connected to postgres
articlespg: initial since=0
articlespg: Starting checkpointer
articlespg: Checkpoint set to 7180 next check in 3 seconds
articlespg: Checkpoint set to 9344 next check in 3 seconds
articlespg: Checkpoint set to 11536 next check in 3 seconds
...
articlespg: Checkpoint set to 60920 next check in 3 seconds
articlespg: Checkpoint set to 63636 next check in 3 seconds
articlespg: Checkpoint set to 63840 next check in 3 seconds
articlespg: Checkpoint 63840 is current next check in: 10 seconds
^C45.919u 3.226s 1:42.10 48.1%  10864+321k 158+0io 0pf+0w
mike:~/postgres-couch/couch-to-postgres-test % 

So it is down to well under 2 minutes now to do the initial sync of the same test db - about 4 times faster than a native couch-to-couch sync. I think this is faster than the Elasticsearch river doing a similar task.

Snippet from top while it was syncing:

  PID USERNAME    THR PRI NICE   SIZE    RES STATE   C   TIME    WCPU COMMAND
57635 mike          6  45    0   621M 66064K uwait   1   0:25  50.78% node
57636     70        1  36    0   186M 97816K sbwait  1   0:11  22.75% postgres
44831    919       11  24    0   181M 30048K uwait   0  67:28  20.51% beam.smp
23891    919       11  20    0   232M 69168K uwait   0  26:22   0.39% beam.smp
57624     70        1  20    0   180M 17840K select  0   0:00   0.29% postgres
57622     70        1  21    0   180M 65556K select  1   0:00   0.20% postgres

Possible way to deploy - a master-master Postgres setup using couchdb as the primary data store, setting up replication between all locations and using Postgres and Couch as a pair:

 Location 1                              Location 2
 Postgres == CouchDB ---------- CouchDB == Postgres
                     \        /
                      \      /
                       \    /
                        \  /
                         \/    
                     Location 3
                      CouchDB
                        ||
                      Postgres  
                           
 Where === is the node client keeping the paired postgres up to date
 And ----- is couchdb performing replication 

IDEAS/TODOS - Comments most welcome.

How to do bulk updates:

WITH new_docs AS (
  SELECT json_object_set_key(doc::json, 'test'::text, 'Couch & Postgres are scool'::text)::jsonb AS docs
  FROM articlespg
),
agg_docs AS (
  SELECT json_agg(docs) AS aggjson FROM new_docs
)

SELECT headers FROM 
  http_post('http://192.168.3.21:5984/articlespg/_bulk_docs', '',
  '{"all_or_nothing":true, "docs":' || (SELECT * FROM agg_docs) || '}',
  'application/json'::text) ;    

I tried it on the articles test db I am using and it was very fast for an update to < 100 rows. I then tried to update all docs and crashed couch:

DEBUG:  pgsql-http: queried http://192.168.3.21:5984/articlespg/_bulk_docs
ERROR:  Failed to connect to 192.168.3.21 port 5984: Connection refused
couchplay=> 

However, if we split up the request into smaller chunks:

WITH newdocs AS ( -- Make change to json here 
  SELECT id, json_object_set_key(doc::json, 'test'::text, 'Couch & Postgres are scool'::text)::jsonb AS docs
  FROM articlespg 
),
chunked AS (  -- get in chunks 
SELECT docs, ((ROW_NUMBER() OVER (ORDER BY id) - 1)  / 50) +1 AS chunk_no  
FROM newdocs
),
chunked_newdocs AS (  -- Bulk up bulk_docs chunks to send 
    SELECT json_agg(docs) AS bulk_docs, chunk_no FROM chunked GROUP BY chunk_no  ORDER BY chunk_no
)

SELECT chunk_no, status FROM chunked_newdocs,
       http_post('http://192.168.3.21:5984/articlespg/_bulk_docs', '',
       '{"all_or_nothing":true, "docs":' || (bulk_docs) || '}',
       'application/json'::text); 

Chunk size - in this case 50 - I think it is safe to go to about 500 or 1000 depending on doc size. I tried 1000 to begin with but http_post timed out; 500 seems to be fine.

Watching the node daemon while running chunked bulk updates, I can see the changes streaming back to postgres almost as soon as they start, so I think this is better than using an UPDATE, as postgres doesn't lock the table while this is happening. ***Note: need to retest this.

However I think it is better to change all PUTs to bulk POSTs - we need a function like:

 post_docs(docs,chunk_size) - returning recordset of status codes? or just true/false?

How do we deal with the case where there are 5 chunks and the first 2 succeed but the 3rd fails? Is it possible to roll back a transaction in postgres and give the function olddocs and newdocs, so that when a post_docs chunk fails it can roll back the chunks which have already succeeded?

To be used afterwards like:

 SELECT post_docs(json_object_set_key(doc::json, 'test'::text, 'Couch & Postgres are scool'::text)::jsonb,100)
        AS results
 FROM articlespg
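
As a starting point, a simpler variant (my sketch only, not the project's code) could post one pre-aggregated chunk and return the HTTP status, leaving the chunking itself to the calling query as in the chunked CTE example above; the old 4-argument http_post signature is assumed:

  CREATE OR REPLACE FUNCTION post_docs(bulk_docs json)
    RETURNS integer AS
  $BODY$
  DECLARE
    res integer;
  BEGIN
    -- POST one chunk of docs to the couchdb _bulk_docs endpoint.
    SELECT status INTO res
    FROM http_post('http://192.168.3.21:5984/articlespg/_bulk_docs', '',
                   '{"all_or_nothing":true, "docs":' || bulk_docs::text || '}',
                   'application/json'::text);
    -- TODO: check res (expect 201) and record/raise failures so chunks could be rolled back.
    RETURN res;
  END;
  $BODY$
  LANGUAGE plpgsql VOLATILE;

Used with the chunked CTEs above, the final SELECT would then become something like SELECT chunk_no, post_docs(bulk_docs) FROM chunked_newdocs;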

This also makes it very simple to make new databases - just add a new db in couch and change the url to point to it:

chunked AS (
  SELECT docs, ((ROW_NUMBER() OVER (ORDER BY id) - 1)  / 500) +1 AS chunk_no  
  FROM articlespg
),
chunked_newdocs AS (
   SELECT json_agg(docs) AS bulk_docs, chunk_no FROM chunked GROUP BY chunk_no  ORDER BY chunk_no
)
SELECT chunk_no, status FROM chunked_newdocs,
       http_post('http://192.168.3.21:5984/NEW_articlespg_COPY/_bulk_docs', '',
       '{"all_or_nothing":true, "docs":' || (bulk_docs) || '}', 'application/json'::text);  

I think maybe faster than a replication.


Note: On pgsql-http module install:

https://wiki.postgresql.org/wiki/Building_and_Installing_PostgreSQL_Extension_Modules

For FreeBSD you need to have curl and gettext-tools installed.

# gmake PG_CONFIG=/usr/local/bin/pg_config
cc -O2 -pipe  -fstack-protector -fno-strict-aliasing -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fPIC -DPIC -I. -I./ -I/usr/local/include/postgresql/server -I/usr/local/include/postgresql/internal -I/usr/local/include/libxml2 -I/usr/include -I/usr/local/include -I/usr/local/include  -c -o http.o http.c
http.c:89:1: warning: unused function 'header_value' [-Wunused-function]
header_value(const char* header_str, const char* header_name)
^
1 warning generated.
cc -O2 -pipe  -fstack-protector -fno-strict-aliasing -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fPIC -DPIC -I. -I./ -I/usr/local/include/postgresql/server -I/usr/local/include/postgresql/internal -I/usr/local/include/libxml2 -I/usr/include -I/usr/local/include -I/usr/local/include  -c -o stringbuffer.o stringbuffer.c
cc -O2 -pipe  -fstack-protector -fno-strict-aliasing -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fPIC -DPIC -shared -o http.so http.o stringbuffer.o -L/usr/local/lib -L/usr/local/lib -pthread -Wl,-rpath,/usr/lib:/usr/local/lib -fstack-protector -L/usr/local/lib -L/usr/lib  -L/usr/local/lib -Wl,--as-needed -Wl,-R'/usr/local/lib'  -L/usr/local/lib -lcurl



# gmake PG_CONFIG=/usr/local/bin/pg_config install
/bin/mkdir -p '/usr/local/lib/postgresql'
/bin/mkdir -p '/usr/local/share/postgresql/extension'
/bin/mkdir -p '/usr/local/share/postgresql/extension'
/usr/bin/install -c -o root -g wheel -m 755  http.so '/usr/local/lib/postgresql/http.so'
/usr/bin/install -c -o root -g wheel -m 644 http.control '/usr/local/share/postgresql/extension/'
/usr/bin/install -c -o root -g wheel -m 644 http--1.0.sql '/usr/local/share/postgresql/extension/'

Further thoughts and ideas/questions, or want to help? https://github.com/sysadminmike/couch-to-postgres/issues

More testing:

Update to all records in test articles db -

couchplay=> SELECT id, doc->>'read' FROM articlespg WHERE doc->>'read'='false';
        id        | ?column? 
------------------+----------
 _design/mw_views | false
(1 row)

Returns just the design doc.

On running:

  UPDATE articlespg 
  SET doc = json_object_set_key(doc::json, 'read'::text, true)::jsonb, from_pg=true ;

Something interesting happens with the feed and postgres - I think postgres locks the table while the update takes place as the feeder carries on querying couch but does not update postgres until the update is complete.

While the query is running you can see the commit sequence in couch updating:

articlespg: {"db_name":"articlespg","doc_count":63838,"doc_del_count":2,"update_seq":233296,"purge_seq":0,"compact_running":false,"disk_size":2145373958,"data_size":214959726,"instance_start_time":"1418762851354294","disk_format_version":6,"committed_update_seq":233224}
articlespg: Checkpoint 192414 is current next check in: 10 seconds
PG_WATCHDOG: OK

articlespg: {"db_name":"articlespg","doc_count":63838,"doc_del_count":2,"update_seq":242301,"purge_seq":0,"compact_running":false,"disk_size":2234531964,"data_size":215440194,"instance_start_time":"1418762851354294","disk_format_version":6,"committed_update_seq":242301}
articlespg: Checkpoint 192414 is current next check in: 10 seconds
PG_WATCHDOG: OK

As soon as I get a return for the query the feed goes mad, so I think postgres has locked the table while the update runs. *** I need to retest this, as I may have been doing this test after I introduced a bug stalling the feed on updates.

The UPDATE takes 475 seconds to return. The river then takes about 3 minutes to catch up after the return. So it is about 10 minutes to do an update on all 60k records.

I need to look at the bulk updates, as I do now think it is possible to do an all-or-nothing update and possibly do it in a transaction - I think if 2 updates to couch were issued and the second failed, then the first would still have taken place as far as couch is concerned.
At the moment, if a single PUT were to fail, postgres assumes no data has been updated, but all of the docs up to that point would have been updated - with a bulk request this would not be a problem, I think. Note that so far not one insert or update has failed, but I haven't killed couch half way through.

This did give me an idea for another use for this: populate a new couchdb from a subset of the couchdb tables in postgres by simply updating the put function to temporarily submit updates to a different IP or db, e.g.:

--SELECT headers FROM http_post('http://192.168.3.21:5984/' || TG_TABLE_NAME || '/' || NEW.id::text, '', NEW.doc::text, 'application/json'::text) INTO RES;    
  SELECT headers FROM http_post('http://192.168.3.21:5984/articlespg-subset' || '/' || NEW.id::text, '', NEW.doc::text, 'application/json'::text) INTO RES;    

Then re-run the update, but with a WHERE clause:

UPDATE articlespg 
SET doc = json_object_set_key(doc::json, 'read'::text, true)::jsonb, from_pg=true 
WHERE doc ->>'feedName' ='::Planet PostgreSQL::';

About 10 secs later there is a populated couchdb with just the 761 docs matching the WHERE:

{"db_name":"articlespg-subset","doc_count":761,"doc_del_count":0,"update_seq":761,"purge_seq":0,"compact_running":false,"disk_size":6107249,"data_size":3380130,"instance_start_time":"1418770153501066","disk_format_version":6,"committed_update_seq":761}

A lot simpler than creating a design doc for a one-off filtered replication. There is no reason why you couldn't do a union of two couch db tables in postgres and merge them into a new couchdb, provided there are no id clashes.

I have also done a quick test with Excel (MS Query) and Access (a passthrough SQL query), both via ODBC to postgres - I can see the couch data in both - this makes ad hoc reports so simple.

Note on the name - I think I like postcouch best, and I think most of the work will be done by the http_post function POSTing (from postgres) to couchdb.

(I think it is a good idea to also give the option to do PUTs.)

couch-to-postgres's People

Contributors

dreamfall, rusllonrails, sysadminmike


couch-to-postgres's Issues

attachments - issue to hold info on them

I don't think this works with _attachments - or rather it ignores them - they stay in couch, and I think postgres is more use for manipulating/generating reports/ad hoc queries on the data rather than dealing with attachments.

Not done any tests with them yet

sort out couch_put - replace with couch_post

The function needs to deal with status code returns from http_post.
Also, use POST rather than PUT requests, so there is no need to edit the pgsql-http extension before compilation.

ie implement:

post_docs(docs,chunk_size) - returning recordset of status codes? or just true/false?

Postgres disconnects after inserting into couchdb (http_put)

I've installed couch-to-postgres and the http extension for postgres. When I insert from Postgres to CouchDB, postgres disconnects; however, in CouchDB the data has been successfully inserted.

This is the change in the trigger function couchdb_put() I made, because with POST it never worked for me:

SELECT status FROM http_put('http://127.0.0.1:5984/' || TG_TABLE_NAME || '/' || NEW.id::text, NEW.doc::text, 'Content-Type:application/json'::text) INTO RES;

This is the test:
INSERT INTO example (id, doc, from_pg) VALUES ('i', json_object('{_id,myvar}','{i, 100}')::jsonb, true);

This is the message I get from wireshark:
PUT /example/i HTTP/1.1\r\n

this is the message I get from the couchdb log:
[Sun, 04 Oct 2015 16:29:33 GMT] [info] [<0.439.0>] 127.0.0.1 - - PUT /example/i 201

and this is the postgres log:

2015-10-04 10:33:35 CST [1727-3] DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2015-10-04 10:33:35 CST [1727-4] HINT: In a moment you should be able to reconnect to the database and repeat your command.
2015-10-04 10:33:35 CST [709-20] LOG: all server processes terminated; reinitializing
2015-10-04 10:33:35 CST [1770-1] LOG: database system was interrupted; last known up at 2015-10-04 10:29:34 CST
2015-10-04 10:33:35 CST [1770-2] LOG: database system was not properly shut down; automatic recovery in progress
2015-10-04 10:33:35 CST [1770-3] LOG: record with zero length at 0/18146D0
2015-10-04 10:33:35 CST [1770-4] LOG: redo is not required
2015-10-04 10:33:35 CST [1770-5] LOG: MultiXact member wraparound protections are now enabled

Versions:
postgresql-9.4
couchdb 1.6.1
pgsql-http 1.1

I would appreciate any help from you, thank you very much in advance!!

G.Carranza

example.sql

Need to collect up the current db schema, as it's currently spread across my laptop/head/readme.

Hovercraft / pl/Erlang / pl/sh - a place for any ideas on this

Replace the put function with https://github.com/jchris/hovercraft - this needs an Erlang extension for postgres, like PL/Perl. Does anyone know if this exists? I think it would then be quite simple to embed hovercraft in postgres, and it should then be possible to do proper transactions and very large updates (I haven't tried more than a few dozen docs at the moment). If there is no PL/Erlang then perhaps use PL/sh - https://github.com/petere/plsh

cat ~/.bashrc | erl -noshell -s rot13 rot13 | wc

http://www.erlang.org/faq/how_do_i.html

and do something like:
http://www.softwarepassion.com/importing-data-to-couchdb-java-ruby-and-erlang-way/

Maybe have 2 options: one for when postgres and couch are on the same machine and can communicate via pipes (for bulk updates I am sure this will be the fastest method without PL/Erlang, plus fewer bits to go wrong, and maybe with exit codes from the shell, transactions could be possible), and another version for calls over http.

add _add + friends feed to api

_add
_enable
_disable
_remove
All need to accept json as get/post request and keep it couchy

_feeds_status - list all feeds - current /_status does this - add more info about feed
_status - change to give status about daemon + postgres connection + watchdog + pg_watchdog and any other global info

daemon - add some kind of stats collection

We have postgres or couch available to dump some statistics about the feeds to,

eg inserts/updates/deletes a sec / min / hr

Should be pretty easy to pump to couch and use Elasticsearch + Kibana for some pretty graphs.

possible issue with couch restarting

I think the daemon may stop following a _changes feed, but I think all is OK if couch crashes and is restarted in between watchdog checks.

Still testing and not sure if it's the daemon part or the library - it could possibly be the npm 'follow' module, but I am not sure.

A restart of the daemon is all that is needed to fix this, but it can leave postgres out of sync with couch.

SQL DELETE - document functionality

Do with bulk updates setting doc._deleted flag to true

Should be straightforward to do once the bulk function is in place, as this can then be done with:

json_object_set_key(doc::json, '_deleted'::text, true):

Make a helper function which just gets passed the doc id and rev plus an array of fields to keep in the deleted doc, so as not to upset anyone using the Elasticsearch couch river (https://github.com/elasticsearch/elasticsearch-river-couchdb - Indexing Databases with Multiple Types).

i.e. the bulk submit needs to be something like:

  "docs": [{
    "_id": "2",
    "_rev": "rev",
    "_deleted": true,
    "type": "Person"
  }]
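
For a single doc, the same thing can already be expressed with the pieces in this readme; a hedged example against the example table from earlier:

  -- Mark one doc as deleted; the trigger sends it to couch as a new revision with _deleted set.
  UPDATE example
  SET doc = json_object_set_key(doc::json, '_deleted'::text, true)::jsonb,
      from_pg = true
  WHERE id = '1234567';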

Change logic for from_pg to from_couch

Need to modify the sql in the library and trigger.

Update readme accordingly

I don't think it's possible to avoid this without somehow identifying to the trigger where the query came from.

works on workstation, not working on server

I worked through getting this to work on my desktop, with PG and couch on our server.
When I copy the folder (couch-to-postgres) to my server (the same server as pg and couch) it will not sync.

I get this on the server. There is blue text that starts with "follow:". This text was green when I first ran it.


follow:stream http://administrator:[email protected]:5984/tcsoffice:debug } +9ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug JSON: '{"seq":"25-g1AAAACTeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUklMiTV____PyuDOZEhFyjAbmxqaGiRYpzCwFmal5KalpmXmoJHex4LkGRoAFL_oaZIQkyxNDNISjbBpi8LAJ9ZLRw","id":"wo::1010","changes":[{"rev":"2-779eb03dff0f507cbedf9dd44b5c1a24"}],"deleted":true,"doc":{"_id":"wo::1010","_rev":"2-779eb03dff0f507cbedf9dd44b5c1a24","_deleted":true}}' +19ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug Object: {
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug seq: '25-g1AAAACTeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUklMiTV____PyuDOZEhFyjAbmxqaGiRYpzCwFmal5KalpmXmoJHex4LkGRoAFL_oaZIQkyxNDNISjbBpi8LAJ9ZLRw',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug id: 'wo::1010',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug changes: [ { rev: '2-779eb03dff0f507cbedf9dd44b5c1a24' } ],
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug deleted: true,
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug doc: {
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _id: 'wo::1010',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _rev: '2-779eb03dff0f507cbedf9dd44b5c1a24',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _deleted: true
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug }
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug } +3ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug JSON: '{"seq":"27-g1AAAACTeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUklMiTV____PyuDOZEhFyjAbmxqaGiRYpzCwFmal5KalpmXmoJHex4LkGRoAFL_oaZIQ0yxNDNISjbBpi8LAJ-dLR4","id":"wo::12365","changes":[{"rev":"2-e0d0a9a121da8776801f59ccea7cc691"}],"deleted":true,"doc":{"_id":"wo::12365","_rev":"2-e0d0a9a121da8776801f59ccea7cc691","_deleted":true}}' +6ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug Object: {
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug seq: '27-g1AAAACTeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUklMiTV____PyuDOZEhFyjAbmxqaGiRYpzCwFmal5KalpmXmoJHex4LkGRoAFL_oaZIQ0yxNDNISjbBpi8LAJ-dLR4',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug id: 'wo::12365',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug changes: [ { rev: '2-e0d0a9a121da8776801f59ccea7cc691' } ],
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug deleted: true,
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug doc: {
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _id: 'wo::12365',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _rev: '2-e0d0a9a121da8776801f59ccea7cc691',
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug _deleted: true
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug }
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug } +13ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug emit: data +3ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Data from 2021-03-26T20:33:11.106Z +2ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Req 2021-03-26T20:33:11.106Z timeout=37500, inactivity=30000: http://192.168.0.12:5984/tcsoffice +11ms
example: Starting checkpointer
example: Checkpoint 1 is current next check in: 120 seconds
follow:http://administrator:[email protected]:5984/tcsoffice:debug Req 2021-03-26T20:33:11.106Z made no changes for 30.002s +30s
follow:http://administrator:[email protected]:5984/tcsoffice:debug Stop +1ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Destroying req 2021-03-26T20:33:11.106Z +4ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug destroy +1ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Checking database: http://192.168.0.12:5984/tcsoffice +4ms
follow:stream http://administrator:[email protected]:5984/tcsoffice:debug write: { data: '', buf: '' } +8ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Confirmed database: http://192.168.0.12:5984/tcsoffice +7ms
example: {"db_name":"tcsoffice","purge_seq":"0-g1AAAABXeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUnlsQBJhgYg9R8IshIZ8KhNZEiqhyjKAgBm5Rxs","update_seq":"151-g1AAAABXeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUnlsQBJhgYg9R8IshL78KhNZEiqByvizAIAfJkdAw","sizes":{"file":467308,"external":95,"active":25024},"props":{"partitioned":true},"doc_del_count":39,"doc_count":1,"disk_format_version":8,"compact_running":false,"cluster":{"q":2,"n":1,"w":1,"r":1},"instance_start_time":"0"}
follow:http://administrator:[email protected]:5984/tcsoffice:debug Feed query 2021-03-26T20:33:41.278Z: http://192.168.0.12:5984/tcsoffice/_changes?since=11-g1AAAACTeJzLYWBgYMpgTmEQTM4vTc5ISXLIyU9OzMnILy7JAUklMiTV____PyuDOZEhFyjAbmxqaGiRYpzCwFmal5KalpmXmoJHex4LkGRoAFL_oaZwQ0yxNDNISjbBpi8LAJ19LQ4&feed=continuous&heartbeat=30000&include_docs=true +2ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Remove feed from agent pool: 2021-03-26T20:33:41.278Z +9ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Good response: 2021-03-26T20:33:41.278Z +1ms
follow:http://administrator:[email protected]:5984/tcsoffice:debug Req 2021

daemon wakeup idea

When there have been no changes for a while, it can take a while before the daemon notices the next change.

Perhaps add a field in checkpoint_settings - when sending stuff to couch, check after the send:

  • if time is < inactivity time send NOTIFY to daemon to wake up and check for the new changes.
  • make inactivity time unique per db and store in checkpoint settings

example use sql

Do a wiki/readme page of example sql queries eg:

SELECT DISTINCT doc->>'type' as doctype, count(doc->>'type')
FROM mytable GROUP BY doctype ORDER BY doctype

And anything which might bite, like the ORDER BY issue.
