MongoDB replication/sharding (das, CLOSED)

dmwm commented on July 17, 2024
MongoDB replication/sharding


Comments (16)

vkuznet commented on July 17, 2024

valya: I modified DAS internals and configuration to use MongoDB URIs. Now it's time for MongoDB setup tweaks.

vkuznet commented on July 17, 2024

Here is an example of a sharding configuration:

#!/bin/bash

shardname=`hostname -s`

# each node has 1 shard, 1 config server and 1 router
# bring up the shard node
cmd="mongod --port 8230 --shardsvr --dbpath <shard_path>"
echo $cmd

# bring up config server
cmd="mognod --port 8231 --configsvr --dbpath <config_dbpath>"
echo $cmd

# bring up router, where list_of_config_servers is of the form host1:port1,host2:port2
cmd="mongos --port 8232 --configdb <list_of_config_servers>"
echo $cmd

# JavaScript/DB part
echo "Create shard.js configuration file, to be executed as: mongo shard.js"
echo "-------- shard.js ---------"
cat > shard.js << EOF
db = db.getSisterDB("admin");  // "use admin" only works in the interactive shell

// add this shard to the router
db.runCommand( { addshard : "localhost:8230",
                 name : "$shardname",
                 allowLocal : true } );

// enable sharding for the das database
db.runCommand( { enablesharding : "das" } );

// shard cache collection
db.runCommand( { shardcollection : "das.cache",
                 key : { "dataset.name" : 1, "block.name" : 1, "file.name" : 1 } } );

// shard merge collection
db.runCommand( { shardcollection : "das.merge",
                 key : { "dataset.name" : 1, "block.name" : 1, "file.name" : 1 } } );
EOF
cat shard.js
echo "-------- end of shard.js ---------"

# execute shard configuration file via the router (mongos)
cmd="mongo --port 8232 shard.js"
echo $cmd

vkuznet commented on July 17, 2024

valya: Need to test the following scenarios:

  1. Replica set
    • write explicit commands to set up a replica set (see the sketch after this list)
    • confirm that replicas work when a node goes down
  2. Sharding configuration
    a. set up a 1+1+1 configuration: 1 shard, 1 config server and 1 router, all on the same node
    b. set up a 1(a+b)+1+1 configuration, where the shard is itself a replica set
  3. Sharding keys
    • random shards (by id only)
    • type shards (shard only large "buckets"), e.g. define shard keys as dataset, file
    • query key shards (shard everything on the keys that query joins use)
      • need to check whether shard keys can be changed dynamically, since we don't know the queries a priori, even though we can pre-define some shard keys, similar to type shards
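
A minimal sketch for item 1, in the old pymongo Connection style used elsewhere in this ticket. It assumes three mongod instances already running with --replSet das on hypothetical hosts vk1-vk3.cern.ch, port 8230; the set is then initiated through the admin command interface:

# sketch only: hosts/ports are illustrative; each mongod must be started
# beforehand with: mongod --replSet das --port 8230 --dbpath <path>
from pymongo import Connection

config = {'_id': 'das',
          'members': [{'_id': 0, 'host': 'vk1.cern.ch:8230'},
                      {'_id': 1, 'host': 'vk2.cern.ch:8230'},
                      {'_id': 2, 'host': 'vk3.cern.ch:8230'}]}

conn = Connection('vk1.cern.ch', 8230)         # connect to one member
conn.admin.command('replSetInitiate', config)  # initiate the replica set
print conn.admin.command('replSetGetStatus')   # verify member states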

vkuznet commented on July 17, 2024

valya: From this thread, http://groups.google.com/group/mongodb-user/browse_thread/thread/c9e44d046fb6a74a, I figured out that all documents in a shard cluster need to have values for the shard keys. Since we create docs dynamically and docs do not necessarily share keys, e.g. dataset vs runs, our only choice is random shards using the id or das.expire as a key.

vkuznet commented on July 17, 2024

valya: A few observations during shard testing.

  • we must create shard keys up-front
  • data which do not have the shard key will not be written into the shard
  • data are spread evenly among the shards

In the current DAS design we have two collections in the DAS db: cache and merge. We store records with different keys into them. This does not allow us to use a common shard key, except das_id and/or das.expire. But these keys are not used in queries, therefore we cannot benefit from them. Instead we can take a slightly different approach and divide the DAS db into multiple collections based on data types, for instance file.cache/file.merge, dataset.cache/dataset.merge, etc. This will allow us to use sharding per data collection with well defined shard keys, e.g. file.name, dataset.name.

The details are the following:

  • upon input query, create a new collection if it does not exist
  • invoke admin.command to add a new shard key, e.g.
    admin = conn.admin
    admin.command('shardcollection', 'file.cache', key={'file.name':1})
  • ensure other indexes for that collection
  • insert data into the new collection

vkuznet commented on July 17, 2024

I was able to create the shards dynamically using the following code:

import time
from pymongo import Connection
# random_index, gen_passwd and genkey are DAS test helpers
# (random integer, random string and md5 hash of a record, respectively)

def add_shard(ckey):
    "Add shard dynamically"
    uris = ['mongodb://vk5.cern.ch:8232']
    conn = Connection(host=uris)
    dbname, dbcoll = ckey.split('.')
    coll = conn[dbname][dbcoll]
    admin = conn.admin
    print "enablesharding", dbname
    print "shardcollection", ckey
    admin.command('enablesharding', dbname)
    admin.command('shardcollection', ckey, key={ckey: 1})
    limit = 2
    for idx in range(0, limit):
        timestamp = time.time() + int(random_index(3600))
        value = gen_passwd()
        rec = {dbname: {dbcoll: value, 'idx': idx}, 'das': {'expire': timestamp}}
        rec.update(dict(das_id=genkey(rec)))
        print "rec", rec
        coll.ensure_index([('das_id', 1)])
        coll.insert(rec)
        spec = {"das_id": rec['das_id']}
        print coll.find(spec).explain()

add_shard('file.name')

It reproduces the real-time use case. We get a DAS key, e.g. file.name; for this primary key we create a new database collection in the shard cluster, with database name "file" and collection name "name". The shard key becomes "file.name", and we insert records with file.name values into the desired shard collection. Here is the resulting shard configuration:

MongoDB shell version: 1.6.2
connecting to: 127.0.0.1:8232/test
--- Sharding Status --- 
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
      { "_id" : "shard1", "host" : "vk1.cern.ch:8230" }
      { "_id" : "shard2", "host" : "vk2.cern.ch:8230" }
      { "_id" : "shard3", "host" : "vk3.cern.ch:8230" }
  databases:
    { "_id" : "admin", "partitioned" : false, "primary" : "config" }
    { "_id" : "das", "partitioned" : true, "primary" : "shard1" }
        das.cache chunks:
            { "name" : { $minKey : 1 } } -->> { "name" : { $maxKey : 1 } } on : shard1 { "t" : 1000, "i" : 0 }
    { "_id" : "file", "partitioned" : true, "primary" : "shard2" }
        file.name chunks:
            { "file.name" : { $minKey : 1 } } -->> { "file.name" : { $maxKey : 1 } } on : shard2 { "t" : 1000, "i" : 0 }
    { "_id" : "dataset", "partitioned" : true, "primary" : "shard3" }
        dataset.name chunks:
            { "dataset.name" : { $minKey : 1 } } -->> { "dataset.name" : { $maxKey : 1 } } on : shard3 { "t" : 1000, "i" : 0 }

It is interesting to observe that the primary shard is chosen among the empty shards, such that dataset goes to shard3 while file goes to shard2. Now I need to test this with some number of records to see how the load will be spread.

vkuznet commented on July 17, 2024

I finished a stress test with the shard configuration. The file/dataset shards were created dynamically and populated with 3M records. The load has been spread across the shards more or less evenly. For instance, here are the numbers for the file shard: 1125766 records on shard1, 1499670 on shard2 and 374564 on shard3. Here are the DB stats:

{ "_id" : "file", "partitioned" : true, "primary" : "shard2" }
file.name chunks:
    { "file.name" : { $minKey : 1 } } -->> { "file.name" : "012SIFOc" } on : shard3 { "t" : 2000, "i" : 0 }
    { "file.name" : "012SIFOc" } -->> { "file.name" : "7lgFtvNZ" } on : shard1 { "t" : 4000, "i" : 0 }
    { "file.name" : "7lgFtvNZ" } -->> { "file.name" : "FX1ghmpa" } on : shard1 { "t" : 5000, "i" : 0 }
    { "file.name" : "FX1ghmpa" } -->> { "file.name" : "NEnJgdBW" } on : shard1 { "t" : 6000, "i" : 0 }
    { "file.name" : "NEnJgdBW" } -->> { "file.name" : "UzOXBFve" } on : shard3 { "t" : 7000, "i" : 0 }
    { "file.name" : "UzOXBFve" } -->> { "file.name" : "cjDzR3qe" } on : shard2 { "t" : 3000, "i" : 33 }
    { "file.name" : "cjDzR3qe" } -->> { "file.name" : "kTHXbNvD" } on : shard2 { "t" : 3000, "i" : 34 }
    { "file.name" : "kTHXbNvD" } -->> { "file.name" : "sEK8cBzt" } on : shard2 { "t" : 3000, "i" : 35 }
    { "file.name" : "sEK8cBzt" } -->> { "file.name" : "zyxXcWvA" } on : shard2 { "t" : 7000, "i" : 1 }
    { "file.name" : "zyxXcWvA" } -->> { "file.name" : { $maxKey : 1 } } on : shard3 { "t" : 3000, "i" : 0 }

All tests used the default shard chunk size, which is 200MB. This parameter can be tuned during shard configuration.

vkuznet commented on July 17, 2024

valya: To separate the collections, I need to make a pool of rawcaches, such that the DASCore class replaces self.rawcache with a hash dict whose keys are primary keys, e.g. dataset, and whose values are Connections to the appropriate collections. For example:

self.rawcache = {
   'dataset': Connection().dataset,
   'file': Connection().file,
}

Each such connection database will then have two collections, cache and merge. The code should be adjusted to replace DASCore.rawcache with DASCore.rawcache[primary_key], as sketched below.
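
A minimal sketch of that lookup, assuming the pool above and a primary key already extracted from the query; the host/port and helper names are illustrative:

# sketch only: each database in the pool holds its own cache and merge collections
from pymongo import Connection

conn = Connection('localhost', 8230)
rawcache = {
    'dataset': conn['dataset'],   # dataset.cache / dataset.merge
    'file'   : conn['file'],      # file.cache / file.merge
}

def cache_for(pkey):
    "Return the cache collection for a given DAS primary key, e.g. 'file'"
    return rawcache[pkey]['cache']

def merge_for(pkey):
    "Return the merge collection for a given DAS primary key, e.g. 'file'"
    return rawcache[pkey]['merge']

print cache_for('file').count()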

vkuznet commented on July 17, 2024

valya: I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if a user asks for block, the query should be redirected to the block.cache, block.merge collections, while if a user asks for dataset, the query will be redirected to the dataset.cache, dataset.merge collections. Now we need to modify the das_mongocache.py class to take advantage of that. To do so we need to perform the following actions:

  • add new collection manager into DASMongocache, e.g. self.colmgr
  • replace all methods using self.col to use self.colmgr.cache(query)
  • replace all methods using self.merge to use self.colmgr.merge(query)
  • remove/replace self.mdb with appropriate call to proper collection
  • change insert_query_record to insert query records into das.cache
  • modify clean_cache, delete_cache, map_reduce to loop over all cache/merge collections

Right now I have 3 collections: das.cache, das.merge, das.mapreduce. In the new design I will have N collections keyed by DAS key, e.g. block.cache, block.merge, dataset.cache, dataset.merge, which will hold data records (records with actual meta-data), and I will need another collection to store query records, e.g. das.queries. The das.mapreduce collection can stay intact since it only contains map-reduce functions.

All these changes will affect the analytics code, since it uses rawcache. Some methods, e.g. find_records, get_superset_keys, similar_queries, will require modification to look up query records from the separate collection.
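
A minimal sketch of such a manager (the real implementation lives in core/das_collection_manager.py; the class name, the mongodb URI and the {'fields': ..., 'spec': ...} query layout used here are illustrative assumptions):

from pymongo import Connection

class CollectionManager(object):
    "Sketch: hand out cache/merge collections based on the DAS query keys"
    def __init__(self, uri='mongodb://localhost:8232'):
        self.conn = Connection(host=[uri])

    def dbname(self, query):
        "Database name derived from the query selection keys, e.g. 'block'"
        return '_'.join(sorted(query['fields']))

    def cache(self, query):
        "Return the <key>.cache collection for the given query"
        return self.conn[self.dbname(query)]['cache']

    def merge(self, query):
        "Return the <key>.merge collection for the given query"
        return self.conn[self.dbname(query)]['merge']

mgr = CollectionManager()
query = {'fields': ['block'], 'spec': {'dataset.name': '/a/b/c'}}
print mgr.cache(query).count()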

drsm79 commented on July 17, 2024

metson: Replying to [comment:10 valya]:

I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if a user asks for block, the query should be redirected to the block.cache, block.merge collections, while if a user asks for dataset, the query will be redirected to the dataset.cache, dataset.merge collections.

Dumb question: what happens if a query is across two keys (e.g. locations of blocks in a given primary dataset)?

vkuznet commented on July 17, 2024

valya: Replying to [comment:11 metson]:

Replying to [comment:10 valya]:

I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if a user asks for block, the query should be redirected to the block.cache, block.merge collections, while if a user asks for dataset, the query will be redirected to the dataset.cache, dataset.merge collections.

Dumb question: what happens if a query is across two keys (e.g. locations of blocks in a given primary dataset)?

To a first approximation it would be a combined key, e.g.
block=bla site=T1
will resolve into a block_site collection. The more accurate approach still needs to be determined (a nice topic for Dong's thesis). The problem here is multidimensional. On one side we need a shard key which must be present in the collection (the definition of a shard); on the other we would like to share data across collections. This is related to the determination of query super/sub-sets.
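
A minimal sketch of the combined-key naming (the query layout and the mapping of site=T1 to site.name are illustrative assumptions):

def combined_dbname(query):
    "Build a combined collection/database name from all DAS keys in the query"
    keys = set(query.get('fields') or [])
    keys.update(key.split('.')[0] for key in query['spec'])
    return '_'.join(sorted(keys))

# block=bla site=T1  ->  block_site
print combined_dbname({'fields': None,
                       'spec': {'block.name': 'bla', 'site.name': 'T1'}})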

DMWMBot commented on July 17, 2024

liangd: Here is a description of how I split the database (as we discussed last week).
The database is split into smaller ones, each of them named by a DAS entity.
For example:
block dataset=/a/b/c | grep block.name
The result will be stored into database['block'] and finally retrieved from database['block'].
We could also have multiple select entities:
block dataset dataset=/a/b/c, and the result will come from database['block'] and database['dataset'].

Here are the major changes:
0. In order to have a unique query record for managing status, a separate database['query'] is used for this purpose.

  1. Map the query to the matching APIs; each API writes its records into the destination database according to das.primary_key. A collection_manager class manages the loading of databases, indexing the correct database for a given query and primary_key.
  2. During merging, since we merge records based on das.primary_key, the merge happens inside each individual database.
  3. At the get_from_cache step, a collection set called DASCollection is returned from collection_manager, which performs the find/remove/count operations on the collection set. Instead of a pymongo cursor, a cursor made via itertools.chain() is returned (see the sketch below).
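
A minimal sketch of the chained-cursor idea (the class shape and the host/port are illustrative; the real DASCollection is returned by collection_manager):

import itertools
from pymongo import Connection

class DASCollection(object):
    "Sketch: fan a find() out over several collections and chain the cursors"
    def __init__(self, collections):
        self.collections = collections

    def find(self, spec=None):
        "Return a single iterator over matching records from all collections"
        return itertools.chain(*[col.find(spec) for col in self.collections])

    def count(self, spec=None):
        "Total number of matching records across all collections"
        return sum(col.find(spec).count() for col in self.collections)

conn   = Connection('localhost', 8232)
dascol = DASCollection([conn['block']['merge'], conn['dataset']['merge']])
for row in dascol.find({'das.expire': {'$gt': 0}}):
    print row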

DMWMBot commented on July 17, 2024

liangd: Here are the major changes:

  1. reviewed the tools/analytics code to support the database changes.
  2. enabled a switch in the configuration file to turn splitting on or off:
# mongodb configuration
...
config.mongodb.enable_sharding = True

# shard configuration
config.component_('sharding')
config.sharding.querydb = 'query'
config.sharding.querydb_colleciton = 'db'
config.sharding.cachecollection = 'cache'
config.sharding.mergecollection = 'merge'
config.sharding.mrdb = 'mapreduce'
config.sharding.mrcollection = 'db'

vkuznet commented on July 17, 2024

valya: Dong, thanks for the update. I'm at CERN now and will only be back on Apr 8th. I want to sit down and carefully review your patch, so I'll do it once I get back. If you get anything done before that, feel free to update the ticket.

vkuznet commented on July 17, 2024

valya: Replying to [comment:15 valya]:

Dong, thanks for the update. I'm at CERN now and will only be back on Apr 8th. I want to sit down and carefully review your patch, so I'll do it once I get back. If you get anything done before that, feel free to update the ticket.

Dong, it would be nice if you could write a document (I prefer doc/sphinx/sharding.rst) which describes what sharding means and how it is implemented. It would also be nice to make some pictures showing the workflow.

vkuznet commented on July 17, 2024

valya: With the new DAS query interface I no longer need to worry about shard keys. All DAS records in both the cache and merge collections will have a qhash, which will become the shard key. The qhash is an md5 hash of the user input query; it is also the request ID returned to clients.
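
A minimal sketch of that setup, using the admin commands shown earlier in this ticket (the router host/port and the helper below are illustrative):

import hashlib
from pymongo import Connection

def qhash(dasquery):
    "md5 hex digest of the user input query; request ID and shard key"
    return hashlib.md5(dasquery).hexdigest()

# connect to the mongos router and shard das.cache/das.merge on qhash
conn  = Connection('mongodb://localhost:8232')
admin = conn.admin
admin.command('enablesharding', 'das')
for coll in ('das.cache', 'das.merge'):
    admin.command('shardcollection', coll, key={'qhash': 1})

print qhash('file dataset=/a/b/c')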

Now the question is how to set up the sharding environment in a way that satisfies the HTTP sandbox deployment. We need to deploy a router node (mongos) on one of the cmsweb back-ends. In addition we need to deploy shard nodes on the back-ends, and then define the setup procedure for the shard nodes, config servers and the router node. The router node should be started once all shards are in place (need to check that). Then DAS will need to re-connect to the shard node for processing requests.

If we deploy DAS+MongoDB on every cmsweb node, then each MongoDB will contain all dbs, where only the das.cache/das.merge collections will be sharded. Then we will choose a router node. This means we can still deploy DAS+MongoDB, which needs to be adjusted in order to enable the shard configuration, and we will need a new app which will deploy the router (mongos).
