neo4j-contrib / neo4j-streams
Neo4j Kafka Connector
Home Page: https://neo4j.com/docs/kafka
License: Apache License 2.0
Create a procedure:
call streams.register(topic, statement, {config})
that allows registering topics into graph properties in real time
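A hypothetical invocation, just to illustrate the shape of the call (the topic name, statement, and config key below are illustrative, not part of the proposal):

```cypher
CALL streams.register('labels.new',
  'UNWIND {batch} AS event MERGE (n:Label {id: event.id}) SET n += event.properties',
  {batchSize: 1000})
```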
neo4j-stream or neo4j-streaming
Create a configuration that allows using a pool of coroutines in the Consumer for the daemon thread
The convention could be: match the file name ("labels.new.cypher") with the topic name ("labels.new").
The configuration specifies the topic-specific details plus the connection parameters.
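The file-name convention above could be sketched like this (a minimal sketch; the ".cypher" extension and the function names are assumptions):

```kotlin
import java.io.File

// Derive the Kafka topic name from a Cypher template file name,
// e.g. "labels.new.cypher" -> "labels.new" (assumed ".cypher" extension).
fun topicForTemplate(fileName: String): String =
    fileName.removeSuffix(".cypher")

// Scan a directory of template files and build a topic -> Cypher statement map.
fun loadTemplates(dir: File): Map<String, String> =
    dir.listFiles { f -> f.name.endsWith(".cypher") }
        ?.associate { topicForTemplate(it.name) to it.readText() }
        ?: emptyMap()
```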
Define where to put the configuration properties
to allow run-time configuration changes
Add an "out of the box" mechanism that uses some config/schema/convention for a 1:1 mapping of nodes and relationships
Set up Kafka locally
Patterns should allow defining a custom id. Example:
Label1{!myCustomPropertyId}
For unique keys:
Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}
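Parsing the two pattern forms above could be sketched as follows (a minimal sketch limited to the syntax shown; the type names are assumptions):

```kotlin
// Parse a node pattern such as "Label1{!myCustomPropertyId}" or
// "Label1{!(keyA,keyB)}" into the label and its key properties.
// Only the two forms shown above are handled; anything else is out of scope.
data class NodePattern(val label: String, val keys: List<String>)

fun parsePattern(pattern: String): NodePattern {
    val label = pattern.substringBefore("{")
    val body = pattern.substringAfter("{").substringBefore("}")
    require(body.startsWith("!")) { "expected '!' marking the id properties" }
    val keys = body.removePrefix("!")
        .removeSurrounding("(", ")")   // unwrap the composite-key form
        .split(",")
        .map { it.trim() }
    return NodePattern(label, keys)
}
```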
To be usable in a cluster environment, the consumer poll
must happen only on the leader.
It must also handle leader changes.
Use the neo4j.conf file
in order to store the consumer templates
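A hedged sketch of what such neo4j.conf entries could look like; the property prefix and the template below are assumptions, not the project's actual keys:

```
# Hypothetical neo4j.conf entry mapping the "labels.new" topic to its Cypher template
streams.sink.topic.cypher.labels.new=UNWIND {batch} AS event MERGE (n:Label {id: event.id}) SET n += event.properties
```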
Test with a real Kafka setup, inserting 1bn nodes and relationships: Neo4j -> Kafka,
and also for the consumer: 1bn events in Kafka -> nodes and rels in Neo4j
One nice real-world test could be consuming the RSVP API endpoint of meetup.com
or the Twitter streaming API
We can also just have a simple (concurrent) create/update/delete workload that goes to Kafka
and then, on the other side, uses the Kafka sink to recreate the graph; then we compare that the two graphs are the same
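That round-trip check could be sketched as follows (the snapshot shapes are assumptions; internal ids are ignored because a Kafka round trip does not preserve them):

```kotlin
// Minimal graph snapshot for the round-trip comparison: the source graph
// versus the graph the Kafka sink rebuilt. Node/rel shapes are assumptions.
data class Node(val labels: Set<String>, val props: Map<String, Any>)
data class Rel(val type: String, val start: Node, val end: Node, val props: Map<String, Any>)
data class GraphSnapshot(val nodes: Set<Node>, val rels: Set<Rel>)

// Two graphs match when their node and relationship sets are equal,
// comparing by labels/types and properties rather than internal ids.
fun sameGraph(a: GraphSnapshot, b: GraphSnapshot): Boolean =
    a.nodes == b.nodes && a.rels == b.rels
```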
To get the data from the very beginning of the database
I compiled this project using Maven.
In order to use the "Producer" to publish "neo4j" database changes to a Kafka topic, which class should I use as the "Connector"? Consider the following config properties:
name=neo4jMessageProducer
topic=neo4j
tasks.max=1
connector.class= "connector class is required here"
The connector.class property needs a connector class, and I didn't find such a class in this project.
Configurable with the following features:
created:propName
updated:propName
createdBy:propName
updatedBy:propName
created(At)
updated(At)
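A minimal sketch of how those configurable audit properties could be applied (the default property names and the function shape are assumptions):

```kotlin
// Configurable audit properties from the feature list above: on create we
// stamp created/createdBy, on every write updated/updatedBy.
// The default property names below are assumptions, not project defaults.
data class AuditConfig(
    val createdProp: String = "createdAt",
    val updatedProp: String = "updatedAt",
    val createdByProp: String = "createdBy",
    val updatedByProp: String = "updatedBy"
)

fun audit(
    props: MutableMap<String, Any>,
    user: String,
    now: Long,
    isCreate: Boolean,
    cfg: AuditConfig = AuditConfig()
) {
    if (isCreate) {
        props[cfg.createdProp] = now
        props[cfg.createdByProp] = user
    }
    props[cfg.updatedProp] = now
    props[cfg.updatedByProp] = user
}
```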
When building this project, it fails in Eclipse embedded (macOS). Not sure if this is a local environment issue, as I am also getting odd maven-clean execution errors on this project in Eclipse.
Exception in thread "Thread-8" org.neo4j.graphdb.TransactionTerminatedException: The transaction has been terminated. Retry your operation in a new transaction, and you should see a successful result. The database is not currently available to serve your request, refer to the database logs for more details. Retrying your request at a later time may succeed.
at org.neo4j.kernel.impl.api.KernelStatement.assertOpen(KernelStatement.java:213)
at org.neo4j.kernel.impl.api.LockingStatementOperations.schemaStateGetOrCreate(LockingStatementOperations.java:142)
at org.neo4j.kernel.impl.api.OperationsFacade.schemaStateGetOrCreate(OperationsFacade.java:818)
at org.neo4j.cypher.internal.spi.v3_3.TransactionBoundPlanContext.getOrCreateFromSchemaState(TransactionBoundPlanContext.scala:121)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext$$anonfun$getOrCreateFromSchemaState$1.apply(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:32)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.translateException(ExceptionTranslatingPlanContext.scala:27)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.getOrCreateFromSchemaState(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$class.org$neo4j$cypher$internal$compatibility$v3_3$Compatibility$$provideCache(Compatibility.scala:156)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:122)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:107)
at org.neo4j.cypher.internal.compatibility.v3_3.exceptionHandler$runSafely$.apply(exceptionHandler.scala:90)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2.plan(Compatibility.scala:107)
at org.neo4j.cypher.internal.ExecutionEngine.org$neo4j$cypher$internal$ExecutionEngine$$producePlan$1(ExecutionEngine.scala:180)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1$$anonfun$apply$1.apply(CacheAccessor.scala:38)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor$$anonfun$1.apply(CacheAccessor.scala:64)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache$$anon$1.apply(LFUCache.scala:31)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$16(BoundedLocalCache.java:1973)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:1971)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:1954)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:113)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.getOrElseUpdate(LFUCache.scala:30)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.apply(LFUCache.scala:48)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor.getOrElseUpdate(CacheAccessor.scala:62)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1.apply(CacheAccessor.scala:36)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache.getOrElseUpdate(CacheAccessor.scala:53)
at org.neo4j.cypher.internal.ExecutionEngine.liftedTree1$1(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine.planQuery(ExecutionEngine.scala:168)
at org.neo4j.cypher.internal.ExecutionEngine.execute(ExecutionEngine.scala:116)
at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:62)
at org.neo4j.kernel.impl.factory.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:80)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:451)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:434)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:418)
at kafka.KafkaTest$createNodes$1.run(KafkaTest.kt:48)
at java.lang.Thread.run(Thread.java:748)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 27.013 s <<< FAILURE! - in kafka.KafkaTest
[ERROR] createNodes(kafka.KafkaTest) Time elapsed: 13.553 s <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<0>
at kafka.KafkaTest.createNodes(KafkaTest.kt:52)
assertEquals(1, records.count())
The template must inject an event parameter:
UNWIND {batch} AS event
...
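Feeding that {batch} parameter from the consumer could look like this sketch (the batch size and event shapes are assumptions):

```kotlin
// Group incoming events into fixed-size batches, each bound as the {batch}
// parameter of a Cypher template like the one above.
fun <T> toBatches(events: List<T>, batchSize: Int = 1000): List<Map<String, List<T>>> =
    events.chunked(batchSize).map { mapOf("batch" to it) }
```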
In order to test the system with different Kafka versions
In order to share common functions between consumer and producer
The Streams Producer should have a procedure streams.fromScratch
that enables bulk loading of all existing nodes and relationships into the Kafka topics before tracing each new transaction
Integration tests using Docker like ETL
Is it a Neo4j internal component or an external process?
The whole project has at least 2 components
The tests do not seem to execute. I am not familiar with Kotlin tests, but src/test/kotlin does not seem to be in the testCompile path; executing mvn test -X skips the tests (nothing to test).
The transaction handler should be modular, so you can configure more operations to run when a transaction event occurs. One of these is the file module that:
https://github.com/neo4j/neo4j/blob/3.5/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java
Check whether those are actually just physical entries (record changes) or logical entries (like the tx-event handler)
A small example:
{
"type" : "record",
"namespace" : "neo4j.cdc.events",
"name" : "data_change",
"doc:" : "Debezium information of a data change operation over the graph",
"fields" : [ {
"name" : "payload",
"doc" : "Tha data changed",
"type" : {
"type": "record",
"name": "payload",
"fields" : [ {
"name": "username",
"type": "string",
"doc" : "The user who did the operation"
},{
"name": "timestamp",
"type": "long",
"doc" : "The committing time"
},{
"name": "tx_id",
"type": "long",
"doc" : "The transaction identifier"
} ,
{
"name": "tx_event_id",
"type": "long",
"doc" : "The identifier of the event inside the transaction"
} ,
{
"name": "tx_events_count",
"type": "long",
"doc" : "The number of events that compose the transaction"
} ,
//TODO here the metadata map
{
"name": "operation",
"type": "string", //change to enum
"doc" : "The kind of data change"
//"symbols": ["create","update","delete"]
},
{
"name": "source",
"type": {
"name": "source",
"type": "record",
"fields": [{
"name": "hostname",
"type": "string"
}
]
}
},
{
"name": "before",
"doc" : "The data before the change",
"type": {
"type": "record",
"name": "node",
"fields" : [ {
"name": "type",
"type": "string", //TODO to enum
"doc" : "Node or relatioship"
},{
"name": "labels",
"doc" : "Labels of node",
"type": {
"type": "array",
"items": "string"
}
}, {
"name": "properties",
"doc": "The properties of the node",
"type": {"type": "map", "values": "string"} //TODO is a limitation?
}
]
}
},//before
{
"name": "after",
"doc": "The data after the operation",
"type": "node"
}
]//payload fields
}//type
},{
"name" : "schema",
"doc" : "How the data are stored",
"type" : {
"name": "schemaNode",
"doc": "constraints of node",
"type": "record",
"fields": [{
"name": "node",
"type": {
"name": "nodeSchema",
"type": "record",
"fields": [{
"name": "primaryKeys",
"type": {
"name": "primaryKeysDef",
"type": "record",
"fields": [{
"name": "label",
"type": "string"
},{
"name": "properties",
"type": {
"type": "array",
"items": "string"
}
}]
}
},{
"name": "properties",
"type": {"type": "map", "values": "string"} }]
}
}
]
}
}
]
}
trace stats (nodes/rels/props) created via log.debug in