IBM Streams toolkit for parsing and creating Google Protocol Buffers

Home Page: https://ibmstreams.github.io/streamsx.protobuf/

License: Apache License 2.0

streamsx.protobuf

The streamsx.protobuf toolkit contains operators for interacting with serialized protocol buffer messages. It contains two conversion operators and two simple source operators.

Currently, this toolkit only supports proto2 syntax.

Examples can be found in the streamsx.protobuf.samples directory.

Conversion operators

  1. ProtobufParse takes a tuple with a blob field and emits a tuple matching the protoMessage parameter type it is given.

  2. ProtobufBuild takes a tuple as generated by the spl-schema-from-protobuf script (see below) and emits a serialized version in the Protobuf serialization format as a blob.

Important: to compile these operators, your Makefile must pass the following -Wl,-rpath flag to sc:

APPDIR = $(shell basename `pwd`)

sc -M <main composite> -t <path to streamsx.protobuf> -w -Wl,-rpath="'\$$\$$ORIGIN/../toolkits/$(APPDIR)/impl/lib'"

Note that -Wl is an uppercase W followed by a lowercase L. Without this flag, the application bundle (.sab) does not set the runtime library path to include the generated libcustomproto.so, which is placed in the application directory.

Source operators

  1. ProtobufTCPSource creates a TCP server that accepts connections, each of which can carry one or more Protobuf messages, each prefixed with a 4-byte record length.

  2. ProtobufFileSource reads binary files that contain Protobuf messages, each prefixed with a 4-byte record length.
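
The record framing that both source operators expect can be sketched in Python. This is a minimal illustration, not part of the toolkit: the README does not state the byte order of the 4-byte length, so the sketch takes it as a parameter and defaults to big-endian as an assumption, and the function names write_records and read_records are hypothetical.

```python
import io
import struct

# Assumption: the byte order of the 4-byte length is not documented,
# so it is a parameter (">I" = big-endian, "<I" = little-endian).
def write_records(stream, payloads, length_format=">I"):
    """Write each serialized message prefixed with its 4-byte length."""
    for payload in payloads:
        stream.write(struct.pack(length_format, len(payload)))
        stream.write(payload)

def read_records(stream, length_format=">I"):
    """Yield the payloads back from a length-prefixed stream."""
    while True:
        header = stream.read(4)
        if not header:
            return  # clean end of stream
        if len(header) < 4:
            raise ValueError("truncated length prefix")
        (length,) = struct.unpack(length_format, header)
        payload = stream.read(length)
        if len(payload) < length:
            raise ValueError("truncated record")
        yield payload

# Round trip through an in-memory buffer standing in for a binary file.
buffer = io.BytesIO()
write_records(buffer, [b"first", b"second"])
buffer.seek(0)
records = list(read_records(buffer))
```

Before relying on a file produced this way, confirm the byte order against the operator's actual behavior.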

Configuration

The streamsx.protobuf toolkit requires that the Protobuf libraries be installed on the machine that compiles the application.

The easiest way to install them is from the CentOS base yum repository:

yum install protobuf.x86_64
yum install protobuf-devel.x86_64

Two environment variables are required: STREAMSX_PROTOBUF_LIBPATH and STREAMSX_PROTOBUF_INCLUDEPATH.

The following statements will set them for protobuf and protobuf-devel that are available with CentOS:

export STREAMSX_PROTOBUF_LIBPATH=/usr/lib64
export STREAMSX_PROTOBUF_INCLUDEPATH=/usr/include/google/protobuf
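
A quick pre-build check for these two variables might look like the following Python sketch. It is only an illustration: the helper name missing_protobuf_settings is hypothetical, and the toolkit itself is not known to perform this check.

```python
import os

REQUIRED_VARS = ("STREAMSX_PROTOBUF_LIBPATH", "STREAMSX_PROTOBUF_INCLUDEPATH")

def missing_protobuf_settings(env=None):
    """Return a list of problems with the two required variables."""
    env = os.environ if env is None else env
    problems = []
    for name in REQUIRED_VARS:
        value = env.get(name)
        if not value:
            problems.append(f"{name} is not set")
        elif not os.path.isdir(value):
            problems.append(f"{name} points to a missing directory: {value}")
    return problems

# Report anything that would likely break compilation.
for problem in missing_protobuf_settings():
    print(problem)
```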

Generating SPL schemas from .proto files

This toolkit contains a script under streamsx.protobuf/bin called spl-schema-from-protobuf. This script will generate tuples in SPL to match the Protobuf messages in .proto files.

This generated schema is required to use the conversion operators.

ProtobufParse emits the tuple generated by the script corresponding to the Protobuf message it is receiving.

ProtobufBuild receives the tuple generated by the script corresponding to the Protobuf message it is producing.

Naming Conventions

For all message and enum names, _pb is appended to the identifier.

For all field names or enum values, _ (underscore) is appended to the identifier. An example can be seen in streamsx.protobuf.samples.
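
The two rules can be restated as a short Python sketch. The function names are hypothetical, and joining nested message names with an underscore is inferred from generated file names such as Person_PhoneNumber_pb.spl rather than stated as a rule.

```python
def spl_type_name(proto_name):
    """Message or enum name -> SPL type name.

    Nested names such as "Person.PhoneNumber" are joined with "_"
    (an inference from the generated file names).
    """
    return proto_name.replace(".", "_") + "_pb"

def spl_attribute_name(proto_field):
    """Field name or enum value -> SPL identifier."""
    return proto_field + "_"

print(spl_type_name("MyMessage"))    # MyMessage_pb
print(spl_attribute_name("field"))   # field_
```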

Usage

To use this toolkit, create an empty application. Place your .proto file inside your <application>/impl directory.

Run the command:

<path to streamsx.protobuf toolkit>/bin/spl-schema-from-protobuf impl <your protobuf file name>

This will generate the SPL schema to use with the conversion operators. The files will be placed in your <application> directory within a nested directory structure based on the .proto message structure. For example, if your .proto file contains a package directive: package tutorial, then the generated files will be in <application>/tutorial.

The output will also include a console message providing the code snippet to add to your .spl application to use the generated SPL types.

As an example, if you use the protobuf tutorial file (address.proto), the directory structure created will include:

tutorial
├── AddressBook_pb.spl
├── Person_pb.spl
├── Person_PhoneNumber_pb.spl
└── Person_PhoneType_pb.spl
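
As a rough guide to where a given message ends up, the layout above can be reproduced with a small Python sketch. The helper generated_spl_path is hypothetical. It assumes the directory is the package name as written and that nested messages are flattened with underscores, which matches the listing above but is not a documented guarantee.

```python
from pathlib import PurePosixPath

def generated_spl_path(package, message):
    """Path of the generated .spl file, relative to the application directory."""
    # Nested message names (e.g. "Person.PhoneNumber") are joined with "_".
    file_name = message.replace(".", "_") + "_pb.spl"
    return PurePosixPath(package) / file_name

for message in ("AddressBook", "Person", "Person.PhoneNumber", "Person.PhoneType"):
    print(generated_spl_path("tutorial", message))
```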

Next, you will use the operators within your application or composite operator.

Simple Example

If you have a Protobuf message named my.package.MyMessage, the files will look like this:

MyMessage.proto

syntax = "proto2";

package my.package;

message MyMessage {
    required string field = 1;
}

my.package/MyMessage_pb.spl

namespace my.package;

use my.package::*;

type MyMessage_pb = tuple<
    rstring field_
>;

ProtobufParse invocation

stream<blob recordData> serializedRecords = ProtobufFileSource() {
    param
        file: "<binary file>";
}

stream<my.package::MyMessage_pb> myMessages = ProtobufParse(serializedRecords) {
    param
        dataAttribute: recordData;
        protoMessage: "my.package.MyMessage";
        protoDirectory: "impl";
        protoRootFile: "MyMessage.proto";
}

ProtobufBuild invocation

stream<my.package::MyMessage_pb> myMessages = Beacon() {
    param
        period: 1.0;
    output
        myMessages: field_ = "<value>";
}

stream<blob recordData> serializedRecords = ProtobufBuild(myMessages) {
    param
        protoMessage: "my.package.MyMessage";
        protoDirectory: "impl";
        protoRootFile: "MyMessage.proto";
}

Under the hood

How do the converters work?

They use a grammar file written for yapp, a Perl port of yacc. The grammar defines the proto2 syntax according to Google's language specification. The yapp grammar is compiled into a Perl module, which generates a parse tree containing all message and enum definitions within the file. For each import from the root file, this process is repeated until all files have been processed.
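
The "repeat for each import" step amounts to a worklist traversal. The Python sketch below illustrates that idea only; it is not the toolkit's Perl parser. It finds imports with a regular expression rather than a grammar, assumes import paths are relative to the proto directory, and uses the hypothetical name collect_proto_files.

```python
import re
from pathlib import Path

# Matches: import "x.proto";  import public "x.proto";  import weak "x.proto";
IMPORT_RE = re.compile(r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;', re.MULTILINE)

def collect_proto_files(proto_dir, root_file):
    """Return the root file and everything it imports, transitively, in visit order."""
    pending = [root_file]
    seen = []
    while pending:
        name = pending.pop(0)
        if name in seen:
            continue  # already processed; also guards against circular imports
        seen.append(name)
        text = (Path(proto_dir) / name).read_text()
        pending.extend(IMPORT_RE.findall(text))
    return seen
```

Calling collect_proto_files("impl", "MyMessage.proto") would list every file the parser needs to visit, starting with the root file.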

The Build/Parse operators iterate through this parse tree to map Protobuf message values into and out of Streams tuples. At compile time, these operators run the parser to create the tree, and then run the protoc command to generate the C++ that the messages will use. The C++ is compiled into a shared object library named libcustomproto.so, which is stored in the application directory's impl/lib. This means that if more than one Build and/or Parse operator exists in the same composite, they cannot be compiled in parallel and they must use the same Protobuf definitions. Otherwise, race conditions will occur and one or both will be non-functional at run time.

Variable mapping is generated recursively, so arbitrarily deeply nested messages can be handled. There are two limitations: these operators cannot handle group fields or oneof fields. Oneof fields are planned for future implementation, but group fields have been deprecated by Google in favor of nested messages.

Every effort was made to keep the generated code readable, as this makes debugging much easier. Feel free to take a look. However, all variable names are randomly generated to reduce the likelihood of a name collision. Name collisions are not checked beforehand: the probability of having no collision in a message with 100 fields is (1-1/52^20)^100, so the chance of a collision is infinitesimally small.
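
To put a number on that formula, the Python sketch below evaluates it as written. It assumes 20-character names drawn from the 52 upper- and lower-case letters, which is what the 52^20 term implies, and it takes the README's formula at face value rather than deriving a more exact collision model.

```python
import math

ALPHABET_SIZE = 52   # upper- and lower-case ASCII letters
NAME_LENGTH = 20     # characters per generated identifier
FIELD_COUNT = 100    # fields in the example message

per_name = 1 / ALPHABET_SIZE ** NAME_LENGTH

# (1 - per_name) ** FIELD_COUNT rounds to exactly 1.0 in double precision,
# so work in log space: log1p and expm1 keep the tiny difference from 1.
log_no_collision = FIELD_COUNT * math.log1p(-per_name)
p_collision = -math.expm1(log_no_collision)

print(f"probability of a collision: {p_collision:.3e}")
```

The result is on the order of 10^-33, which is why the generated code does not bother checking for collisions.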

Some older versions of the protoc compiler do not require the first line to state syntax = "proto2";, but this parser requires the statement to be present regardless of the version of protoc installed.
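
A file can be checked for this requirement before running the toolkit. The Python sketch below is a hypothetical helper, not the toolkit's parser: it strips comments naively and then tests whether the first statement is the proto2 syntax declaration. Whether the real parser tolerates comments before that statement is not stated here, so treat a passing result as necessary rather than sufficient.

```python
import re

SYNTAX_RE = re.compile(r'\s*syntax\s*=\s*(["\'])proto2\1\s*;')
COMMENT_RE = re.compile(r'//[^\n]*|/\*.*?\*/', re.DOTALL)

def declares_proto2(proto_text):
    """True if the first statement of the file is syntax = "proto2";."""
    # Naive comment removal; it does not account for comment markers inside strings.
    stripped = COMMENT_RE.sub("", proto_text)
    return SYNTAX_RE.match(stripped) is not None

print(declares_proto2('syntax = "proto2";\npackage my.package;'))
```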

Contributors

bmwilli, petenicholls, resinten

streamsx.protobuf's Issues

Missing functions for optional parameters

Hi,

Having issues with the following proto file and Streams code.


src/operator/serialized.cpp: In member function ‘virtual void SPL::Operator::serialized$OP::process(SPL::Tuple&, uint32_t)’:
src/operator/serialized.cpp:157:23: error: ‘class com::ibm::streamsx::protobuf::project::SignalStreamMessage’ has no member named ‘set_bufferOverflow’
WEJUQXwhHagRdlPMEQyq.set_bufferOverflow(inTuple.get_bufferOverflow_());
^
[CXX-operator] deserialized
make: *** [build/operator/serialized.o] Error 1
make: *** Waiting for unfinished jobs....
src/operator/deserialized.cpp: In member function ‘virtual void SPL::Operator::deserialized$OP::process(const SPL::Tuple&, uint32_t)’:
src/operator/deserialized.cpp:136:99: error: ‘const class com::ibm::streamsx::protobuf::project::SignalStreamMessage_SignalData’ has no member named ‘bytesValue’
AbjLpOTzDxHhTtaOEJVM.set_bytesValue_(SPL::blob((const unsigned char *) (*HgidJkNuIKeWCYSoxQTU).bytesValue().data(), (*HgidJkNuIKeWCYSoxQTU).bytesValue().size()));
^
src/operator/deserialized.cpp:136:144: error: ‘const class com::ibm::streamsx::protobuf::project::SignalStreamMessage_SignalData’ has no member named ‘bytesValue’
AbjLpOTzDxHhTtaOEJVM.set_bytesValue_(SPL::blob((const unsigned char *) (*HgidJkNuIKeWCYSoxQTU).bytesValue().data(), (*HgidJkNuIKeWCYSoxQTU).bytesValue().size()));
^
make: *** [build/operator/deserialized.o] Error 1
CDISP0141E ERROR: The compilation of the generated code failed.


syntax = "proto2";

package com.ibm.streamsx.protobuf.project;

enum SignalState {
    OK = 1;
    ERROR = 2;
    NOT_AVAILABLE = 3;
}

message SignalStreamMessage {
    message SignalData {
        required int32 signalId = 1;
        optional uint64 deltaTime = 2;
        optional SignalState state = 3;
        optional int32 intValue = 4;
        optional int64 longValue = 5;
        optional float floatValue = 6;
        optional double doubleValue = 7;
        optional bool boolValue = 8;
        optional string stringValue = 9;
        optional bytes bytesValue = 10;
    }
    message GPSData {
        optional uint64 timestamp = 1;
        optional float latitude = 2;
        optional float longitude = 3;
        optional float altitude = 4;
        optional float speed = 5;
        optional float heading = 6;
        optional float accuracyFactor = 7;
        optional float estimatedAccuracy = 8;
        optional uint32 satelliteCount = 9;
    }
    required uint64 timestamp = 1;
    required uint64 counter = 2;
    optional bool bufferOverflow = 3;
    repeated SignalData signals = 4;
    repeated GPSData gpsData = 5;
}


stream<SignalStreamMessage_pb> records = Custom()
{
    logic
        onProcess : {
            mutable list<SignalStreamMessage_GPSData_pb> gpsData=[];
            mutable list<SignalStreamMessage_SignalData_pb> signals=[];
            mutable SignalStreamMessage_GPSData_pb g={};
            g.altitude_=400fw;
            g.satelliteCount_=4u;
            g.heading_=180fw;
            g.latitude_=48.72233fw;
            g.accuracyFactor_=1fw;
            g.estimatedAccuracy_=1fw;
            g.speed_=80fw;
            g.timestamp_=1560244091359ul;
            g.longitude_=9.125218fw;
            appendM(gpsData,g);

            mutable SignalStreamMessage_SignalData_pb s1={};
            s1.deltaTime_=10ul;
            s1.signalId_=1;
            s1.doubleValue_=4fl;
            appendM(signals,s1);

            mutable SignalStreamMessage_SignalData_pb s2={};
            s2.deltaTime_=20ul;
            s2.signalId_=2;
            s2.bytesValue_=(blob)("dGhpc0lzQVN0cmluZw==");
            appendM(signals,s2);

            mutable SignalStreamMessage_pb msg={};
            msg.counter_=1ul;
            msg.timestamp_=1560244091359ul;
            msg.bufferOverflow_=false;

            msg.gpsData_=gpsData;
            msg.signals_=signals;
            submit(msg,records);
        }
}

stream<blob data> serialized = ProtobufBuild(records)
{
    param
        protoMessage : "com.ibm.streamsx.protobuf.project.SignalStreamMessage" ;
        protoDirectory : "." ;
        protoRootFile : "DeviceStreamingStructure.proto" ;
}

stream<SignalStreamMessage_pb> deserialized = ProtobufParse(serialized)
{
    param
        dataAttribute : data ;
        protoMessage : "com.ibm.streamsx.protobuf.project.SignalStreamMessage" ;
        protoDirectory : "." ;
        protoRootFile : "DeviceStreamingStructure.proto" ;
}
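
One plausible reading of the errors above, offered as an assumption rather than a confirmed diagnosis: protoc's C++ generator lower-cases field names when it forms accessor names, so the generated classes would expose set_bufferoverflow() and bytesvalue() rather than the camelCase spellings in the failing lines. The Python sketch below only illustrates that naming rule; the function names are hypothetical.

```python
def protoc_cpp_accessors(field_name):
    """Getter and setter names protoc's C++ generator uses for a singular field."""
    lowered = field_name.lower()
    return lowered, "set_" + lowered

def accessor_mismatch(field_name):
    """True when code that keeps the field's original spelling would not compile."""
    getter, _ = protoc_cpp_accessors(field_name)
    return getter != field_name

for name in ("bufferOverflow", "bytesValue", "timestamp"):
    print(name, protoc_cpp_accessors(name), accessor_mismatch(name))
```

If that reading is right, renaming the affected fields to all-lowercase names in the .proto file would be one way to test it.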
