
node-stream-to-mongo-db's Introduction


Support this package

Please support this package by starring it on GitHub

Stream To Mongo DB

stream-to-mongo-db allows you to stream objects directly into a MongoDB database, using a readable stream as the source (e.g. an S3 file, a local file, a Web API, or even another MongoDB database). The best thing about this package is that it lets you control how many objects are batched together before each write to Mongo - see CONFIG

SUPPORTED NODE VERSIONS

This package supports Node.js versions 8+. If you require another version to be supported, please raise an issue.

USAGE

npm i stream-to-mongo-db

EXAMPLES

Example 1: Stream from another MongoDB database

Example 1.1: Using MongoDB Client

const MongoClient     = require('mongodb').MongoClient;
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;

// where the data will come from
const inputDBConfig  = { dbURL : 'mongodb://localhost:27017/yourInputDBHere', collection : 'yourCollectionHere'  };
// where the data will end up
const outputDBConfig = { dbURL : 'mongodb://localhost:27017/streamToMongoDB', collection : 'devTestOutput' };

MongoClient.connect(inputDBConfig.dbURL, (error, db) => {
    if(error) { throw error; }

    // create the writable stream
    const writableStream = streamToMongoDB(outputDBConfig);

    // create readable stream and consume it
    const stream = db.collection(inputDBConfig.collection).find().stream();

    stream.pipe(writableStream);

    stream.on('end', () => {
        console.log('done!');
        db.close();
    });
});

Example 1.2: Using Mongoose

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const mongoose        = require('mongoose');

// where the data will come from
const connection = mongoose.connect('mongodb://localhost:27017/streamToMongoDB');
const MyModel    = mongoose.model('ModelName', mySchema);

// where the data will end up
const outputDBConfig = { dbURL : 'mongodb://localhost:27017/streamToMongoDB', collection : 'devTestOutput' };

// create the writable stream
const writableStream = streamToMongoDB(outputDBConfig);

// create readable stream and consume it
const stream = MyModel.find().lean().stream();

stream.pipe(writableStream);

stream.on('end', () => {
    console.log('done!');
    connection.close();
});

This example gets even more powerful when you want to transform the input data before writing it to the writableStream:

[...]

// create the readable stream and transform the data before writing it
const stream = MyModel.find().lean().stream({
    transform: (doc) => {
        // do whatever you like to the doc, then return it
        doc.whoIsAwesome = 'StreamToMongoDBIsAwesome';
        return doc;
    }
});

stream.pipe(writableStream);

stream.on('end', () => {
    console.log('done!');
    connection.close();
});

Example 2: Stream from an S3 file using AWS-SDK

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const AWS             = require('aws-sdk');
const JSONStream      = require('JSONStream');

const s3              = new AWS.S3();
const params          = { Bucket: 'myBucket', Key: 'myJsonData.json' };

// where the data will end up
const outputDBConfig = { dbURL : 'mongodb://localhost:27017/streamToMongoDB', collection : 'devTestOutput' };

// create the writable stream
const writableStream = streamToMongoDB(outputDBConfig);

// create readable stream and consume it
s3.getObject(params).createReadStream()
    .pipe(JSONStream.parse('*'))
    .pipe(writableStream);

Example 3: Stream from a Web API

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const request         = require('request');
const JSONStream      = require('JSONStream');

// where the data will end up
const outputDBConfig = { dbURL : 'mongodb://localhost:27017/streamToMongoDB', collection : 'devTestOutput' };

// create the writable stream
const writableStream = streamToMongoDB(outputDBConfig);

// create readable stream and consume it
request('http://www.pathToYourApi.com/endPoint')
    .pipe(JSONStream.parse('*'))
    .pipe(writableStream);

Example 4: Stream from a local file

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const JSONStream      = require('JSONStream');
const fs              = require('fs');

// where the data will end up
const outputDBConfig = { dbURL: 'mongodb://localhost:27017/streamToMongoDB', collection: 'devTestOutput' };

// create the writable stream
const writableStream = streamToMongoDB(outputDBConfig);

// create readable stream and consume it
fs.createReadStream('./myJsonData.json')
    .pipe(JSONStream.parse('*'))
    .pipe(writableStream);

CONFIG

  • dbURL

    [ REQUIRED - String ]

The URL of your database (including the database name)

    eg: mongodb://localhost:27017/streamToMongoDB

  • dbConnection

    [ OPTIONAL - Object ]

An optional, already-open MongoDB connection (by default, stream-to-mongo-db will open a new connection). When provided, dbURL is ignored.

  • collection

    [ REQUIRED - String ]

    The collection to stream to

    eg: myCollection

  • batchSize

    [ OPTIONAL [ default : 1 ] - Integer ]

    The number of documents consumed from the read stream before writing to mongodb

This option defaults to 1, i.e. every object is written to MongoDB individually, as soon as it is received. This default is ideal if you want to ensure every object is written as soon as possible, without the risk of losing objects should the MongoDB connection be interrupted.

However, in most cases this is unnecessary, since writing every object individually incurs an additional I/O cost per object. You can raise this option to, say, 100, which batches the writes into groups of 100, allowing you to consume the stream much faster.

    eg: 100

  • insertOptions

    [ OPTIONAL [ default : { w : 1 } ] - Object ]

    MongoDB insert options

This option defaults to { w : 1 }, i.e. it requests acknowledgement that the write operation has propagated to the standalone mongod or to the primary of a replica set

    eg: see mongo documentation for other options
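Taken together, a full config object might look like the following sketch (the collection name and batchSize value are illustrative, not required values):

```javascript
// A config object using every documented option; values other than the
// defaults are illustrative.
const outputDBConfig = {
    dbURL         : 'mongodb://localhost:27017/streamToMongoDB', // ignored if dbConnection is provided
    collection    : 'devTestOutput',
    batchSize     : 100,       // buffer 100 documents per insert
    insertOptions : { w : 1 }  // the default: acknowledged writes
};

console.log(outputDBConfig.batchSize); // 100
```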

CONTRIBUTION

Please feel free to fork, pull request, discuss, share your ideas and raise issues. Any feedback is welcome!

ACKNOWLEDGEMENTS

Inspired by stream-to-mongo

LICENSE

MIT

node-stream-to-mongo-db's People

Contributors

abdullahali, fabienjuif, kainhaart


node-stream-to-mongo-db's Issues

Possible expansion to include other data operations.

I was using this as a quick way of streaming some data into my local MongoDB instance for a pet project, and I need to not only insert but also update. I could make a PR with some changes that allow multiple operations (update, streamed delete based on an index, etc.) if that is something the owner of this repository is interested in pursuing.

Let me know what you think @AbdullahAli

Force flush if readable stream is < batchSize

Hi,

First off thanks for the great package.

Instead of piping directly to the stream, I'm using stream.write(obj). It works well, but if I don't hit the batch size, the last group is never written to Mongo. Is there a way to force-flush whatever is left (i.e. write whatever it has to Mongo and finish the stream)?

Parse String API to JSON

Hi, I have a Web Stream API, but it does not return pure JSON data.
The data looks like this:
id: 0
data: {"code":"start","data":1}

id: 0
data: {"code":"server","data":["192.168.188.192","9333"]}

id: 0
data: {"code":"Create","data":1}

Attempting to connect to '192.168.188.192' on port '9333'...
id: 0
data: {"code":"Connect","data":1}

id: 1503388899
data: {"code":"sending","data":"R|LT|1|1\r\n"}

id: 1503388899
data: {"code":"counter","data":0}

How do I parse this data to JSON?
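The feed above appears to be in the Server-Sent Events (text/event-stream) format: id:/data: lines separated by blank lines. A minimal parser sketch (not part of this package) that extracts the JSON payload from each data: line could look like this:

```javascript
// Extract and parse the JSON payload of every "data:" line in an
// SSE-style feed; other lines (ids, blanks, log noise) are skipped.
const parseSSE = (raw) => raw
    .split('\n')
    .filter((line) => line.startsWith('data:'))
    .map((line) => JSON.parse(line.slice('data:'.length).trim()));

const raw = 'id: 0\ndata: {"code":"start","data":1}\n\n' +
            'id: 0\ndata: {"code":"server","data":["192.168.188.192","9333"]}\n';

console.log(parseSSE(raw));
```

Each element of the result is then a plain object you can write into the MongoDB stream.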

TypeError: Cannot read property 'stream' of undefined

Trying this out locally with Node.js v8.0.0 and MongoDB 3.6.3:

"dependencies": {
    "mongoose": "^5.2.15",
    "stream-to-mongo-db": "^1.5.0",
    "xml2js": "^0.4.19"
  }

Error:

node stream.js
/Users/vish/Projects/testnpm/node_modules/mongoose/node_modules/mquery/lib/collection/node.js:131
  return this.collection.find(match, findOptions).stream(streamOptions);
                                                 ^

TypeError: Cannot read property 'stream' of undefined
    at NodeCollection.findStream (/Users/vish/Projects/testnpm/node_modules/mongoose/node_modules/mquery/lib/collection/node.js:131:50)
    at model.Query.Query.stream (/Users/vish/Projects/testnpm/node_modules/mongoose/node_modules/mquery/lib/mquery.js:3017:27)
    at Object.<anonymous> (/Users/vish/Projects/testnpm/stream.js:54:38)
    at Module._compile (module.js:569:30)
    at Object.Module._extensions..js (module.js:580:10)
    at Module.load (module.js:503:32)
    at tryModuleLoad (module.js:466:12)
    at Function.Module._load (module.js:458:3)
    at Function.Module.runMain (module.js:605:10)
    at startup (bootstrap_node.js:158:16)
    at bootstrap_node.js:575:3

The code is the same as given in the README:

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const mongoose = require('mongoose');

// where the data will come from
const connection = mongoose.connect('mongodb://admin:[email protected]:27017/dbBiz?authSource=admin&authMechanism=SCRAM-SHA-1');
// Event Schema
const eventsSchema = mongoose.Schema(
    {
        accountIdentifier: { type: Number }     
    }, { collection: 'events' }

);
const MyModel = mongoose.model('events', eventsSchema);

// where the data will end up
const outputDBConfig = { dbURL: 'mongodb://admin:[email protected]:27017/dbBiz?authSource=admin&authMechanism=SCRAM-SHA-1', collection: 'bizeventsConverted' };

// create the writable stream
const writableStream = streamToMongoDB(outputDBConfig);

// create readable stream and consume it
const stream = MyModel.find().lean().stream();

/*
//TODO:
// create the readable stream and transform the data before writing it

*/

stream.pipe(writableStream);

stream.on('end', () => {
    console.log('done!');
    connection.close();
});
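A likely cause of the error above: Mongoose 5 removed Query#stream(), so the older README snippet fails there. Query#cursor() returns an equivalent readable stream of documents, so (untested against this exact setup) the failing line can be swapped out:

```javascript
// Mongoose 5.x: Query#stream() is gone; Query#cursor() returns a
// readable stream of documents instead.
const stream = MyModel.find().lean().cursor();

stream.pipe(writableStream);

stream.on('end', () => {
    console.log('done!');
    mongoose.connection.close();
});
```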

Use DuplexStream/Transform to allow reading the committed record.

It would be beneficial if you used stream.Transform to make the stream re-usable.

E.g. the following code would be extremely useful for message ingestion from a Kafka/messaging topic.

const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const mongoUrl = 'mongodb://localhost:27017/schema';
const mongoStream = streamToMongoDB({ dbURL: mongoUrl, collection: 'messages' });


topicConsumer
    .pipe(messageTransform)  // Normalize a consumed message
    .pipe(mongoStream) // publish to mongodb
    .pipe(topicProducer); // publish resulting mongo record (including ObjectId) to some other topic
