mqtt-collector

mqtt-collector is an MQTT-based data pipeline that ingests data from MQTT publishers. It validates incoming data against user-defined JSON schemas and stores the validated data in MongoDB.

How does it work?

  • The JSON schema for each event type sent to the pipeline is defined in event_schemas.py.
  • An event is published to an MQTT broker.
  • The MQTT client in the pipeline subscribes to the topics of the defined events.
  • Each published event is inserted into MongoDB if it validates against its JSON schema.
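
The steps above can be sketched as a single message handler. This is a minimal sketch and not the project's actual code: the function names `schema_name_from_topic` and `handle_message` are hypothetical, and `validate` and `store` are placeholder hooks standing in for a JSON Schema validator and a MongoDB insert.

```python
import json

TOPIC_PREFIX = "events/"  # topic layout described in this README


def schema_name_from_topic(topic):
    """Return the schema name for an 'events/<schema_name>' topic, else None."""
    if not topic.startswith(TOPIC_PREFIX):
        return None
    name = topic[len(TOPIC_PREFIX):]
    return name if name and "/" not in name else None


def handle_message(topic, payload, schemas, validate, store):
    """Validate one published event and store it; return True if stored.

    validate(event, schema) is assumed to raise ValueError on an invalid
    event, and store(collection, event) is assumed to persist the event.
    Both are hypothetical hooks supplied by the caller.
    """
    name = schema_name_from_topic(topic)
    if name is None or name not in schemas:
        return False  # unknown topic or schema: drop the event
    try:
        event = json.loads(payload)
        validate(event, schemas[name])
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return False
    store(name, event)  # collection name equals the schema name
    return True
```

In a real pipeline this handler would presumably be called from the MQTT client's message callback. A validator that raises a different exception type would need a small wrapper to fit the `validate` hook.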

Core dependencies

$ sudo docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx/emqx:latest

How to install and run?

1. Configurations

The project reads its configuration from a .env file. Create a file named .env in the project's folder and add the following required values:

MQTT_BROKER_HOST=localhost
MQTT_BROKER_PORT=1883
MONGO_CONN_STR=mongodb://localhost/events
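
The README does not say how the project parses this file, so the following is only a standard-library stand-in that shows what the three values look like once loaded. The names `parse_env` and `load_settings` are hypothetical.

```python
REQUIRED_KEYS = ("MQTT_BROKER_HOST", "MQTT_BROKER_PORT", "MONGO_CONN_STR")


def parse_env(text):
    """Parse KEY=VALUE lines, skipping blank lines and '#' comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        values[key.strip()] = value.strip()
    return values


def load_settings(text):
    """Return the required settings, with the broker port converted to int."""
    env = parse_env(text)
    missing = [key for key in REQUIRED_KEYS if key not in env]
    if missing:
        raise KeyError("missing configuration values: " + ", ".join(missing))
    return {
        "MQTT_BROKER_HOST": env["MQTT_BROKER_HOST"],
        "MQTT_BROKER_PORT": int(env["MQTT_BROKER_PORT"]),
        "MONGO_CONN_STR": env["MONGO_CONN_STR"],
    }
```

Passing the contents of the .env file shown above to `load_settings` yields the host as a string, the port as the integer 1883, and the MongoDB connection string unchanged.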

2. Install the packages

Create a virtual environment outside the project folder and install the required packages.

$ virtualenv -p python3.7 venv
$ source venv/bin/activate
$ cd mqtt-collector/
$ pip install -r requirements.txt

3. Run

  • From the project's folder, run:
$ python run.py

If MQTT Connection Successful appears in the console logs, the pipeline has connected to the broker.

How to define an event schema and send data to the pipeline?

  • Define your JSON schemas as Python dictionaries, one after another, in the event_schemas.py file in the project's top-level folder. Each dictionary's variable name serves as the schema name.

# example event_schemas.py file
temp_sensor = {
   "$schema": "http://json-schema.org/schema#",
   "type": "object",
   "properties": {
       "temperature": {
           "type": "number"
       },
       "timestamp": {
           "type": "string",
           "format": "date-time"
       }
   },
   "additionalProperties": False,
   "required": ["temperature", "timestamp"]
}
pressure_sensor = {
   "$schema": "http://json-schema.org/schema#",
   "type": "object",
   "properties": {
       "pressure": {
           "type": "number"
       },
       "timestamp": {
           "type": "string",
           "format": "date-time"
       }
   },
   "additionalProperties": False,
   "required": ["pressure", "timestamp"]
}
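
To make the effect of "required" and "additionalProperties" concrete, here is a simplified stand-in for schema validation. It is not a JSON Schema implementation and not the project's validator: the hypothetical `check_event` function covers only the keywords used in the example schemas and ignores "format". Whether a real validator enforces "date-time" depends on how it is invoked; in the Python jsonschema library, for instance, formats are checked only when a format checker is supplied.

```python
temp_sensor = {
    "type": "object",
    "properties": {
        "temperature": {"type": "number"},
        "timestamp": {"type": "string", "format": "date-time"},
    },
    "additionalProperties": False,
    "required": ["temperature", "timestamp"],
}


def check_event(event, schema):
    """Return a list of problems; an empty list means the event passes.

    Handles only 'required', 'additionalProperties': False, and the
    'number' and 'string' types. The 'format' keyword is ignored.
    """
    if not isinstance(event, dict):
        return ["event is not an object"]
    problems = []
    props = schema.get("properties", {})
    for key in schema.get("required", []):
        if key not in event:
            problems.append(f"missing required property: {key}")
    if schema.get("additionalProperties") is False:
        for key in event:
            if key not in props:
                problems.append(f"unexpected property: {key}")
    for key, rule in props.items():
        if key not in event:
            continue
        value = event[key]
        expected = rule.get("type")
        if expected == "number":
            # bool is a subclass of int in Python but is not a JSON number
            ok = isinstance(value, (int, float)) and not isinstance(value, bool)
        elif expected == "string":
            ok = isinstance(value, str)
        else:
            ok = True
        if not ok:
            problems.append(f"{key} is not a {expected}")
    return problems
```

With this schema, an event that carries an extra key such as "unit", or that omits "timestamp", is reported as invalid and would not be stored.
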
  • To send an event to the pipeline, use an MQTT publisher. The topic must have the form events/<schema_name>. For example, if you defined a schema named temp_sensor in event_schemas.py, publish to events/temp_sensor. The event's payload must be a JSON-formatted string.

Here is an example publisher:
"""An example event publisher"""
import json
import uuid
import paho.mqtt.client as mqtt

topic = "events/temp_sensor"
broker_host = 'localhost'
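# Positional client id as accepted by paho-mqtt 1.x; paho-mqtt 2.x
# additionally requires a callback API version argument.
client = mqtt.Client(str(uuid.uuid4()))  # create a new client with a unique id
client.connect(broker_host, 1883)  # connect to the broker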

data = {
    "temperature": 1,
    "timestamp": "2018-11-13T20:20:39+00:00"
}
# serialize the dict to a JSON string
formatted_data = json.dumps(data)
client.publish(topic, formatted_data, qos=0)
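
The publisher above hard-codes its timestamp. A payload with the current time in the same format can be built with the standard library alone. This is a small sketch, and the helper name `build_payload` is hypothetical.

```python
import json
from datetime import datetime, timezone


def build_payload(temperature, when=None):
    """Return a JSON string for a temp_sensor event.

    'when' should be a timezone-aware datetime; it defaults to the
    current UTC time. A naive datetime would produce no UTC offset.
    """
    if when is None:
        when = datetime.now(timezone.utc)
    event = {
        "temperature": temperature,
        # e.g. "2018-11-13T20:20:39+00:00"
        "timestamp": when.isoformat(timespec="seconds"),
    }
    return json.dumps(event)
```

The returned string can be passed directly as the second argument of `client.publish` in the example publisher.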

Where are the events stored?

  • Events that conform to their schemas are stored in MongoDB, in the events database. Each event goes into the collection named after its schema. For example, an event validated against the temp_sensor schema is inserted into the temp_sensor collection.
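
Stored events can be read back with any MongoDB client. The sketch below assumes that pymongo is installed and that the database name is the path component of MONGO_CONN_STR, as in the example configuration. The helper names `database_name` and `fetch_events` are hypothetical.

```python
from urllib.parse import urlparse


def database_name(conn_str):
    """Return the database part of a connection string such as
    'mongodb://localhost/events' (here: 'events')."""
    return urlparse(conn_str).path.lstrip("/")


def fetch_events(conn_str, schema_name, limit=10):
    """Return up to 'limit' stored events for one schema.

    Requires a reachable MongoDB server. pymongo is imported lazily so
    that database_name() can be used without it.
    """
    from pymongo import MongoClient

    client = MongoClient(conn_str)
    collection = client[database_name(conn_str)][schema_name]
    return list(collection.find().limit(limit))
```

For example, `fetch_events("mongodb://localhost/events", "temp_sensor")` would return documents from the temp_sensor collection of the events database.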

TODO

  • REST API for defining event schemas instead of hard-coding them in a file
  • Authentication
  • AWS Kinesis Integration
    ...
