RMLStreamer

The RMLStreamer generates RDF from files or data streams using RML. The difference with other RML implementations is that it can handle big input files and continuous data streams, like sensor data.

Documentation regarding the use of (custom) functions can be found here.

Quick start

If you want to get the RMLStreamer up and running within 5 minutes using Docker, check out docker/README.md

If you want to deploy it yourself, read on.

Installing Flink

RMLStreamer runs its jobs on Flink clusters. More information on how to install Flink and get started can be found here. At least a local cluster must be running before you can execute RML Mappings with RMLStreamer. Please note that this version works with Flink 1.10.0 with Scala 2.11 support, which can be downloaded here.
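
As a sketch, a local Flink cluster can be set up roughly as follows (the download URL follows Flink's standard distribution naming and is an assumption):

$ wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
$ tar -xzf flink-1.10.0-bin-scala_2.11.tgz
$ cd flink-1.10.0
$ ./bin/start-cluster.sh  # starts a local cluster; web UI at http://localhost:8081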

Building RMLStreamer

In order to build a jar file that can be deployed on a Flink cluster, you need:

  • Java JDK 8 or higher
  • Apache Maven 3 or higher

Clone or download and then build the code in this repository:

$ git clone https://github.com/RMLio/RMLStreamer.git 
$ cd RMLStreamer

and then run:

$ mvn -DskipTests clean package

-DskipTests just builds and packages without running tests. If you want to run the tests, just omit this parameter.

clean cleans any cached builds before packaging. While not strictly necessary, it is considered good practice to do so.

The resulting RMLStreamer-<version>.jar, found in the target folder, can be deployed on a Flink cluster.

Executing RML Mappings

Here we give examples for running RMLStreamer from the command line. We use FLINK_BIN to denote the Flink CLI tool, usually found in the bin directory of the Flink installation, e.g. /home/myuser/flink-1.10.0/bin/flink. For Windows, a flink.bat script is provided.

The general usage is:

$ FLINK_BIN run [Flink options] -c io.rml.framework.Main <path to RMLStreamer jar> [toFile|toKafka|toTCPSocket] [options]

FLINK_BIN                 | The path to the Flink CLI script (see above).
Flink options             | Options for the Flink run script. Example: -p 4 sets the parallelism to 4.
-c io.rml.framework.Main  | The application class of RMLStreamer.
Path to RMLStreamer jar   | The absolute path to the RMLStreamer jar file.
RMLStreamer options       | The actual program arguments for RMLStreamer. See below for a full list.
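
For example, a job with parallelism 4 that writes N-Quads to a file could be submitted as follows (jar version and paths are placeholders):

$ FLINK_BIN run -p 4 -c io.rml.framework.Main /path/to/RMLStreamer-<version>.jar toFile --mapping-file /data/mapping.ttl --output-path /data/output.nq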

Basic commands:

# write output to file(s)
$FLINK_BIN run <path to RMLStreamer jar> toFile --mapping-file <path to mapping file> --output-path <path to output file>  

# write output to a listening socket (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket --output-socket <host:port>

# write output to a Kafka topic (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toKafka --broker-list <host:port> --topic <topic name>

Complete RMLStreamer usage:

Usage: RMLStreamer [toFile|toKafka|toTCPSocket] [options]

  -j, --job-name <job name>
                           The name to assign to the job on the Flink cluster. Put some semantics in here ;)
  -i, --base-iri <base IRI>
                           The base IRI as defined in the R2RML spec.
  --disable-local-parallel
                           By default, input records are spread over the available task slots within a task manager to optimise parallel processing, at the cost of losing the order of the records throughout the process. This option disables that behaviour and guarantees that the output order is the same as the input order.
  -m, --mapping-file <RML mapping file>
                           REQUIRED. The path to an RML mapping file. The path must be accessible on the Flink cluster.
  --json-ld                Write the output as JSON-LD instead of N-Quads. An object contains all RDF generated from one input record. Note: this is slower than using the default N-Quads format.
  --bulk                   Write all triples generated from one input record at once.
  --checkpoint-interval <time (ms)>
                           If given, Flink's checkpointing is enabled with the given interval. If not given, checkpointing is disabled.
Command: toFile [options]
Write output to file
  -o, --output-path <output file>
                           The path to an output file.
Command: toKafka [options]
Write output to a Kafka topic
  -b, --broker-list <host:port>[,<host:port>]...
                           A comma separated list of Kafka brokers.
  -t, --topic <topic name>
                           The name of the Kafka topic to write output to.
  --partition-id <id>      EXPERIMENTAL. The partition id of the Kafka topic to which the output will be written.
Command: toTCPSocket [options]
Write output to a TCP socket
  -s, --output-socket <host:port>
                           The TCP socket to write to.
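
As an illustrative sketch, the options above can be combined, e.g. a named Kafka job with checkpointing every 10 seconds (broker address, topic name, and paths are placeholders):

$ FLINK_BIN run <path to RMLStreamer jar> toKafka \
    --job-name "sensor stream to RDF" \
    --checkpoint-interval 10000 \
    --mapping-file /data/mapping.ttl \
    --broker-list localhost:9092 \
    --topic rdf-output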

Examples

Processing a stream

The following example RML Mapping defines how to generate an RDF stream from a JSON data stream received over a TCP socket.

 <#TripleMap>

    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:TCPSocketStream ;
            rmls:hostName "localhost";
            rmls:type "PULL" ;
            rmls:port "5005"
        ];
        rml:referenceFormulation ql:JSONPath;
    ];

    rr:subjectMap [
        rml:reference "$.id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];

    rr:predicateObjectMap [
            rr:predicateMap [
                rr:constant dcterms:title;
                rr:termType rr:IRI
            ];
            rr:objectMap [
                rml:reference "$.id";
                rr:termType rr:Literal
            ]
        ].

The RML Mapping above can be executed as follows:

When streaming, RMLStreamer acts as a TCP client for both input and output. Before running stream mappings, an application must be listening on the input and output ports. For testing purposes, the following commands can be used:

$ nc -lk 5005 # This will start listening for input connections at port 5005
$ nc -lk 9000 # This will start listening for output connections at port 9000
# This is for testing purposes, your own application needs to start listening to the input and output ports. 

Once applications (or the commands above) are listening on the input and output ports, the RML Mapping can be executed. RMLStreamer connects to both sockets so it can process any data written to the input socket.

$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket -s localhost:9000 -m .../framework/src/main/resources/json_stream_data_mapping.ttl
# The -m parameter sets the mapping file location
# The -s parameter sets the output socket address (host:port)

Whenever data is written to the input socket (every data object must end with \n), it is processed by RMLStreamer.
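
For example, once RMLStreamer has connected, JSON records typed (or piped) into the nc session listening on port 5005 are forwarded to it, and the generated RDF appears in the session listening on port 9000. The record shape below is a placeholder matching the $.id references in the mapping above:

$ nc -lk 5005
{"id": "http://example.com/concept/1"}
{"id": "http://example.com/concept/2"}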

Generating a stream from a Kafka Source

The following example shows how to define the generation of an RDF stream from a Kafka stream in an RML Mapping.

 <#TripleMap>

    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:KafkaStream ;
            rmls:broker "broker" ;
            rmls:groupId "groupId";
            rmls:topic "topic";
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
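
The snippet above shows only the logical source and is not a complete TriplesMap on its own. A minimal sketch of the missing part, reusing the subject map pattern from the TCP example:

    rr:subjectMap [
        rml:reference "$.id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ] .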

Note on using Kafka with Flink: As a consumer, the Flink Kafka client never subscribes to a topic; it is assigned a topic/partition (even if you declare it to be in a consumer group with the rmls:groupId predicate). It therefore ignores the "consumer group" concept except for committing offsets, so load is not spread across RMLStreamer jobs running in the same consumer group. Instead, each RMLStreamer job is assigned a partition. This has some consequences:

  • When you add multiple RMLStreamer jobs in a consumer group, and the topic it listens to has one partition, only one instance will get the input.
  • If there are multiple partitions in the topic and multiple RMLStreamer jobs, two (or more) jobs may be assigned the same partition, resulting in duplicate output.

See also https://stackoverflow.com/questions/38639019/flink-kafka-consumer-groupid-not-working .

The only way to spread load is to use multiple topics and assign one RMLStreamer job to each topic, as sketched below.
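
A sketch of that approach (topic names are placeholders; kafka-topics.sh ships with a standard Kafka distribution):

# create one single-partition input topic per RMLStreamer job
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic sensors-1
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic sensors-2

# submit one job per topic; each mapping file points rmls:topic at its own input topic
$ FLINK_BIN run <path to RMLStreamer jar> toKafka --mapping-file mapping-sensors-1.ttl --broker-list localhost:9092 --topic rdf-output-1
$ FLINK_BIN run <path to RMLStreamer jar> toKafka --mapping-file mapping-sensors-2.ttl --broker-list localhost:9092 --topic rdf-output-2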

Generating a stream from a file (to be implemented)
<#TripleMap>

    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:FileStream;
            rmls:path "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books.json"
        ];
        rml:referenceFormulation ql:JSONPath;
        rml:iterator "$.store.books[*]"
    ];

    rr:subjectMap [
        rr:template "{$.id}" ;
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];

    rr:predicateObjectMap [
            rr:predicateMap [
                rr:constant dcterms:title;
                rr:termType rr:IRI
            ];
            rr:objectMap [
                rml:reference "$.id";
                rr:termType rr:Literal
            ]
        ].
Generating a stream from a dataset
 <#TripleMap>

    a rr:TriplesMap;
    rml:logicalSource [
        rml:source "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books_small.json";
        rml:referenceFormulation ql:JSONPath;
        rml:iterator "$.store.books"
    ];

    rr:subjectMap [
        rml:reference "id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];

    rr:predicateObjectMap [
            rr:predicateMap [
                rr:constant dcterms:title;
                rr:termType rr:IRI
            ];
            rr:objectMap [
                rml:reference "id";
                rr:termType rr:Literal
            ]
        ] .
        

RML Stream Vocabulary (non-normative)

Namespace: http://semweb.mmlab.be/ns/rmls#

The RML vocabulary has been extended with rmls to support streaming logical sources. The following classes/terms are currently used:

  • rmls:[stream type]

    • rmls:TCPSocketStream specifies that the logical source will be a TCP socket stream.
    • rmls:FileStream specifies that the logical source will be a file stream (to be implemented).
    • rmls:KafkaStream specifies that the logical source will be a Kafka stream.
  • rmls:hostName specifies the host name of the server from which data will be streamed.

  • rmls:port specifies the port number for the stream mapper to connect to (PULL) or listen on (PUSH).

  • rmls:type specifies how a streamer will act:

    • "PULL":
      The stream mapper acts as a client.
      It creates a socket and connects to the specified port at the given host name.
      Both rmls:port and rmls:hostName need to be specified.
    • "PUSH":
      The stream mapper acts as a server and starts listening at the given port.
      If the given port is taken, the mapper keeps opening subsequent ports until a free port is found.
      Only rmls:port needs to be specified here (a sketch follows the example below).

Example of a valid JSON logical source map using all possible terms:


rml:logicalSource [
        rml:source [
            rdf:type rmls:TCPSocketStream ;
            rmls:hostName "localhost";
            rmls:type "PULL" ;
            rmls:port "5005"
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
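
For comparison, a sketch of a "PUSH" logical source, where only rmls:port is required and the stream mapper itself listens for incoming connections:

rml:logicalSource [
        rml:source [
            rdf:type rmls:TCPSocketStream ;
            rmls:type "PUSH" ;
            rmls:port "5005"
        ];
        rml:referenceFormulation ql:JSONPath;
    ];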
