Comments (16)
valya: I modified DAS internals and configuration to use MongoDB URIs. Now it's time for MongoDB setup tweaks.
from das.
Here is an example of a sharding configuration:
#!/bin/bash
shardname=`hostname -s`
# each node has 1 shard, 1 config server and 1 router
# bring up the shard node
cmd="mongod --port 8230 --shardsvr --dbpath <shard_path>"
echo $cmd
# bring up config server
cmd="mongod --port 8231 --configsvr --dbpath <config_dbpath>"
echo $cmd
# bring up router, where list_of_config_servers is in the form host1:port1,host2:port2
cmd="mongos --port 8232 --configdb <list_of_config_servers>"
echo $cmd
# JavaScript/DB part
echo "Create shard.js configuration file, to be executed as: mongo shard.js"
echo "-------- shard.js ---------"
cat > shard.js << EOF
db = db.getSisterDB("admin");
// add shards to router
db.runCommand( { addshard : "localhost:8230",
                 name : "$shardname",
                 allowLocal : true } );
// enable sharding
db.runCommand( { enablesharding : "das" } );
// shard cache collection
db.runCommand( { shardcollection : "das.cache",
                 key : { "dataset.name" : 1, "block.name" : 1, "file.name" : 1 } } );
// shard merge collection
db.runCommand( { shardcollection : "das.merge",
                 key : { "dataset.name" : 1, "block.name" : 1, "file.name" : 1 } } );
EOF
cat shard.js
echo "-------- end of shard.js ---------"
# execute shard configuration file on the router
cmd="mongo --port 8232 shard.js"
echo $cmd
valya: Need to test the following scenarios:
- Replica set
  - write explicit commands to set up a replica set
  - confirm that replicas work when a node goes down
- Sharding configuration
  a. set up a 1+1+1 configuration: 1 shard, 1 config server and 1 router, all on the same node
  b. set up a 1(a+b)+1+1 configuration, where the shard is itself a replica set
- Sharding keys
  - random shards (by id only)
  - type shards (shard only large "buckets"), e.g. define shard keys as dataset, file
  - query key shards (shard everything on the keys the joins use)
  - need to check if I can change shard keys dynamically, since we don't know the queries a priori, even though we can pre-define some shard keys, similar to type shards
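The replica-set item above can be sketched in Python. The host names are hypothetical; replSetInitiate is the standard MongoDB command for seeding a replica set, shown here only in a comment since it requires live mongod nodes.

```python
# Sketch of "write explicit commands to set up a replica set" above.
# Host names are hypothetical examples.
def replset_config(name, hosts):
    "Build a replSetInitiate configuration document"
    members = [{'_id': idx, 'host': host} for idx, host in enumerate(hosts)]
    return {'_id': name, 'members': members}

cfg = replset_config('das', ['vk1.cern.ch:8230', 'vk2.cern.ch:8230', 'vk3.cern.ch:8230'])
# Against live mongod nodes started with --replSet das, this would be applied as:
#   from pymongo import Connection
#   Connection('vk1.cern.ch:8230').admin.command('replSetInitiate', cfg)
print(cfg['members'][0])
```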
valya: From this thread, http://groups.google.com/group/mongodb-user/browse_thread/thread/c9e44d046fb6a74a, I figured out that all documents in a shard cluster need to have values for the shard keys. Since we dynamically create docs, and docs do not necessarily share keys, e.g. dataset vs runs, our only choice is random shards using id as a key, or das.expire.
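Since every sharded document must carry the shard-key field, a small guard before insertion could enforce the das.expire fallback mentioned above. This helper is a hypothetical sketch, not actual DAS code.

```python
import time

# Hypothetical helper: guarantee a record carries das.expire, the
# fallback shard key discussed above, before it is inserted.
def with_fallback_shard_key(record, lifetime=3600):
    "Return a copy of record with das.expire set if it was missing"
    rec = dict(record)
    das = dict(rec.get('das', {}))
    das.setdefault('expire', time.time() + lifetime)
    rec['das'] = das
    return rec

rec = with_fallback_shard_key({'dataset': {'name': '/a/b/c'}})
print('expire' in rec['das'])  # True
```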
valya: A few observations during shard testing:
- we must create shard keys up-front
- data which do not have the shard key will not be written into the shard
- data are spread evenly among shards
In the current DAS design we have two collections in the DAS db: cache and merge. We store records with different keys into them. This does not allow us to use a common shard key, except das_id and/or das.expire. But these keys are not used in queries, therefore we cannot benefit from them. Instead we can take a slightly different approach and divide the DAS db into multiple collections based on data types, for instance file.cache/file.merge, dataset.cache/dataset.merge, etc. This will allow us to use sharding per data collection with well-defined shard keys, e.g. file.name, dataset.name.
The details will be the following:
- upon an input query, create a new collection if it does not exist
- invoke admin.command to add a new shard key, e.g.
  admin = conn.admin
  admin.command('shardcollection', 'file.cache', key={'file.name':1})
- ensure other indexes for that collection
- insert data into the new collection
I was able to create the shards dynamically using the following code:
import time
from pymongo import Connection
# helper functions random_index, gen_passwd, genkey are defined elsewhere

def add_shard(ckey):
    "Add shard dynamically"
    uris = ['mongodb://vk5.cern.ch:8232']
    conn = Connection(host=uris)
    dbname, dbcoll = ckey.split('.')
    coll = conn[dbname][dbcoll]
    admin = conn.admin
    print "enablesharding", dbname
    print "shardcollection", ckey
    admin.command('enablesharding', dbname)
    admin.command('shardcollection', ckey, key={ckey:1})
    limit = 2
    for idx in range(0, limit):
        timestamp = time.time() + int(random_index(3600))
        value = gen_passwd()
        rec = {dbname: {dbcoll: value, 'idx': idx}, 'das': {'expire': timestamp}}
        rec.update(dict(das_id=genkey(rec)))
        print "rec", rec
        coll.ensure_index([('das_id', 1)])
        coll.insert(rec)
        spec = {"das_id": rec['das_id']}
        print coll.find(spec).explain()

add_shard('file.name')
It reproduces the real use case: we get a DAS key, e.g. file.name; for this primary key we create a new database collection in the shard cluster, with database name '''file''' and collection name '''name'''. The shard key becomes '''file.name''', and we insert records with file.name values into the desired shard collection. Here is the shard configuration:
MongoDB shell version: 1.6.2
connecting to: 127.0.0.1:8232/test
--- Sharding Status ---
sharding version: { "_id" : 1, "version" : 3 }
shards:
{ "_id" : "shard1", "host" : "vk1.cern.ch:8230" }
{ "_id" : "shard2", "host" : "vk2.cern.ch:8230" }
{ "_id" : "shard3", "host" : "vk3.cern.ch:8230" }
databases:
{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
{ "_id" : "das", "partitioned" : true, "primary" : "shard1" }
das.cache chunks:
{ "name" : { $minKey : 1 } } -->> { "name" : { $maxKey : 1 } } on : shard1 { "t" : 1000, "i" : 0 }
{ "_id" : "file", "partitioned" : true, "primary" : "shard2" }
file.name chunks:
{ "file.name" : { $minKey : 1 } } -->> { "file.name" : { $maxKey : 1 } } on : shard2 { "t" : 1000, "i" : 0 }
{ "_id" : "dataset", "partitioned" : true, "primary" : "shard3" }
dataset.name chunks:
{ "dataset.name" : { $minKey : 1 } } -->> { "dataset.name" : { $maxKey : 1 } } on : shard3 { "t" : 1000, "i" : 0 }
It is interesting to observe that the primary shard is chosen accordingly among the empty shards, such that dataset goes to shard3 while file goes to shard2. Now I need to test this with some number of records to see how the load will be spread.
I finished a stress test with the shard configuration. The file/dataset shards were created dynamically and populated with 3M records. The load has been spread across the shards more or less evenly. For instance, here are the numbers for the file shard: 1125766 records on shard1, 1499670 on shard2 and 374564 on shard3. Here are the DB stats:
{ "_id" : "file", "partitioned" : true, "primary" : "shard2" }
file.name chunks:
{ "file.name" : { $minKey : 1 } } -->> { "file.name" : "012SIFOc" } on : shard3 { "t" : 2000, "i" : 0 }
{ "file.name" : "012SIFOc" } -->> { "file.name" : "7lgFtvNZ" } on : shard1 { "t" : 4000, "i" : 0 }
{ "file.name" : "7lgFtvNZ" } -->> { "file.name" : "FX1ghmpa" } on : shard1 { "t" : 5000, "i" : 0 }
{ "file.name" : "FX1ghmpa" } -->> { "file.name" : "NEnJgdBW" } on : shard1 { "t" : 6000, "i" : 0 }
{ "file.name" : "NEnJgdBW" } -->> { "file.name" : "UzOXBFve" } on : shard3 { "t" : 7000, "i" : 0 }
{ "file.name" : "UzOXBFve" } -->> { "file.name" : "cjDzR3qe" } on : shard2 { "t" : 3000, "i" : 33 }
{ "file.name" : "cjDzR3qe" } -->> { "file.name" : "kTHXbNvD" } on : shard2 { "t" : 3000, "i" : 34 }
{ "file.name" : "kTHXbNvD" } -->> { "file.name" : "sEK8cBzt" } on : shard2 { "t" : 3000, "i" : 35 }
{ "file.name" : "sEK8cBzt" } -->> { "file.name" : "zyxXcWvA" } on : shard2 { "t" : 7000, "i" : 1 }
{ "file.name" : "zyxXcWvA" } -->> { "file.name" : { $maxKey : 1 } } on : shard3 { "t" : 3000, "i" : 0 }
All tests used the default shard chunk size, which is 200MB. This parameter can be tuned during shard configuration.
valya: To separate the collections, I need to make a pool of rawcaches, such that the DASCore class will replace self.rawcache with a hash dict whose keys will be primary keys, e.g. dataset, and whose values will be Connections to the appropriate collections. For example:
self.rawcache = {
    'dataset': Connection().dataset,
    'file': Connection().file,
}
Then each connection's database will have two collections, cache and merge. The code should be adjusted to replace DASCore.rawcache with DASCore.rawcache[primary_key].
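A minimal sketch of such a pool, with lazy creation per primary key; plain dicts stand in for live pymongo Connection objects, and the class name is hypothetical.

```python
# Hypothetical sketch of the rawcache pool described above; a plain dict
# stands in for a live pymongo Connection.
class RawCachePool(object):
    def __init__(self, conn_factory):
        self.conn_factory = conn_factory  # maps a primary key to a connection
        self.pool = {}
    def __getitem__(self, primary_key):
        "Return (and lazily create) the connection for a given primary key"
        if primary_key not in self.pool:
            self.pool[primary_key] = self.conn_factory(primary_key)
        return self.pool[primary_key]

pool = RawCachePool(lambda key: {'db': key, 'collections': ['cache', 'merge']})
print(pool['dataset']['db'])  # dataset
```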
valya: I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if the user asked for block, the query should be redirected to the block.cache/block.merge collections, while if the user asked for dataset, the query will be redirected to the dataset.cache/dataset.merge collections. Now we need to modify the das_mongocache.py class to take advantage of that. To do so we need to perform the following actions:
- add the new collection manager into DASMongocache, e.g. self.colmgr
- replace all methods using self.col to use self.colmgr.cache(query)
- replace all methods using self.merge to use self.colmgr.merge(query)
- remove/replace self.mdb with an appropriate call to the proper collection
- change insert_query_record to insert query records into das.cache
- modify clean_cache, delete_cache, map_reduce to loop over all cache/merge collections
Right now I have 3 collections: das.cache, das.merge, das.mapreduce. In the new design I will have N collections named after DAS keys, e.g. block.cache, block.merge, dataset.cache, dataset.merge, which will hold data records (records with actual meta-data), and I will need another collection to store query records, e.g. das.queries. The das.mapreduce collection can stay intact since it only contains map-reduce functions.
All these changes will affect the analytics code, since it uses rawcache. Some methods, e.g. find_records, get_superset_keys, similar_queries, will require modification to look up query records from the separate collection.
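The colmgr routing described above can be sketched as follows; the method names mirror the colmgr calls, but the query format and the key extraction are simplified assumptions.

```python
# Simplified sketch of the collection-manager routing described above.
# The query format ({'fields': [...], 'spec': {...}}) is an assumption.
class CollectionManager(object):
    def primary_key(self, query):
        "Extract the primary select key from a simplified query dict"
        return sorted(query['fields'])[0]
    def cache(self, query):
        return '%s.cache' % self.primary_key(query)
    def merge(self, query):
        return '%s.merge' % self.primary_key(query)

mgr = CollectionManager()
query = {'fields': ['block'], 'spec': {'dataset.name': '/a/b/c'}}
print(mgr.cache(query))  # block.cache
```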
metson: Replying to [comment:10 valya]:
I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if the user asked for block, the query should be redirected to the block.cache/block.merge collections, while if the user asked for dataset, the query will be redirected to the dataset.cache/dataset.merge collections.
Dumb question: what happens if a query is across two keys (e.g. locations of blocks in a given primary dataset)?
valya: Replying to [comment:11 metson]:
Replying to [comment:10 valya]:
I've added a new class to core/das_collection_manager.py which can allocate and return a pointer to the proper collection based on the input query. The idea is to parse the input query, extract its keys and use them to create a new collection in Mongo. For instance, if the user asked for block, the query should be redirected to the block.cache/block.merge collections, while if the user asked for dataset, the query will be redirected to the dataset.cache/dataset.merge collections.
Dumb question: what happens if a query is across two keys (e.g. locations of blocks in a given primary dataset)?
To a first approximation it would be a combined key, e.g.
block=bla site=T1
will resolve into a block_site collection. A more accurate approach needs to be determined (a nice topic for Dong's thesis). The problem here is multidimensional: on one side we need a shard key which must be present in the collection (the definition of a shard), on the other side we would like to share data across collections. This is related to the determination of query super/sub-sets.
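The combined-key naming above can be sketched in one helper; sorting the keys is an assumption, used only to make the collection name independent of the query's key order.

```python
# Sketch of the combined-key naming above: block=bla site=T1 -> block_site.
# Sorting the keys is an assumption to make the name order-independent.
def combined_collection(keys):
    "Build a collection name from the set of query keys"
    return '_'.join(sorted(keys))

print(combined_collection(['site', 'block']))  # block_site
```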
liangd: Here is the description of how I split the database (as we discussed last week).
Split the database into smaller ones, each of them named after a DAS entity. For example:
block dataset=/a/b/c | grep block.name
The result will be stored into database['block'], and finally the result is retrieved from database['block'].
And we could have multiple select entities:
block dataset dataset=/a/b/c, and the result will come from database['block'] and database['dataset'].
Here are the major changes:
- In order to have a unique query record for managing status, a separate database['query'] is used for this purpose.
- Map the query to the matching APIs; each API will write its records into the destination database according to the das.primary_key. A collection_manager class manages the loading of databases, indexing the correct database via the given query and primary_key.
- During merging, since we merge records based on das.primary_key, the merge happens inside each individual database.
- At the get_from_cache step, a collection set called DASCollection will be returned from the collection_manager, which will perform find/remove/count operations on the collection set. Instead of a pymongo cursor, a cursor made via itertools.chain() will be returned.
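The chained-cursor idea in the last item can be sketched like this; plain lists stand in for real pymongo collections, and only the DASCollection name comes from the description above.

```python
import itertools

# Sketch of the DASCollection idea above: find() spans several collections
# and returns an itertools.chain iterator instead of a single pymongo
# cursor. Plain lists stand in for real collections here.
class DASCollection(object):
    def __init__(self, collections):
        self.collections = collections
    def find(self, pred):
        "Chain per-collection matches into a single iterator"
        return itertools.chain(*[
            (rec for rec in coll if pred(rec)) for coll in self.collections])
    def count(self, pred):
        return sum(1 for _ in self.find(pred))

blocks = [{'block': {'name': 'b1'}}]
datasets = [{'dataset': {'name': '/a/b/c'}}]
dascol = DASCollection([blocks, datasets])
print(dascol.count(lambda rec: True))  # 2
```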
liangd: Here are the major changes:
- reviewed the tools/analytics code to support the database changes.
- enabled a switch between splitting or not in the configuration file:
# mongodb configuration
...
config.mongodb.enable_sharding = True
# shard configuration
config.component_('sharding')
config.sharding.querydb = 'query'
config.sharding.querydb_collection = 'db'
config.sharding.cachecollection = 'cache'
config.sharding.mergecollection = 'merge'
config.sharding.mrdb = 'mapreduce'
config.sharding.mrcollection = 'db'
valya: Dong, thanks for the update. I'm at CERN now and will only be back on Apr 8th. I want to sit down and carefully review your patch, so I'll do it once I get back. If you get anything before that, feel free to update the ticket.
valya: Replying to [comment:15 valya]:
Dong, thanks for the update. I'm at CERN now and will only be back on Apr 8th. I want to sit down and carefully review your patch, so I'll do it once I get back. If you get anything before that, feel free to update the ticket.
Dong, it would be nice if you could write a document (I'd prefer doc/sphinx/sharding.rst) which describes what sharding means and how it is implemented. It would also be nice to include some pictures showing the workflow.
valya: With the new DAS query interface I no longer need to worry about shard keys. All DAS records in both the cache and merge collections will have a qhash, which will become the shard key. The qhash is an md5 hash of the user input query. It is also the request ID returned to clients.
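A minimal sketch of such a qhash, assuming the query is serialized with sorted keys before hashing (the exact canonicalization in DAS may differ):

```python
import hashlib
import json

# Sketch of the qhash shard key described above; serializing with
# sort_keys is an assumption to make the hash order-independent.
def qhash(query):
    "md5 hash of a user query, usable as request ID and shard key"
    data = json.dumps(query, sort_keys=True)
    return hashlib.md5(data.encode('utf-8')).hexdigest()

key = qhash({'fields': ['block'], 'spec': {'dataset.name': '/a/b/c'}})
print(len(key))  # 32
```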
Now the question is how to set up the sharding environment in a way that satisfies the HTTP sandbox deployment. We need to deploy a router node (mongos) on one of the cmsweb back-ends. In addition we need to deploy shard nodes on the back-ends, then run the setup procedure for the shard and config nodes and the router node. The router node should be started once all shards are in place (need to check that). Then DAS will need to re-connect to the shard node for processing requests.
If we deploy DAS+MongoDB on every cmsweb node, then MongoDB will contain all dbs, where only the das.cache/das.merge collections will be sharded. Then we will choose a router node. This means we can still deploy DAS+MongoDB, which needs to be adjusted in order to enable the shard configuration, and we will need a new app which will deploy the router (mongos).