
neo4j-contrib / neo4j-streams


Neo4j Kafka Connector

Home Page: https://neo4j.com/docs/kafka

License: Apache License 2.0

Kotlin 100.00%
neo4j kafka kafka-producer cdc change-data-capture graph-database stream-processing hacktoberfest


neo4j-streams's Issues

Pool of Coroutines

Create a configuration option that allows the Consumer's daemon thread to use a pool of coroutines
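A minimal sketch of what such a pool could look like, assuming kotlinx.coroutines; `processAll`, the channel, and the pool size are illustrative stand-ins, not the project's actual API:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: a fixed-size pool of consumer coroutines draining a
// channel of incoming records; poolSize would come from configuration.
fun processAll(records: List<String>, poolSize: Int): Int = runBlocking {
    val processed = AtomicInteger(0)
    val dispatcher = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    val channel = Channel<String>(capacity = 100)
    val workers = List(poolSize) {
        launch(dispatcher) {
            for (record in channel) {
                // here the real consumer would run the Cypher template for the topic
                processed.incrementAndGet()
            }
        }
    }
    records.forEach { channel.send(it) }
    channel.close()
    workers.joinAll()
    dispatcher.close()
    processed.get()
}

fun main() {
    println(processAll((1..10).map { "record-$it" }, poolSize = 4)) // prints 10
}
```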

Define the configurations and the conventions

The convention could be: match the file name ("labels.new.cypher") to the topic name ("labels.new").
The configuration specifies anything setup-specific plus the connection parameters.
Define where to put the configuration properties.
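Under that convention, a sink configuration might look like the fragment below; the property names are hypothetical, sketched only to illustrate the filename-to-topic mapping:

```properties
# hypothetical property names; the file name maps to the topic per the convention above
kafka.bootstrap.servers=localhost:9092
kafka.group.id=neo4j
streams.sink.topic.cypher.labels.new=file:///conf/labels.new.cypher
```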

Manage custom ids

Patterns should allow defining custom ids. Example:

Label1{!myCustomPropertyId}

For unique keys:

Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}
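In sink configuration terms, such patterns could be attached to topics roughly like this; the property key is illustrative, not confirmed by this issue:

```properties
# illustrative: bind a pattern with a custom id to a topic
streams.sink.topic.pattern.node.myTopic=Label1{!myCustomPropertyId}
# and a pattern with a composite unique key to another topic
streams.sink.topic.pattern.node.myOtherTopic=Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}
```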

Performance Testing

Test with a real Kafka setup, inserting 1bn nodes and relationships for Neo4j -> Kafka,
and also, for the consumer, 1bn events for Kafka -> nodes and rels in Neo4j.

One nice real-world test could be consuming the RSVP API endpoint from meetup.com
or the Twitter streams API.

We could also run a simple (concurrent) create/update/delete workload that goes to Kafka,
then on the other side use the Kafka sink to recreate the graph, and finally compare that the two graphs are the same.
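The create/update/delete workload could be generated deterministically (seeded), so the source graph and the sink-reconstructed graph can be compared afterwards. A pure-Kotlin sketch; all names are illustrative:

```kotlin
import kotlin.random.Random

// Sketch of the simple concurrent create/update/delete workload described above;
// each op would be sent to Kafka, replayed by the sink, and the graphs compared.
enum class Op { CREATE, UPDATE, DELETE }

// A fixed seed makes the workload reproducible on both sides of the pipeline.
fun generateWorkload(n: Int, seed: Long = 42): List<Pair<Op, Long>> {
    val rnd = Random(seed)
    return (0 until n).map { Op.values()[rnd.nextInt(3)] to rnd.nextLong(1000) }
}

fun main() {
    generateWorkload(5).forEach { (op, id) -> println("$op node $id") }
}
```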

In Producer, where is the "Connector" class?

I compiled this project using Maven.
In order to use the "Producer" to publish "neo4j" database changes to a Kafka topic, which class should I use as the "Connector"? Consider the following config properties:

name=neo4jMessageProducer
topic=neo4j
tasks.max=1
connector.class= "connector class is required here"

The connector.class property needs a connector class, and I didn't find such a class in this project.

Add auditing module

Configurable with the following features:

  • created:propName
  • updated:propName
  • createdBy:propName
  • updatedBy:propName
  • created(At)
  • updated(At)
  • optional user
  • optional on relationships

Test execution fails - neo4j database unavailable

When building this project, it fails in Eclipse - embedded (macOS). Not sure if this is a local environment issue, as I am also getting odd maven-clean execution errors on this project in Eclipse.

Exception in thread "Thread-8" org.neo4j.graphdb.TransactionTerminatedException: The transaction has been terminated. Retry your operation in a new transaction, and you should see a successful result. The database is not currently available to serve your request, refer to the database logs for more details. Retrying your request at a later time may succeed.
at org.neo4j.kernel.impl.api.KernelStatement.assertOpen(KernelStatement.java:213)
at org.neo4j.kernel.impl.api.LockingStatementOperations.schemaStateGetOrCreate(LockingStatementOperations.java:142)
at org.neo4j.kernel.impl.api.OperationsFacade.schemaStateGetOrCreate(OperationsFacade.java:818)
at org.neo4j.cypher.internal.spi.v3_3.TransactionBoundPlanContext.getOrCreateFromSchemaState(TransactionBoundPlanContext.scala:121)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext$$anonfun$getOrCreateFromSchemaState$1.apply(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:32)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.translateException(ExceptionTranslatingPlanContext.scala:27)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.getOrCreateFromSchemaState(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$class.org$neo4j$cypher$internal$compatibility$v3_3$Compatibility$$provideCache(Compatibility.scala:156)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:122)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:107)
at org.neo4j.cypher.internal.compatibility.v3_3.exceptionHandler$runSafely$.apply(exceptionHandler.scala:90)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2.plan(Compatibility.scala:107)
at org.neo4j.cypher.internal.ExecutionEngine.org$neo4j$cypher$internal$ExecutionEngine$$producePlan$1(ExecutionEngine.scala:180)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1$$anonfun$apply$1.apply(CacheAccessor.scala:38)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor$$anonfun$1.apply(CacheAccessor.scala:64)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache$$anon$1.apply(LFUCache.scala:31)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$16(BoundedLocalCache.java:1973)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:1971)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:1954)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:113)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.getOrElseUpdate(LFUCache.scala:30)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.apply(LFUCache.scala:48)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor.getOrElseUpdate(CacheAccessor.scala:62)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1.apply(CacheAccessor.scala:36)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache.getOrElseUpdate(CacheAccessor.scala:53)
at org.neo4j.cypher.internal.ExecutionEngine.liftedTree1$1(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine.planQuery(ExecutionEngine.scala:168)
at org.neo4j.cypher.internal.ExecutionEngine.execute(ExecutionEngine.scala:116)
at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:62)
at org.neo4j.kernel.impl.factory.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:80)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:451)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:434)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:418)
at kafka.KafkaTest$createNodes$1.run(KafkaTest.kt:48)
at java.lang.Thread.run(Thread.java:748)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 27.013 s <<< FAILURE! - in kafka.KafkaTest
[ERROR] createNodes(kafka.KafkaTest) Time elapsed: 13.553 s <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<0>
at kafka.KafkaTest.createNodes(KafkaTest.kt:52)
assertEquals(1, records.count())

Produce events "from scratch"

The Streams Producer should have a procedure streams.fromScratch that bulk-loads all existing nodes and relationships into the Kafka topics before tracing each new transaction.
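A hedged sketch of the intended semantics, with stand-in types rather than the project's real API: publish a snapshot event for every existing element first, then hand over to the live change stream:

```kotlin
// Illustrative only: "from scratch" means snapshotting the whole graph into
// the topics before live transaction events start flowing.
fun fromScratch(nodeIds: List<Long>, relIds: List<Long>, publish: (String) -> Unit) {
    nodeIds.forEach { publish("""{"op":"snapshot","type":"node","id":$it}""") }
    relIds.forEach { publish("""{"op":"snapshot","type":"relationship","id":$it}""") }
}

fun main() {
    val out = mutableListOf<String>()
    fromScratch(listOf(1L, 2L), listOf(10L), out::add)
    println(out.size) // prints 3
}
```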

improve documentation

  • motivation
  • how to give feedback / issues
  • setup / testing (docker compose)
  • point to release for usage

Splitting into Producer and Consumer

The whole project has at least two components:

  • Producer: sends events into Kafka topics when a transaction commits
  • Consumer: reads the events from the Kafka topics and writes them into the Neo4j db

Tests don't seem to run

The tests do not seem to execute. I'm not familiar with Kotlin tests, but src/test/kotlin does not seem to be on the testCompile path; executing mvn test -X skips the tests (nothing to test).

Write our own logical transaction log

The transaction handler should be modular, so you can configure additional operations to run when a transaction event is performed. One of these is the file module, which would:

  • write information to a rotated logfile
  • one line of JSON, or Avro+Thrift (whatever Kafka uses), per change event
  • transactions_from-ts-to-ts-from-txid-to-txid.json / or binary ->
  • expose those logical-tx logfiles via an HTTP API (Atom feed or REST) and procedures, as event streams

https://github.com/neo4j/neo4j/blob/3.5/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java
Check whether those are actually just physical entries (record changes) or logical entries (like the tx-event handler).
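The rotation logic of the file module could be sketched as below; the filename here uses only the txid range, while the issue's proposal also includes a timestamp range, and the class is an assumption, not the project's code:

```kotlin
import java.io.File
import java.nio.file.Files

// Hedged sketch: buffer single-line JSON change events and rotate the
// logfile once maxEvents is reached, naming the file by its txid range.
class RotatingTxLog(private val dir: File, private val maxEvents: Int = 1000) {
    private var count = 0
    private var firstTx = -1L
    private val buffer = StringBuilder()

    fun append(txId: Long, eventJson: String) {
        if (firstTx < 0) firstTx = txId
        buffer.append(eventJson).append('\n')
        count++
        if (count >= maxEvents) rotate(txId)
    }

    private fun rotate(lastTx: Long) {
        File(dir, "transactions_from-$firstTx-to-$lastTx.json").writeText(buffer.toString())
        buffer.setLength(0)
        count = 0
        firstTx = -1L
    }
}

fun main() {
    val dir = Files.createTempDirectory("txlog").toFile()
    val log = RotatingTxLog(dir, maxEvents = 2)
    log.append(1, """{"op":"create"}""")
    log.append(2, """{"op":"update"}""") // second event triggers rotation
    println(dir.list()!!.toList())
}
```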

Define the Producer Event Schema

A small example:

{
  "type" : "record",
  "namespace" : "neo4j.cdc.events",
  "name" : "data_change",
 
  "doc" : "Debezium-style information about a data change operation over the graph",

  "fields" : [ {
    "name" : "payload",
    "doc"  : "The data changed",
    "type" : {
        "type": "record",
        "name": "payload",
        "fields" : [ {
            "name": "username",
            "type": "string",
            "doc"  : "The user who did the operation"
        },{
            "name": "timestamp",
            "type": "long",
            "doc"  : "The committing time"
        },{
            "name": "tx_id",
            "type": "long",
            "doc"  : "The transaction identifier"
        } ,
        {
            "name": "tx_event_id",
            "type": "long",
            "doc"  : "The identifier of the event inside the transaction"
        } ,
        {
            "name": "tx_events_count",
            "type": "long",
            "doc"  : "The number of events that compose the transaction"
        } ,
        //TODO here the metadata map
        {
            "name": "operation",
            "type": "string", //change to enum
            "doc"  : "The kind of data change"
            //"symbols": ["create","update","delete"]
        },
        {
            "name": "source",
            "type": {
                "name": "source",
                "type": "record",
                "fields": [{
                    "name": "hostname",
                    "type": "string"
                }
                ]
            }
        },
        {
            "name": "before",
            "doc"  : "The data before the change",
            "type": {
                "type": "record",
                "name": "node",
                "fields" : [ {
                    "name": "type",
                    "type": "string", //TODO to enum
                    "doc"  : "Node or relationship"
                },{
                    "name": "labels",
                    "doc"  : "Labels of node",
                    "type": {
                        "type": "array",                     
                        "items": "string"
                    }

                }, {
                    "name": "properties",
                    "doc": "The properties of the node",
                    "type": {"type": "map", "values": "string"} //TODO is a limitation?
                }
                ]
            }                        
        },//before
        {
            "name": "after",
            "doc": "The data after the operation",
            "type": "node"
        }
        ]//payload fields
    }//type
  },{
    "name" : "schema",
    "doc"  : "How the data are stored",
    "type" : {
        "name": "schemaNode",
        "doc": "constraints of node",
        "type": "record",
        "fields": [{
            "name": "node",
            "type": {
                "name": "nodeSchema",
                "type": "record",
                "fields": [{
                    "name": "primaryKeys",
                    "type": {
                        "name": "primaryKeysDef",
                        "type": "record",
                        "fields": [{
                            "name": "label",
                            "type": "string"
                        },{
                            "name": "properties",
                            "type": {
                                "type": "array",                     
                                "items": "string"
                            }
                        }]
                    }
                },{
                    "name": "properties",
                    "type": {"type": "map", "values": "string"}
                }]
            }            
        }
        ]
    }   
  }
  ]
 
}

