The RMLStreamer generates RDF from files or data streams using RML. Unlike other RML implementations, it can handle big input files and continuous data streams, such as sensor data.
Documentation regarding the use of (custom) functions can be found here.
If you want to get the RMLStreamer up and running within 5 minutes using Docker, check out docker/README.md
If you want to deploy it yourself, read on.
RMLStreamer runs its jobs on Flink clusters. More information on how to install Flink and get started can be found here. At least a local cluster must be running before RML Mappings can be executed with RMLStreamer. Please note that this version works with Flink 1.10.0 with Scala 2.11 support, which can be downloaded here.
In order to build a jar file that can be deployed on a Flink cluster, you need:
- a Java JDK 8 or higher
- Apache Maven 3 or higher
Clone or download and then build the code in this repository:
$ git clone https://github.com/RMLio/RMLStreamer.git
$ cd RMLStreamer
and then run:
$ mvn -DskipTests clean package
-DskipTests just builds and packages without running tests. If you want to run the tests, just omit this parameter.
clean cleans any cached builds before packaging. While not strictly necessary, it is considered good practice to do so.
The resulting RMLStreamer-<version>.jar, found in the target folder, can be deployed on a Flink cluster.
Here we give examples for running RMLStreamer from the command line. We use FLINK_BIN to denote the Flink CLI tool, usually found in the bin directory of the Flink installation, e.g. /home/myuser/flink-1.10.0/bin/flink. For Windows a flink.bat script is provided.
The general usage is:
$ FLINK_BIN run [Flink options] -c io.rml.framework.Main <path to RMLStreamer jar> [toFile|toKafka|toTCPSocket] [options]
FLINK_BIN | The path to the provided Flink CLI script.
Flink options | Options to the Flink run script. Example: -p 4 sets the parallelism to 4.
-c io.rml.framework.Main | This is the application class of RMLStreamer.
Path to RMLStreamer jar | The absolute path to the RMLStreamer jar file.
RMLStreamer options | The actual program arguments for RMLStreamer. See below for a full list.
# write output to file(s)
$FLINK_BIN run <path to RMLStreamer jar> toFile --mapping-file <path to mapping file> --output-path <path to output file>
# write output to a listening socket (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket --mapping-file <path to mapping file> --output-socket <host:port>
# write output to a Kafka topic (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toKafka --mapping-file <path to mapping file> --broker-list <host:port> --topic <topic name>
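The general usage pattern above can also be assembled programmatically, for example when jobs are submitted from a script. The following is a minimal Python sketch and not part of RMLStreamer: the helper name build_run_command is hypothetical, and it only arranges the arguments in the order given by the general usage line.

```python
# Main class and sub-commands as listed in the general usage line of this README.
MAIN_CLASS = "io.rml.framework.Main"
COMMANDS = ("toFile", "toKafka", "toTCPSocket")


def build_run_command(flink_bin, jar_path, command, options, flink_options=()):
    """Return the argument list for one `flink run` invocation of RMLStreamer.

    build_run_command is a hypothetical helper; it does not start Flink itself.
    """
    if command not in COMMANDS:
        raise ValueError("unknown RMLStreamer command: " + command)
    return [
        flink_bin,
        "run",
        *flink_options,      # e.g. ["-p", "4"] to set the parallelism to 4
        "-c",
        MAIN_CLASS,
        jar_path,
        command,
        *options,            # RMLStreamer options such as --mapping-file
    ]
```

The returned list can be handed to subprocess.run. Passing the arguments as a list rather than a single string avoids shell quoting problems with paths that contain spaces.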
Usage: RMLStreamer [toFile|toKafka|toTCPSocket] [options]
-j, --job-name <job name>
The name to assign to the job on the Flink cluster. Put some semantics in here ;)
-i, --base-iri <base IRI>
The base IRI as defined in the R2RML spec.
--disable-local-parallel
By default input records are spread over the available task slots within a task manager to optimise parallel processing, at the cost of losing the order of the records throughout the process. This option disables this behaviour to guarantee that the output order is the same as the input order.
-m, --mapping-file <RML mapping file>
REQUIRED. The path to an RML mapping file. The path must be accessible on the Flink cluster.
--json-ld
Write the output as JSON-LD instead of N-Quads. An object contains all RDF generated from one input record. Note: this is slower than using the default N-Quads format.
--bulk
Write all triples generated from one input record at once.
--checkpoint-interval <time (ms)>
If given, Flink's checkpointing is enabled with the given interval. If not given, checkpointing is disabled.
Command: toFile [options]
Write output to file
-o, --output-path <output file>
The path to an output file.
Command: toKafka [options]
Write output to a Kafka topic
-b, --broker-list <host:port>[,<host:port>]...
A comma separated list of Kafka brokers.
-t, --topic <topic name>
The name of the Kafka topic to write output to.
--partition-id <id>
EXPERIMENTAL. The partition id of the Kafka topic to which the output will be written.
Command: toTCPSocket [options]
Write output to a TCP socket
-s, --output-socket <host:port>
The TCP socket to write to.
The following RML Mapping shows how to generate an RDF stream from a TCP socket stream:
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:TCPSocketStream ;
            rmls:hostName "localhost";
            rmls:type "PULL" ;
            rmls:port "5005"
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
    rr:subjectMap [
        rml:reference "$.id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];
    rr:predicateObjectMap [
        rr:predicateMap [
            rr:constant dcterms:title;
            rr:termType rr:IRI
        ];
        rr:objectMap [
            rml:reference "$.id";
            rr:termType rr:Literal
        ]
    ].
The RML Mapping above can be executed as follows:
When streaming, both the input and the output of the RML Framework are TCP clients. Before running a stream mapping, an application must therefore be listening on both the input and the output port. For testing purposes the following commands can be used:
$ nc -lk 5005 # This will start listening for input connections at port 5005
$ nc -lk 9000 # This will start listening for output connections at port 9000
# This is for testing purposes only; your own application needs to listen on the input and output ports.
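Where nc is not available, the output listener can be sketched in Python. This is an illustrative stand-in, not part of RMLStreamer: it assumes the output arrives as newline-terminated lines (as statements in the default N-Quads format are), and the function names read_lines and receive_output are hypothetical.

```python
import socket


def read_lines(connection):
    """Yield decoded lines from a connected socket until the peer closes it."""
    buffer = b""
    while True:
        chunk = connection.recv(4096)
        if not chunk:          # an empty read means the peer closed the connection
            break
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line.decode("utf-8")
    if buffer:
        # Trailing data that was not terminated by a newline.
        yield buffer.decode("utf-8")


def receive_output(host="localhost", port=9000):
    """Listen on host:port, accept one client, and return the lines it sends.

    Port 9000 matches the output port used in the nc example above.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((host, port))
        server.listen(1)
        connection, _address = server.accept()
        with connection:
            return list(read_lines(connection))
```

Calling receive_output() blocks until a client connects and later closes the connection, so unlike nc -lk it serves a single connection only.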
Once applications (or the commands above) are listening on the input and output ports, the RML Mapping can be executed. RMLStreamer will open the input and output sockets so it can act upon data that is written to the input socket.
$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket -s localhost:9000 -m .../framework/src/main/resources/json_stream_data_mapping.ttl
# The -m parameter sets the mapping file location
# The -s parameter sets the output socket (host:port)
Whenever data is written to the socket (every data object needs to end with \n), it will be processed by the RML Framework.
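The newline requirement can be illustrated with a small Python test feeder that plays the role of the input-side nc listener. This is a sketch under assumptions, not part of RMLStreamer: the function names are hypothetical, and the records are assumed to be JSON objects, as in the mapping above.

```python
import json
import socket


def encode_record(record):
    """Serialise one record as single-line JSON terminated by a newline.

    json.dumps without an indent never emits raw line breaks, so each
    record occupies exactly one line on the wire.
    """
    return (json.dumps(record) + "\n").encode("utf-8")


def open_listener(host="localhost", port=5005):
    """Open a listening TCP socket; 5005 is the input port of the example mapping."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(1)
    return server


def serve_records(server, records):
    """Wait for one client (for example a "PULL" source) and send it every record."""
    connection, _address = server.accept()
    with connection:
        for record in records:
            connection.sendall(encode_record(record))
```

A typical session opens the listener first with open_listener(), starts the RMLStreamer job, and then calls serve_records(listener, [{"id": "book-1"}, {"id": "book-2"}]). The record contents here are made-up test data.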
The following RML Mapping fragment shows how to define a Kafka stream as the logical source for generating an RDF stream:
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:KafkaStream ;
            rmls:broker "broker" ;
            rmls:groupId "groupId";
            rmls:topic "topic";
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
Note on using Kafka with Flink: As a consumer, the Flink Kafka client never subscribes to a topic; instead it is assigned to a topic/partition (even if you declare it to be in a consumer group with the rmls:groupId predicate). This means that it doesn't do anything with the concept "consumer group", except for committing offsets. As a result, load is not spread across RMLStreamer jobs running in the same consumer group. Instead, each RMLStreamer job is assigned a partition.
This has some consequences:
- When you add multiple RMLStreamer jobs in a consumer group, and the topic they listen to has one partition, only one instance will get the input.
- If there are multiple partitions in the topic and multiple RMLStreamer jobs, it could be that two (or more) jobs are assigned a certain partition, resulting in duplicate output.
See also https://stackoverflow.com/questions/38639019/flink-kafka-consumer-groupid-not-working .
The only option for spreading load is to use multiple topics, and assign one RMLStreamer job to one topic.
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:FileStream;
            rmls:path "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books.json"
        ];
        rml:referenceFormulation ql:JSONPath;
        rml:iterator "$.store.books[*]"
    ];
    rr:subjectMap [
        rr:template "{$.id}" ;
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];
    rr:predicateObjectMap [
        rr:predicateMap [
            rr:constant dcterms:title;
            rr:termType rr:IRI
        ];
        rr:objectMap [
            rml:reference "$.id";
            rr:termType rr:Literal
        ]
    ].
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books_small.json";
        rml:referenceFormulation ql:JSONPath;
        rml:iterator "$.store.books"
    ];
    rr:subjectMap [
        rml:reference "id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];
    rr:predicateObjectMap [
        rr:predicateMap [
            rr:constant dcterms:title;
            rr:termType rr:IRI
        ];
        rr:objectMap [
            rml:reference "id";
            rr:termType rr:Literal
        ]
    ] .
Namespace: http://semweb.mmlab.be/ns/rmls#
The RML vocabulary has been extended with rmls to support streaming logical sources. The following classes/terms are currently used:
- rmls:[stream type]
  - rmls:TCPSocketStream specifies that the logical source will be a TCP socket stream.
  - rmls:FileStream specifies that the logical source will be a file stream (to be implemented).
  - rmls:KafkaStream specifies that the logical source will be a Kafka stream.
- rmls:hostName specifies the host name of the server from which data will be streamed.
- rmls:port specifies a port number for the stream mapper to connect to.
- rmls:type specifies how a streamer will act:
  - "PULL": The stream mapper will act as a client. It will create a socket and connect to the specified port at the given host name. Both rmls:port and rmls:hostName need to be specified.
  - "PUSH": The stream mapper will act as a server and will start listening at the given port. If the given port is taken, the mapper will keep opening subsequent ports until a free port is found. Only rmls:port needs to be specified here.
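The "PUSH" port fallback described above can be sketched as follows. This Python snippet only illustrates the idea of probing subsequent ports and is not the RMLStreamer implementation: the function name listen_on_first_free_port and the max_attempts bound are assumptions added for the example.

```python
import socket


def listen_on_first_free_port(start_port, host="localhost", max_attempts=100):
    """Listen on start_port, or on the next free port after it.

    Returns a (server_socket, port) tuple. max_attempts is a hypothetical
    safety bound so the search cannot run forever.
    """
    last_port = min(start_port + max_attempts, 65536)
    for port in range(start_port, last_port):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind((host, port))
        except OSError:
            # The port is taken (or not permitted): try the next one.
            server.close()
            continue
        server.listen(1)
        return server, port
    raise OSError("no free port found starting at %d" % start_port)
```

Because the port that is finally used may differ from the configured one, a client that wants to push data has to learn the actual port, which is why the sketch returns it alongside the socket.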
Example of a valid JSON logical source map using all possible terms:
rml:logicalSource [
    rml:source [
        rdf:type rmls:TCPSocketStream ;
        rmls:hostName "localhost";
        rmls:type "PULL" ;
        rmls:port "5005"
    ];
    rml:referenceFormulation ql:JSONPath;
];