thenativeweb / node-cqrs-domain

Node-cqrs-domain is a node.js module based on nodeEventStore. It can be very useful as a domain component if you work with (d)ddd, cqrs, eventdenormalizer, host, etc.

Home Page: http://cqrs.js.org/pages/domain.html

License: MIT License


⚠️ IMPORTANT NEWS! πŸ“°

I've been dealing with CQRS, event-sourcing and DDD long enough now that, unfortunately, I no longer need to work with it, so at least for now this is my formal farewell!

I want to thank everyone who has contributed in one way or another. Especially...

  • Jan, who introduced me to this topic.
  • Dimitar, one of the last bigger contributors and maintainer.
  • My last employer, who gave me the possibility to use all these CQRS modules in a big Cloud-System.
  • My family and friends, who very often came up short.

Finally, I would like to thank Golo Roden, who was there very early at the beginning of my CQRS/ES/DDD journey and is now here again to take over these modules.

Golo Roden is the founder, CTO and managing director of the native web, a company specializing in native web technologies. Among other things, he also teaches CQRS/ES/DDD etc. and based on his vast knowledge, he brought wolkenkit to life. wolkenkit is a CQRS and event-sourcing framework based on Node.js. It empowers you to build and run scalable distributed web and cloud services that process and store streams of domain events.

With this step, I can focus more on i18next, locize and localistars. I'm happy about that. 😊

So, there is no end, but the start of a new phase for my CQRS modules. 😉

I wish you all good luck on your journey.

Who knows, maybe we'll meet again in a GitHub issue or PR at i18next 😉

Adriano Raiano


Introduction

Node-cqrs-domain is a node.js module based on node-eventstore. It can be very useful as a domain component if you work with (d)ddd, cqrs, eventdenormalizer, host, etc.

Workflow


        │
       cmd
        │
        ∨
  ╔════════════╗
  ║ validation ║─────────> "rejected"
  ╚════════════╝
        │
       cmd
        │
        ∨
╔═════════════════════╗
║ pre-load-conditions ║─────> "rejected"
╚═════════════════════╝
        │
       cmd
        │
        ∨
╔════════════════╗
║ pre-conditions ║─────> "rejected"
╚════════════════╝
        │
       cmd
        │
        ∨
  ╔════════════╗
  ║ handle cmd ║
  ╚════════════╝
        │
       evt
        │
        ∨
  ╔═══════════╗
  ║ apply evt ║
  ╚═══════════╝
        │
        │
        │
        ∨
╔════════════════╗
║ business rules ║─────> "rejected"
╚════════════════╝
        │
        │
        │
        ∨
   ╔════════╗
   ║ commit ║
   ╚════════╝

Installation

npm install cqrs-domain

Usage

var domain = require('cqrs-domain')({
  // the path to the "working directory"
  // can be structured like
  // [set 1](https://github.com/adrai/node-cqrs-domain/tree/master/test/integration/fixture/set1) or
  // [set 2](https://github.com/adrai/node-cqrs-domain/tree/master/test/integration/fixture/set2)
  domainPath: '/path/to/my/files',

  // optional, default is 'commandRejected'
  // will be used if an error occurs and an event should be generated
  commandRejectedEventName: 'rejectedCommand',

  // optional, default is 800
  // if using in scaled systems and not guaranteeing that each command for an aggregate instance
  // dispatches to the same worker process, this module tries to catch the concurrency issues and
  // retries to handle the command after a timeout between 0 and the defined value
  retryOnConcurrencyTimeout: 1000,

  // optional, default is 100
  // global snapshot threshold value for all aggregates
  // defines the amount of loaded events, if there are more events to load, it will do a snapshot, so next loading is faster
  // an individual snapshot threshold defining algorithm can be defined per aggregate (scroll down)
  snapshotThreshold: 1000,

  // optional, default is in-memory
  // currently supports: mongodb, redis, tingodb, azuretable and inmemory
  // hint: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
  eventStore: {
    type: 'mongodb',
    host: 'localhost',                          // optional
    port: 27017,                                // optional
    dbName: 'domain',                           // optional
    eventsCollectionName: 'events',             // optional
    snapshotsCollectionName: 'snapshots',       // optional
    transactionsCollectionName: 'transactions', // optional
    timeout: 10000                              // optional
    // authSource: 'authenticationDatabase',    // optional
    // username: 'technicalDbUser',             // optional
    // password: 'secret',                      // optional
    // url: 'mongodb://user:pass@host:port/db?opts' // optional
  },

  // optional, default is in-memory
  // currently supports: mongodb, redis, tingodb, couchdb, azuretable, dynamodb and inmemory
  // hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
  aggregateLock: {
    type: 'redis',
    host: 'localhost',                          // optional
    port: 6379,                                 // optional
    db: 0,                                      // optional
    prefix: 'domain_aggregate_lock',            // optional
    timeout: 10000                              // optional
    // password: 'secret'                          // optional
  },

  // optional, default is not set
  // checks whether the command was already seen within the last ttl milliseconds
  // currently supports: mongodb, redis, tingodb and inmemory
  // hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
  deduplication: {
    type: 'redis',
    ttl: 1000 * 60 * 60 * 1, // 1 hour          // optional
    host: 'localhost',                          // optional
    port: 6379,                                 // optional
    db: 0,                                      // optional
    prefix: 'domain_aggregate_lock',            // optional
    timeout: 10000                              // optional
    // password: 'secret'                          // optional
  },

  // optional, default false
  // resolves valid file types from loader extensions instead of default values while parsing definition files
  useLoaderExtensions: true
});
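The comment on `retryOnConcurrencyTimeout` describes a retry after a random delay between 0 and the configured value. A minimal sketch of that idea follows; `randomRetryDelay` is a hypothetical helper and not part of the module's API.

```javascript
// Hypothetical helper: pick a whole-millisecond delay in the range [0, maxTimeout).
function randomRetryDelay(maxTimeout) {
  return Math.floor(Math.random() * maxTimeout);
}

// e.g. setTimeout(retryHandlingTheCommand, randomRetryDelay(1000));
```

Randomizing the delay makes it less likely that competing workers collide again on the same aggregate.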

Using factory methods for event store or / and aggregate lock in domain definition

You can replace the framework-provided implementation of event store or / and aggregate lock with the one of your own. To do that, you need to include a factory method in the options object passed to the domain constructor. Using the factory methods, the example above might become:

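var myGetEventStore = require('./getEventStore.js');
var myLock = require('./myLock.js');
var myCommandBumper = require('./myCommandBumper.js'); // hypothetical module, analogous to the two above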

var domain = require('cqrs-domain')({
  domainPath: '/path/to/my/files',
  commandRejectedEventName: 'rejectedCommand',
  retryOnConcurrencyTimeout: 1000,
  snapshotThreshold: 1000,

  eventStore: function () {
    return myGetEventStore({
      host: '127.0.0.1',
      port: 2113,
      username: 'admin',
      password: 'changeit'
    });
  },

  aggregateLock: function () {
    return myLock({
      // ....
    });
  },

  deduplication: function () {
    return myCommandBumper({
      // ....
    });
  }
});

When using factory methods, the objects they return are required to implement the following public interfaces:

Event Store:

  f:  init(function(err));
  f:  getNewId(function (err, id));
  f:  on(evtName, function (err));
  f:  getFromSnapshot(query, revMax, function(err, snapshot, stream));
  f:  createSnapshot(obj, function (err));
  f:  setEventToDispatched(evt, function (err));

Event Stream (returned by getFromSnapshot through the callback):

  p:  events
  p:  lastRevision
  p:  eventsToDispatch
  f:  addEvents(evts)
  f:  commit(function (err, stream));

Aggregate Lock:

  f: connect(function(err, lock))
  f: disconnect(function(err))
  f: getNewId(function(err, id))
  f: reserve(workerId, aggregateId, function(err))
  f: getAll(aggregateId, function(err, workerIds))
  f: resolve(aggregateId, function(err))

Command Bumper:

  f: connect(function(err, lock))
  f: disconnect(function(err))
  f: getNewId(function(err, id))
  f: add(key, ttl, function(err, added))

where:

  f: function
  p: property
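To make the Aggregate Lock and Command Bumper interfaces more concrete, here is a minimal in-memory sketch of objects exposing the listed methods. `InMemoryLock` and `InMemoryBumper` are hypothetical names. The semantics of `reserve`, `getAll`, `resolve` and of the `added` flag are assumptions inferred from the signatures above, and so is the choice to extend `EventEmitter` so that 'connect' and 'disconnect' can be emitted.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var crypto = require('crypto');

// Hypothetical in-memory aggregate lock.
function InMemoryLock() {
  EventEmitter.call(this);
  this.store = {}; // aggregateId -> array of workerIds (assumed semantics)
}
util.inherits(InMemoryLock, EventEmitter);

InMemoryLock.prototype.connect = function (callback) {
  this.emit('connect');
  if (callback) callback(null, this);
};
InMemoryLock.prototype.disconnect = function (callback) {
  this.emit('disconnect');
  if (callback) callback(null);
};
InMemoryLock.prototype.getNewId = function (callback) {
  callback(null, crypto.randomBytes(16).toString('hex'));
};
// remember that a worker is interested in the aggregate
InMemoryLock.prototype.reserve = function (workerId, aggregateId, callback) {
  (this.store[aggregateId] = this.store[aggregateId] || []).push(workerId);
  callback(null);
};
// report all workers that reserved the aggregate, in reservation order
InMemoryLock.prototype.getAll = function (aggregateId, callback) {
  callback(null, (this.store[aggregateId] || []).slice());
};
// drop all reservations for the aggregate
InMemoryLock.prototype.resolve = function (aggregateId, callback) {
  delete this.store[aggregateId];
  callback(null);
};

// Hypothetical in-memory command bumper.
function InMemoryBumper() {
  EventEmitter.call(this);
  this.seen = {}; // key -> expiry timestamp in ms
}
util.inherits(InMemoryBumper, EventEmitter);
InMemoryBumper.prototype.connect = InMemoryLock.prototype.connect;
InMemoryBumper.prototype.disconnect = InMemoryLock.prototype.disconnect;
InMemoryBumper.prototype.getNewId = InMemoryLock.prototype.getNewId;
// added is true when the key was not seen within its ttl (assumed meaning)
InMemoryBumper.prototype.add = function (key, ttl, callback) {
  var now = Date.now();
  if (this.seen[key] > now) return callback(null, false);
  this.seen[key] = now + ttl;
  callback(null, true);
};
```

Objects like these could be returned from the `aggregateLock` and `deduplication` factory methods. They keep state in process memory only, so they are suitable for tests rather than for scaled deployments.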

Using custom structure loader function

You can also replace the built-in structure loader with one that suits your needs. To do that, you need to include a loading method in the options object passed to the domain constructor.

// options will contain the domainPath, validatorExtension and useLoaderExtensions options passed to the constructor,
// as well as a definitions object containing all the constructors of the domain components (Context, Aggregate, etc.) and the error constructors (inside errors)
function myCustomLoader(options) {
  return {
    myContext: new Context({ name: 'myContext' })
      .addAggregate(new Aggregate({ name: 'agg' }, function () {})
        .addCommand({ name: 'cmd' }, function () {}))
  };
  // or, more probably:
  // return myExternalCoolLoader(options.domainPath, options.definitions);
}
// or async
function myCustomLoaderAsync(options, callback) {
  callback(null, myExternalCoolLoader(options.domainPath, options.definitions));
}

var domain = require('cqrs-domain')({
  domainPath: '/path/to/my/files',
  structureLoader: myCustomLoader
});

Exposed errors

You can use these, for example, in your custom command handlers.

require('cqrs-domain').errors.ValidationError
require('cqrs-domain').errors.BusinessRuleError
require('cqrs-domain').errors.AggregateDestroyedError
require('cqrs-domain').errors.AggregateConcurrencyError
require('cqrs-domain').errors.ConcurrencyError
require('cqrs-domain').errors.DuplicateCommandError

Catch connect and disconnect events

// eventStore
domain.eventStore.on('connect', function() {
  console.log('eventStore connected');
});

domain.eventStore.on('disconnect', function() {
  console.log('eventStore disconnected');
});

// aggregateLock
domain.aggregateLock.on('connect', function() {
  console.log('aggregateLock connected');
});

domain.aggregateLock.on('disconnect', function() {
  console.log('aggregateLock disconnected');
});

// commandBumper
domain.commandBumper.on('connect', function() {
  console.log('commandBumper connected');
});

domain.commandBumper.on('disconnect', function() {
  console.log('commandBumper disconnected');
});


// anything (eventStore or aggregateLock or commandBumper)
domain.on('connect', function() {
  console.log('something connected');
});

domain.on('disconnect', function() {
  console.log('something disconnected');
});

Define the command structure

The values describe the path to that property in the command message.

domain.defineCommand({
  // optional, default is 'id'
  id: 'id',

  // optional, default is 'name'
  name: 'name',

  // optional, default is 'aggregate.id'
  // if an aggregate id is not defined in the command, the command handler will create a new aggregate instance
  aggregateId: 'aggregate.id',

  // optional, only makes sense if contexts are defined in the 'domainPath' structure
  context: 'context.name',

  // optional, only makes sense if aggregates with names are defined in the 'domainPath' structure
  aggregate: 'aggregate.name',

  // optional, but recommended
  payload: 'payload',

  // optional, if defined the command handler will check if the command can be handled
  // if you want the command to be handled in a secure/transactional way pass a revision value that matches the current aggregate revision
  revision: 'revision',

  // optional, if defined the command handler will search for a handle that matches command name and version number
  version: 'version',

  // optional, if defined these values will be copied to the event (can be used to transport information like userId, etc.)
  meta: 'meta'
});
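Each value in the definition is a dotted path into the message. The following sketch shows how such a path can be resolved against a command; `getByPath` is a hypothetical helper written for this illustration, not the lookup the module uses internally.

```javascript
// Hypothetical helper: read a value from a message using a dotted path.
function getByPath(obj, path) {
  return path.split('.').reduce(function (current, key) {
    // stop descending as soon as an intermediate value is missing
    return current == null ? undefined : current[key];
  }, obj);
}

var cmd = {
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  name: 'enterNewPerson',
  aggregate: { id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70', name: 'person' },
  context: { name: 'hr' }
};

var aggregateId = getByPath(cmd, 'aggregate.id');   // path from the aggregateId setting
var contextName = getByPath(cmd, 'context.name');   // path from the context setting
var missing = getByPath(cmd, 'meta.userId');        // this command has no meta
```

If your messages use a different layout, for example a flat `aggregateId` property, only the path strings passed to `defineCommand` need to change.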

Define the event structure

The values describe the path to that property in the event message.

domain.defineEvent({
  // optional, default is 'correlationId'
  // will use the command id as correlationId, so you can match it in the sender
  correlationId: 'correlationId',

  // optional, default is 'id'
  id: 'id',

  // optional, default is 'name'
  name: 'name',

  // optional, default is 'aggregate.id'
  aggregateId: 'aggregate.id',

  // optional, only makes sense if contexts are defined in the 'domainPath' structure
  context: 'context.name',

  // optional, only makes sense if aggregates with names are defined in the 'domainPath' structure
  aggregate: 'aggregate.name',

  // optional, default is 'payload'
  payload: 'payload',

  // optional, default is 'revision'
  // will represent the aggregate revision, can be used in next command
  revision: 'revision',

  // optional
  version: 'version',

  // optional, if defined the values of the command will be copied to the event (can be used to transport information like userId, etc..)
  meta: 'meta',

  // optional, if defined the commit date of the eventstore will be saved in this value
  commitStamp: 'commitStamp',

  // optional, if defined and the eventstore db implementation supports this, the position of the event in the eventstore will be saved in this value
  position: 'position'
});

Define the id generator function [optional]

you can define a synchronous function

domain.idGenerator(function () {
  var id = require('uuid').v4().toString();
  return id;
});

or you can define an asynchronous function

domain.idGenerator(function (callback) {
  setTimeout(function () {
    var id = require('uuid').v4().toString();
    callback(null, id);
  }, 50);
});
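The two styles differ only in whether the function declares a callback parameter. One common way to accept both is to inspect the declared parameter count, sketched below. That the module distinguishes them exactly this way is an assumption, and `callGenerator` is a hypothetical helper.

```javascript
var crypto = require('crypto');

// Hypothetical helper: call a generator that is either synchronous (no parameters)
// or asynchronous (one callback parameter) and always answer through a callback.
function callGenerator(fn, callback) {
  if (fn.length === 0) {
    var id;
    try {
      id = fn();
    } catch (err) {
      return callback(err);
    }
    return callback(null, id);
  }
  fn(callback);
}

// synchronous generator
function syncGenerator() {
  return crypto.randomBytes(8).toString('hex');
}

// asynchronous generator (calls back immediately to keep the sketch short)
function asyncGenerator(callback) {
  callback(null, crypto.randomBytes(8).toString('hex'));
}
```

With such a wrapper the calling code can treat both kinds of generator the same way.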

Define the aggregate id generator function [optional]

you can define a synchronous function

domain.aggregateIdGenerator(function () {
  var id = require('uuid').v4().toString();
  return id;
});

or you can define an asynchronous function

domain.aggregateIdGenerator(function (callback) {
  setTimeout(function () {
    var id = require('uuid').v4().toString();
    callback(null, id);
  }, 50);
});

Wire up events [optional]

you can define a synchronous function

// pass events to bus
domain.onEvent(function (evt) {
  bus.emit('event', evt);
});

or you can define an asynchronous function

// pass events to bus
domain.onEvent(function (evt, callback) {
  bus.emit('event', evt, function ack () {
    callback();
  });
});
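The snippets above assume a `bus` object that is not defined in this README. As a minimal sketch, an in-process `EventEmitter` from Node.js can play that role; a real system would more likely publish to a message broker. The listener below is hypothetical.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical in-process bus.
var bus = new EventEmitter();
var received = [];

bus.on('event', function (evt, ack) {
  received.push(evt);
  // the acknowledgement function is only passed in the asynchronous variant
  if (typeof ack === 'function') ack();
});

// synchronous style: fire and forget
bus.emit('event', { name: 'enteredNewPerson' });

// asynchronous style: continue once the listener has acknowledged
var acknowledged = false;
bus.emit('event', { name: 'enteredNewPerson' }, function ack() {
  acknowledged = true;
});
```

`emit` forwards any extra arguments to the listeners, which is what makes the acknowledgement callback in the asynchronous variant possible.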

Initialization

domain.init(function (err, warnings) {
  // this callback is called when all is ready...
  // warnings: null if there are no warnings, otherwise an array containing the errors that occurred while requiring the files
});

// or

domain.init(); // callback is optional

Handling a command

domain.handle({
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  name: 'enterNewPerson',
  aggregate: {
    id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
    name: 'person'
  },
  context: {
    name: 'hr'
  },
  payload: {
    firstname: 'Jack',
    lastname: 'Huston'
  },
  revision: 0,
  version: 1,
  meta: {
    userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
  }
}); // callback is optional

or

domain.handle({
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  name: 'renamePerson',
  aggregate: {
    id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
    name: 'person'
  },
  context: {
    name: 'hr'
  },
  payload: {
    firstname: 'Jack',
    lastname: 'Huston'
  },
  revision: 0,
  version: 1,
  meta: {
    userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
  }
}, function (err) {
  // this callback is called when command is handled successfully or unsuccessfully
  // err can be of type:
  // - null
  // - Error
  //   {
  //     name: 'Error',
  //     message: 'optional message'
  //   }
  // - ValidationError
  //   {
  //     name: 'ValidationError',
  //     message: 'some message',
  //     more: { /* more infos */ }
  //   }
  // - BusinessRuleError
  //   {
  //     name: 'BusinessRuleError',
  //     message: 'some message',
  //     more: { /* more infos */ }
  //   }
  // - AggregateDestroyedError
  //   {
  //     name: 'AggregateDestroyedError',
  //     message: 'Aggregate has already been destroyed!',
  //     more: {
  //       aggregateId: 'ad10d2c0-6509-4cb0-86d2-76312d930001',
  //       aggregateRevision: 6
  //     }
  //   }
  // - AggregateConcurrencyError
  //   {
  //     name: 'AggregateConcurrencyError',
  //     message: 'Actual revision in command is "3", but expected is "2"!',
  //     more: {
  //       aggregateId: 'ad10d2c0-6509-4cb0-86d2-76312d930001',
  //       aggregateRevision: 2,
  //       commandRevision: 3
  //     }
  //   }
});
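Since every error documented above carries a `name` and, for some types, a `more` object, a callback can branch on `err.name`. The helper below is a hypothetical sketch that turns such an error into a log line; it relies only on the shapes shown in the comments above.

```javascript
// Hypothetical helper: describe the outcome passed to the handle callback.
function describeResult(err) {
  if (!err) return 'handled';
  var more = err.more || {};
  switch (err.name) {
    case 'ValidationError':
      return 'invalid command: ' + err.message;
    case 'BusinessRuleError':
      return 'business rule violated: ' + err.message;
    case 'AggregateDestroyedError':
      return 'aggregate ' + more.aggregateId + ' was destroyed at revision ' + more.aggregateRevision;
    case 'AggregateConcurrencyError':
      return 'revision mismatch: command has ' + more.commandRevision +
        ', aggregate has ' + more.aggregateRevision;
    default:
      return 'unexpected error: ' + err.message;
  }
}
```

It could be used as `domain.handle(cmd, function (err) { console.log(describeResult(err)); });`.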

The callback also receives more information, which can be useful when testing:

domain.handle({
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  name: 'renamePerson',
  aggregate: {
    id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
    name: 'person'
  },
  context: {
    name: 'hr'
  },
  payload: {
    firstname: 'Jack',
    lastname: 'Huston'
  },
  revision: 0,
  version: 1,
  meta: {
    userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
  }
}, function (err, events, aggregateData, metaInfos) {
  // this callback is called when command is handled successfully or unsuccessfully
  // err: is the same as described before

  // events: same as passed to the 'onEvent' function
  // events: if there is no error, this is the array of all events that should be published
  // events: if there is one of these errors (ValidationError, BusinessRuleError, AggregateDestroyedError, AggregateConcurrencyError),
  // it is converted into an event with the event name defined in the options (default is 'commandRejected')

  // aggregateData: represents the aggregateData after applying the resulting events

  // metaInfos: { aggregateId: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70', aggregate: 'person', context: 'context' }
});

Request domain information

After the initialization you can request the domain information:

domain.init(function (err) {
  domain.getInfo();
  // ==>
  // { contexts: [
  //   {
  //      "name": "hr",
  //      "aggregates": [
  //        {
  //          "name": "person",
  //          "version": 3,
  //          "commands": [
  //            {
  //              "name": "enterNewPerson",
  //              "version": 0,
  //              "preconditions": [...]
  //            },
  //            {
  //              "name": "unregisterAllContactInformation",
  //              "version": 2,
  //              "preconditions": [...]
  //            },
  //            {
  //              "name": "unregisterAllContactInformation",
  //              "version": 1,
  //              "preconditions": [...]
  //            }
  //          ],
  //          "events": [
  //            {
  //              "name": "enteredNewPerson",
  //              "version": 3
  //            },
  //            {
  //              "name": "enteredNewPerson",
  //              "version": 0
  //            },
  //            {
  //              "name": "enteredNewPerson",
  //              "version": 2
  //            },
  //            {
  //              "name": "unregisteredEMailAddress",
  //              "version": 0
  //            },
  //            {
  //              "name": "unregisteredPhoneNumber",
  //              "version": 0
  //            }
  //          ],
  //          "businessRules": [
  //            {
  //              "name": "atLeast1EMail",
  //              "description": "at least one character should be in email address"
  //            },
  //            {
  //              "name": "nameEquality",
  //              "description": "firstname should never be equal lastname"
  //            }
  //          ]
  //        }
  //      ]
  //   }
  //]}
});
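The returned structure is plain data, so it can be walked with ordinary array methods. As a sketch, the hypothetical helper `listCommands` below flattens it into "context/aggregate/command@version" strings; the sample object is a trimmed copy of the shape shown above.

```javascript
// Hypothetical helper: flatten the structure returned by getInfo().
function listCommands(info) {
  var result = [];
  info.contexts.forEach(function (context) {
    context.aggregates.forEach(function (aggregate) {
      aggregate.commands.forEach(function (command) {
        result.push(context.name + '/' + aggregate.name + '/' + command.name + '@' + command.version);
      });
    });
  });
  return result;
}

// trimmed sample in the shape shown above
var sampleInfo = {
  contexts: [{
    name: 'hr',
    aggregates: [{
      name: 'person',
      version: 3,
      commands: [
        { name: 'enterNewPerson', version: 0 },
        { name: 'unregisterAllContactInformation', version: 2 }
      ],
      events: [],
      businessRules: []
    }]
  }]
};

var commands = listCommands(sampleInfo);
```

A list like this can be handy for generating documentation or for checking that a client only sends commands the domain knows.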

Components definition

Context

module.exports = require('cqrs-domain').defineContext({
  // optional, default is the directory name
  name: 'hr'
});

Externally loaded context ( self-loaded )

A special option to define a context with all its aggregates, commands, events and rules exists by adding the externallyLoaded option to the context :

module.exports = require('cqrs-domain').defineContext({
  // optional, default is the directory name
  name: 'hr',
  externallyLoaded: true
});

When doing so the context will be added 'as-is' to the domain, which means it won't go through the normal tree loading and parsing process.
This option is aimed mainly at plugin developers, as it leaves the responsibility of structuring the domain in the hands of the one defining the context (most probably a plug-in).

Aggregate

module.exports = require('cqrs-domain').defineAggregate({
  // optional, default is last part of path name
  name: 'person',

  // optional, default 0
  version: 3,

  // optional, default ''
  defaultCommandPayload: 'payload',

  // optional, default ''
  defaultEventPayload: 'payload',

  // optional, default ''
  defaultPreConditionPayload: 'payload',

  // optional, default false
  // by skipping the history, only the last event will be loaded and by default not applied (just to ensure the revision number increment)
  skipHistory: true,

  // optional, default false
  // only optionally needed when skipHistory is set to true, only the last event will be loaded and applied
  applyLastEvent: true,

  // optional, default false
  // will publish the events but will not commit them to the eventstore
  disablePersistence: false
},

// optionally, define some initialization data...
{
  emails: ['[email protected]'],
  phoneNumbers: []
})

// optionally, define snapshot need algorithm...
.defineSnapshotNeed(function (loadingTime, events, aggregateData) {
  // loadingTime is the loading time in ms of the eventstore data
  // events are all loaded events in an array
  // aggregateData represents the aggregateData after applying the resulting events
  return events.length >= 200;
})

// optionally, define if snapshot should be ignored
// if true, the whole event stream will be loaded
.defineIgnoreSnapshot({
  version: 0
}, function (data) {
  return true;
})
//.defineIgnoreSnapshot({
//  version: 0
//}, true)
//.defineIgnoreSnapshot({
//  version: 0
//}) // default true

// optionally, define conversion algorithm for older snapshots
// always convert directly to newest version...
// when loaded a snapshot and it's an older snapshot, a new snapshot with same revision but with newer aggregate version will be created
.defineSnapshotConversion({
  version: 1
}, function (data, aggregate) {
  // data is the snapshot data
  // aggregate is the aggregate object

  aggregate.set('emails', data.emails);
  aggregate.set('phoneNumbers', data.phoneNumbers);

  var names = data.name.split(' ');
  aggregate.set('firstname', names[0]);
  aggregate.set('lastname', names[1]);
})
// optionally, define committingSnapshotTransformer (i.e. for GDPR: to encrypt data in storage)
.defineCommittingSnapshotTransformer({
  version: 1
}, function (data) {
  // data is the snapshot data
  data.firstname = encrypt(data.firstname);
  return data;
})
// or async
.defineCommittingSnapshotTransformer({
  version: 1
}, function (data, callback) {
  // data is the snapshot data
  encrypt(data.firstname, function (err, encrypted) {
    data.firstname = encrypted;
    callback(err, data);
  });
})
// optionally, define loadingSnapshotTransformer (i.e. for GDPR: to decrypt stored data)
.defineLoadingSnapshotTransformer({
  version: 1
}, function (data) {
  // data is the snapshot data
  data.firstname = decrypt(data.firstname);
  return data;
})
// or async
.defineLoadingSnapshotTransformer({
  version: 1
}, function (data, callback) {
  // data is the snapshot data
  decrypt(data.firstname, function (err, decrypted) {
    data.firstname = decrypted;
    callback(err, data);
  });
})
// optionally, define idGenerator function for new aggregate ids
// sync
.defineAggregateIdGenerator(function () {
  return require('uuid').v4().toString();
})
// or async
.defineAggregateIdGenerator(function (callback) {
  setTimeout(function () {
    var id = require('uuid').v4().toString();
    callback(null, id);
  }, 50);
})
// optionally, define idGenerator function for new aggregate ids that are command aware
// if you define it that way, the normal defineAggregateIdGenerator function will be replaced
// sync
.defineCommandAwareAggregateIdGenerator(function (cmd) {
  return cmd.id + require('uuid').v4().toString();
})
// or async
.defineCommandAwareAggregateIdGenerator(function (cmd, callback) {
  setTimeout(function () {
    var id = cmd.id + require('uuid').v4().toString();
    callback(null, id);
  }, 50);
});
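The snapshot transformer examples above call `encrypt` and `decrypt`, which are not defined in this README. As one possible sketch, the synchronous variants could be built on the Node.js `crypto` module with AES-256-GCM. The key handling here is an assumption made for the demo (a random key per process); a real setup would load a persistent key from its own key management.

```javascript
var crypto = require('crypto');

// Assumption: a 32-byte key; generated here only so the sketch runs on its own.
var key = crypto.randomBytes(32);

// Encrypts a string and returns "iv:authTag:ciphertext", each part hex encoded.
function encrypt(plain) {
  var iv = crypto.randomBytes(12);
  var cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
  var encrypted = Buffer.concat([cipher.update(String(plain), 'utf8'), cipher.final()]);
  return [iv, cipher.getAuthTag(), encrypted].map(function (part) {
    return part.toString('hex');
  }).join(':');
}

// Reverses encrypt(); throws if the value was modified or the key does not match.
function decrypt(stored) {
  var parts = stored.split(':').map(function (part) {
    return Buffer.from(part, 'hex');
  });
  var decipher = crypto.createDecipheriv('aes-256-gcm', key, parts[0]);
  decipher.setAuthTag(parts[1]);
  return Buffer.concat([decipher.update(parts[2]), decipher.final()]).toString('utf8');
}

// The synchronous transformer bodies from the example, as plain functions.
function committingTransformer(data) {
  data.firstname = encrypt(data.firstname);
  return data;
}
function loadingTransformer(data) {
  data.firstname = decrypt(data.firstname);
  return data;
}
```

Because a fresh random IV is used for every call, encrypting the same name twice yields different stored values, while decryption still restores the original.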

Command validation

All command schemas are json schemas. Hint http://jsonary.com/documentation/json-schema/

Internally the tv4 module is used for validation. Additionally, you can extend the tv4 instance with other functionality like tv4-formats, so you can easily use format constraints (i.e. 'email') for your 'string' types. To extend tv4, just catch the validator before initializing the domain:

domain.extendValidator(function (validator) {

  // own formats
  validator.addFormat(require('tv4-formats'));
  validator.addFormat('mySpecialFormat', function (data) {
    if (data === 'special') {
      return null;
    }
    return 'wrong format for special';
  });

  // or other schemas
  validator.addSchema({ 'mySharedSchema': { /* the schema json */ } });
  validator.addSchema('myOtherSharedSchema', { /* the schema json */ });

  // or replace the core validator
  validator.validator(function (options, schema) {
    // options.schemas => all schemas
    // options.formats => all formats
    var ValidationError = require('cqrs-domain').errors.ValidationError;

    // sync
    return function (cmdDataToValidate) {
      if (everythingIsOk) {
        return null;
      } else {
        return new ValidationError('command not valid', { 'because': 'of this' });
      }
    };
    // or async (use this instead of the sync variant above)
    return function (cmdDataToValidate, callback) {
      externalAsyncValidator(cmdDataToValidate, function (errors) {
        if (!errors) {
          callback();
        } else {
          callback(new ValidationError('command not valid', { 'because': 'of this' }));
        }
      });
    };

  });

});

Each command schema title should match the command name. Example: enterNewPerson.json

To support multiple versions look at: unregisterAllContactInformation.json

or: unregisterAllContactInformation_v1.json unregisterAllContactInformation_v2.json

You can also have a hierarchical command extension; look at:

Pre-Load-Condition

Can be used to perform some business rules before handling the command. Contrary to Pre-Conditions, these rules are applied BEFORE the aggregate is loaded.

This allows you to (for example) run checks against external information by using closures.

Tip: Pre-load conditions are especially useful when you have checks that you want to run on an aggregate, but when it is OK for those checks to run on potentially stale data (e.g. a view model). By doing these checks before the aggregate is locked, you avoid creating a locking bottleneck at the aggregate level, and can keep your aggregate smaller because the information for those checks is externalized to the domain. This helps performance if your domain is performance critical, and helps you keep focus on the real, strongly consistent invariants in your domain.

A Command can have multiple pre-load-conditions.

var externalDataLoader = require('some_file');

module.exports = require('cqrs-domain').definePreLoadCondition({
  // the command name
  // optional, default is file name without extension,
  // if name is '' it will handle all commands that matches the appropriate aggregate
  // if name is an array of strings it will handle all commands that matches the appropriate name
  name: 'checkSomeViewModel',

  // optional, default 0
  version: 2,

  // optional, if not defined it will use what is defined as default in aggregate or pass the whole command
  payload: 'payload',

  // optional
  description: 'firstname should always be set',

  // optional, default Infinity, all pre-conditions will be sorted by this value
  priority: 1
}, function (data, callback) {
  // data is the command data
  // callback is optional, if not defined as function argument you can throw errors or return errors here (sync way)

  if (externalDataLoader.get(data.id) !== data.value) {
    return callback('not allowed');
    // or
    // return callback(new Error('not allowed'));
    // or
    // return callback(new Error()); // if no error message is defined then the description will be taken
    // or
    // return callback(new (require('cqrs-domain').errors.BusinessRuleError)('not allowed', { /* more infos */ }));
  }

  callback(null);

  // or if callback is not defined as function argument
  // if (externalDataLoader.get(data.id) !== data.value) {
  //   return 'not allowed';
  //   // or
  //   // return new Error('not allowed');
  //   // or
  //   // return new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error('not allowed');
  //   // or
  //   // throw new (require('cqrs-domain').errors.BusinessRuleError)('not allowed', { /* more infos */ });
  // }
});
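The example above depends on `externalDataLoader`, loaded from 'some_file', which is not shown. To try the condition in isolation, a stand-in with a `get(id)` method is enough. Everything in this sketch is hypothetical, including the sample id and value; the condition body itself is copied from the example.

```javascript
// Hypothetical stand-in for require('some_file'): an in-memory lookup with a get(id) method,
// for example backed by a view model that may be slightly stale.
var externalDataLoader = {
  data: { 'order-1': 'open' },
  get: function (id) {
    return this.data[id];
  }
};

// The condition body from the example, as a standalone function.
function checkSomeViewModel(data, callback) {
  if (externalDataLoader.get(data.id) !== data.value) {
    return callback('not allowed');
  }
  callback(null);
}

var results = [];
checkSomeViewModel({ id: 'order-1', value: 'open' }, function (err) { results.push(err); });
checkSomeViewModel({ id: 'order-1', value: 'closed' }, function (err) { results.push(err); });
```

Because the function only needs the command data and the external lookup, it can be exercised without loading or locking any aggregate.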

Pre-Condition

Can be used to perform some business rules before handling the command. The aggregate is locked and loaded before the pre-condition is applied.

A Command can have multiple pre-conditions.

module.exports = require('cqrs-domain').definePreCondition({
  // the command name
  // optional, default is file name without extension,
  // if name is '' it will handle all commands that matches the appropriate aggregate
  // if name is an array of strings it will handle all commands that matches the appropriate name
  name: 'unregisterAllContactInformation',

  // optional, default 0
  version: 2,

  // optional, if not defined it will use what is defined as default in aggregate or pass the whole command
  payload: 'payload',

  // optional
  description: 'firstname should always be set',

  // optional, default Infinity, all pre-conditions will be sorted by this value
  priority: 1
}, function (data, aggregate, callback) {
  // data is the command data
  // aggregate is the aggregate object
  // callback is optional, if not defined as function argument you can throw errors or return errors here (sync way)

  if (!aggregate.has('firstname')) {
    return callback('not personalized');
    // or
    // return callback(new Error('not personalized'));
    // or
    // return callback(new Error()); // if no error message is defined then the description will be taken
    // or
    // return callback(new (require('cqrs-domain').errors.BusinessRuleError)('not personalized', { /* more infos */ }));
  }
  callback(null);

  // or if callback is not defined as function argument
  // if (!aggregate.has('firstname')) {
  //   return 'not personalized';
  //   // or
  //   // return new Error('not personalized');
  //   // or
  //   // return new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error('not personalized');
  //   // or
  //   // throw new (require('cqrs-domain').errors.BusinessRuleError)('not personalized', { /* more infos */ });
  // }
});

Command

Collect all needed infos from aggregate to generate your event(s).

Move checks out of here; the correct places are "business rules", "pre-conditions" or "pre-load-conditions"!

Do NOT manipulate the aggregate here!

module.exports = require('cqrs-domain').defineCommand({
  // optional, default is file name without extension
  name: 'enterNewPerson',

  // optional, default 0
  version: 1,

  // optional, if not defined it will use what is defined as default in aggregate or pass the whole command
  payload: 'payload',

  // optional, default undefined
  // if true, ensures that the aggregate already exists before this command is handled
  // if false, ensures that the aggregate does not exist yet before this command is handled
  existing: true
}, function (data, aggregate) {
  // data is the command data
  // aggregate is the aggregate object

  // if (aggregate.get('someAttr') === 'someValue' && aggregate.has('special')) { ... }

  aggregate.apply('enteredNewPerson', data);
  // or
  // aggregate.apply('enteredNewPerson', data, version);
  // or
  // aggregate.apply({
  //   event: 'enteredNewPerson',
  //   payload: data
  // });
})

// if defined it will load all the requested event streams
// useful if making bigger redesigns in domain and you need to handle a command on a new aggregate
.defineEventStreamsToLoad(function (cmd) {
  return [{ // order is new to old
    context: 'hr',
    aggregate: 'mails',
    aggregateId: cmd.meta.newAggId
  },{
    context: 'hr',
    aggregate: 'persons',
    aggregateId: cmd.aggregate.id
  }];
});

Event

This is the place where you should manipulate your aggregate.

module.exports = require('cqrs-domain').defineEvent({
  // optional, default is file name without extension
  name: 'enteredNewPerson',

  // optional, default 0
  version: 3,

  // optional, if not defined it will use what is defined as default in aggregate or pass the whole event...
  payload: 'payload'
},
// passing a function is optional
function (data, aggregate) {
  // data is the event data
  // aggregate is the aggregate object

  aggregate.set('firstname', data.firstname);
  aggregate.set('lastname', data.lastname);
  // or
  // aggregate.set(data);
});

Business Rule

module.exports = require('cqrs-domain').defineBusinessRule({
  // optional, default is file name without extension
  name: 'nameEquality',

  // optional
  description: 'firstname should never be equal lastname',

  // optional, default Infinity, all business rules will be sorted by this value
  priority: 1
}, function (changed, previous, events, command, callback) {
  // changed is the new aggregate object
  // previous is the old aggregate object
  // events is the array with the resulting events
  // command the handling command
  // callback is optional, if not defined as function argument you can throw errors or return errors here (sync way)

  if (changed.get('firstname') === changed.get('lastname')) {
    return callback('names not valid');
    // or
    // return callback(new Error('names not valid'));
    // or
    // return callback(new Error()); // if no error message is defined then the description will be taken
    // or
    // return callback(new (require('cqrs-domain').BusinessRuleError)('names not valid', { /* more infos */ }));
  }
  callback(null);

  // or if callback is not defined as function argument
  // if (changed.get('firstname') === changed.get('lastname')) {
  //   return 'names not valid';
  //   // or
  //   // return new Error('names not valid');
  //   // or
  //   // return new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error(); // if no error message is defined then the description will be taken
  //   // or
  //   // throw new Error('names not valid');
  //   // or
  //   // throw new (require('cqrs-domain').BusinessRuleError)('names not valid', { /* more infos */ });
  // }
});
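
The comments above say a rule may either take a callback or, if no callback argument is declared, return or throw its error. A minimal sketch of how a runner could tell the two styles apart by the declared argument count. `runRule` and its parameters are hypothetical and are not the library's implementation:

```javascript
// Hypothetical runner, not the library's implementation: decides between the
// callback style and the sync style by looking at the declared argument count.
function runRule(ruleFn, expectedArgs, args, description, done) {
  function normalize(err) {
    if (!err) { return null; }
    if (typeof err === 'string') { return new Error(err); }
    // an Error without a message falls back to the rule's description
    if (err instanceof Error && !err.message) { err.message = description; }
    return err;
  }

  if (ruleFn.length > expectedArgs) {
    // a callback is declared as the last argument: async style
    return ruleFn.apply(null, args.concat(function (err) { done(normalize(err)); }));
  }

  // sync style: errors are returned or thrown
  var result;
  try {
    result = ruleFn.apply(null, args);
  } catch (e) {
    return done(normalize(e));
  }
  done(normalize(result));
}

var results = [];

// sync style, returns a string
runRule(function (changed) {
  if (changed.firstname === changed.lastname) { return 'names not valid'; }
}, 1, [{ firstname: 'a', lastname: 'a' }], 'some description', function (err) {
  results.push(err && err.message);
});

// callback style, Error without message falls back to the description
runRule(function (changed, callback) {
  callback(new Error());
}, 1, [{}], 'firstname should never be equal lastname', function (err) {
  results.push(err && err.message);
});

console.log(results);
```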

EventTransformer

Useful, for example, for GDPR-relevant data that should be stored encrypted in the eventstore.

// e.g. encrypt
module.exports = require('cqrs-domain').defineCommittingEventTransformer({
  // optional, default is file name without extension
  name: 'enteredNewPerson',

  // optional, default 0
  version: 3
},
// passing a function is optional
function (evt) {
  evt.payload.firstname = encrypt(evt.payload.firstname);
  return evt;
});

// or async
module.exports = require('cqrs-domain').defineCommittingEventTransformer({
  // optional, default is file name without extension
  name: 'enteredNewPerson',

  // optional, default 0
  version: 3
},
// passing a function is optional
function (evt, callback) {
  encrypt(evt.payload.firstname, function (err, encrypted) {
    evt.payload.firstname = encrypted;
    callback(err, evt);
  });
});

// e.g. decrypt
module.exports = require('cqrs-domain').defineLoadingEventTransformer({
  // optional, default is file name without extension
  name: 'enteredNewPerson',

  // optional, default 0
  version: 3
},
// passing a function is optional
function (evt) {
  evt.payload.firstname = decrypt(evt.payload.firstname);
  return evt;
});

// or async
module.exports = require('cqrs-domain').defineLoadingEventTransformer({
  // optional, default is file name without extension
  name: 'enteredNewPerson',

  // optional, default 0
  version: 3
},
// passing a function is optional
function (evt, callback) {
  decrypt(evt.payload.firstname, function (err, decrypted) {
    evt.payload.firstname = decrypted;
    callback(err, evt);
  });
});
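
The transformer examples call `encrypt` and `decrypt` without defining them. A minimal sketch of what synchronous versions of these helpers might look like, using Node's built-in `crypto` module with AES-256-GCM. The key handling and the `iv.tag.ciphertext` token layout are assumptions made for illustration, and nothing here is provided by cqrs-domain:

```javascript
var crypto = require('crypto');

// Assumption: a 32-byte key. In a real system it would come from a key store
// instead of being generated at startup, otherwise stored events become unreadable.
var key = crypto.randomBytes(32);

function encrypt(plainText) {
  var iv = crypto.randomBytes(12); // 96-bit nonce, the usual size for GCM
  var cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
  var encrypted = Buffer.concat([cipher.update(String(plainText), 'utf8'), cipher.final()]);
  var tag = cipher.getAuthTag();
  // store nonce, auth tag and ciphertext together as one string
  return [iv, tag, encrypted].map(function (b) { return b.toString('base64'); }).join('.');
}

function decrypt(token) {
  var parts = token.split('.').map(function (p) { return Buffer.from(p, 'base64'); });
  var decipher = crypto.createDecipheriv('aes-256-gcm', key, parts[0]);
  decipher.setAuthTag(parts[1]);
  return Buffer.concat([decipher.update(parts[2]), decipher.final()]).toString('utf8');
}

var token = encrypt('Jane');
console.log(decrypt(token) === 'Jane');
```

Because a fresh nonce is generated per call, encrypting the same value twice yields different tokens, so encrypted fields cannot be compared for equality in the store.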

Command Handler (Be careful!!!)

Is your use case not solvable without custom command handling? Sagas? Micro-Services?

module.exports = require('cqrs-domain').defineCommandHandler({
  // optional, default is file name without extension
  name: 'enterNewSpecialPerson',

  // optional, default 0
  version: 1,

  // optional, if not defined it will pass the whole command...
  payload: 'payload'
}, function (aggId, cmd, commandHandler, callback) {
  // aggId is the aggregate id
  // cmd is the command data

  commandHandler.loadAggregate(aggId, function (err, aggregate, stream) {
    if (err) {
      return callback(err);
    }

    callback(null, [{ my: 'special', ev: 'ent' }]);

//    // check if destroyed, check revision, validate command
//    var err = commandHandler.verifyAggregate(aggregate, cmd);
//    if (err) {
//      return callback(err);
//    }
//
//    // call api or emit a command or whatever...
//    // and at the end perhaps you call: commandHandler.handle(cmd, callback);
  });
});

ES6 default exports

Importing ES6 style default exports is supported for all definitions where you also use module.exports:

module.exports = defineContext({...});

works as well as

exports.default = defineContext({...});

as well as (must be transpiled by babel or tsc to be runnable in node)

export default defineContext({...});

Also:

exports.default = defineAggregate({...});
exports.default = defineCommand({...});
exports.default = defineEvent({...});
// etc...

Exports other than the default export are then ignored by this package's structure loader.
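
The rule above can be sketched as follows. This is an illustration only: `resolveDefinition` is a hypothetical helper, and the package's actual structure loader may implement the check differently.

```javascript
// Hypothetical helper illustrating the rule above; the actual loader may differ.
function resolveDefinition(loaded) {
  if (loaded && typeof loaded === 'object' && loaded.default !== undefined) {
    return loaded.default; // ES6-style default export; named exports are ignored
  }
  return loaded; // plain `module.exports = ...`
}

var definition = { name: 'hr' };

// CommonJS style
console.log(resolveDefinition(definition) === definition);
// transpiled `export default`, with an extra named export that gets ignored
console.log(resolveDefinition({ default: definition, helper: function () {} }) === definition);
```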

Release notes

License

Copyright (c) 2018 Adriano Raiano

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

node-cqrs-domain's People

Contributors

adrai, alemhnan, bikerp, boekkooi, dependabot[bot], emmkong, gabriella-bankova, glockenbeat, goloroden, irt-fbachmann, jwoudenberg, marcbachmann, nanov, orh, repkins, sbiaudet, sdaves, ssipos90


node-cqrs-domain's Issues

Timing issue on init for mongo eventstore when forkEventDispatching is false

Hi, how's it going?

I was getting an initialization error when using a mongo eventstore and using the new non-forked event dispatcher. It seems that down in eventStore.js this code:

this.dispatcher = new EventDispatcher(this.options);
this.dispatcher.useLogger(this.logger)
                    .usePublisher(this.publisher)
                    .useStorage(this.storage)
                    .start();

was getting called before the mongo initialization had finished, so the 'events' member, etc. didn't exist. I've confirmed this with a quick hack:

var self = this;
async.until(
            function () {
                return self.storage.isConnected;
            },
            function (callback) {
                // log, then retry the test after one second
                console.log('waiting...');
                setTimeout(callback, 1000);
            },
            function () {
                // `this` is not the event store inside this callback, so use `self`
                self.dispatcher = new EventDispatcher(self.options);
                self.dispatcher.useLogger(self.logger)
                    .usePublisher(self.publisher)
                    .useStorage(self.storage)
                    .start();
            });

The above code seems to fix the problem, though of course it's not that robust yet ;) I can try patching it myself if you'd like.
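
As an alternative to polling, the start could be deferred until the storage signals that it is connected. A minimal sketch, assuming the storage is an EventEmitter that exposes an `isConnected` flag and emits a `'connect'` event (assumptions for illustration, not verified against the eventstore code). `startWhenConnected` is a hypothetical helper:

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: runs `start` once the storage reports a connection.
// Assumes `storage` is an EventEmitter with an `isConnected` flag that emits 'connect'.
function startWhenConnected(storage, start) {
  if (storage.isConnected) {
    return start();
  }
  storage.once('connect', start);
}

// Stand-in storage for illustration only
var storage = new EventEmitter();
storage.isConnected = false;

var started = 0;
startWhenConnected(storage, function () { started += 1; });

// nothing has started yet; simulate the driver finishing its connection
storage.isConnected = true;
storage.emit('connect');
console.log(started);
```

Using `once` means a later reconnect does not start the dispatcher a second time.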

Redis module for command bumper has wrong default prefix

Tiny bug. While traversing your code I visually spotted that the default prefix in the command bumper for redis is still set to "aggregatelock". You probably want something like "commandbumper" instead.

(Off-topic: Aside from that, GREAT work on all of your cqrs-related modules, @adrai! For broad acceptance and understanding you'll need plenty more documentation, but it's getting there. I would suggest making some visuals instead of just text, to more easily spot how everything interacts. I can try to help out at some point when I can.)

Merge commandHandlers to single file

Is there a way to merge these command handlers :

SalesCommandHandler.js
module.exports = commandHandlerBase.extend({
  aggregate: 'SalesAggregate',
  commands: ['CreateSale', 'UpdateSale', 'DeleteSale']
});

JournalCommandHandler.js
module.exports = commandHandlerBase.extend({
  aggregate: 'JournalAggregate',
  commands: ['CreateJournal']
});

Can I merge them into single file?

Communicating between bounded contexts

Hey @adrai,

I would love your input on this. It regularly happens that we need to communicate between several bounded contexts at times. Ie, a change occurs in BC1 and we want BC2 to react or update some data in response to that. Let's assume it is OK if this update is eventually consistent (we are in a microservice situation). Creating one large bounded context or aggregate is not really an option in this case.

Since node-cqrs-domain does not seem to have a direct way of listening for / reacting to events originating from outside the domain, how would you organise this using your packages?

Some potential solutions I can imagine:

  • using sagas to convert such events to commands: seems a little hacky since sagas are more intended for long-lived processes
  • using a hacked-together custom command handler. Inevitably dirty.
  • writing a custom converter which we hook up to node-cqrs-domain and which converts the events to commands

I'm probably missing a few ideas, which is why I'm reaching out. What would you recommend?

Thanks again for the great work!
Hilke

Delayed business rules validation

I'd like to have delayed business rule validation. In fact, if a business rule fails, I'd like to get only a notification/event instead of a rejection error.

What do you think?

What is the best practice for calculating a total property of an aggregate?

With the previous version of cqrs-domain we recalculated the line totals in an invoice aggregate. With the new version we wrote the code below. But when we read the lines property from the aggregate, invoiceLineAdded has not been played yet and lines is empty. How do you handle this situation?

module.exports = require('cqrs-domain').defineCommand({
  name: 'addInvoiceLine',
  payload: 'payload'
}, function (data, aggregate) {
  aggregate.apply('invoiceLineAdded', data);
  updateTotal(aggregate);
});

function updateTotal(aggregate) {
  var lines = aggregate.get('lines');
  var total; // calculate total .....
  aggregate.apply('invoiceTotalUpdated', total);
}
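
One possible way around this, sketched under assumptions: compute the new total from the lines already stored on the aggregate plus the incoming line, so the command does not depend on `invoiceLineAdded` having been applied first. The `quantity` and `unitPrice` fields and the `calculateTotal` helper are hypothetical names chosen for illustration.

```javascript
// Hypothetical helper: total of the existing lines plus the line being added.
// Assumes each line has numeric `quantity` and `unitPrice` fields.
function calculateTotal(existingLines, newLine) {
  return (existingLines || []).concat(newLine).reduce(function (sum, line) {
    return sum + line.quantity * line.unitPrice;
  }, 0);
}

// Inside the command function this could be used roughly like:
//   var total = calculateTotal(aggregate.get('lines'), data);
//   aggregate.apply('invoiceLineAdded', data);
//   aggregate.apply('invoiceTotalUpdated', total);

var total = calculateTotal(
  [{ quantity: 2, unitPrice: 5 }],
  { quantity: 1, unitPrice: 3 }
);
console.log(total);
```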

Feature req: More flexible mapping: command <-> pre-condition

Instead of just being able to map command and pre-condition 1:1 or a pre-condition to all commands of an aggregate, I'd need to be able to map commands and pre-conditions more flexibly.

Current situation:
I've got two commands depending on the same pre-condition. Both belonging to the same aggregate, but these are not the only commands of the aggregate.

Suggestion:

module.exports = require('cqrs-domain').defineCommand({
  preConditions: ['myPreCondition']
}, function(data, aggregate) {
  ...
});

Using 'instanceof' to check for the given type

Adriano,

Please refer to: thenativeweb/node-eventstore#28

cqrs-domain has a similar issue: a type check using instanceof fails if the object instance was instantiated by a copy of cqrs-domain other than the one that actually performs the check.

See, for example, https://github.com/adrai/node-cqrs-domain/blob/master/lib/structure/structureLoader.js#L21 and below:

function isContext (item) {
  if (item.fileType !== 'js') {
    return false;
  }

  return item.value instanceof Context;
}

...

In my case, I have a module A that requires another module B, which in turn requires cqrs-domain.

  • A
    • cqrs-domain (1)
    • B
      • cqrs-domain (2)

For testing purposes A defines the domain tree and so also needs to require cqrs-domain. Sure enough, the structureLoader fails to load anything but *.json files.

Would this fix the problem?

...
return item.value.constructor.name === 'Context'

Note: the problem is not limited to structureLoader, of course.

-Leo
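
The effect described in this issue can be reproduced without the library. The sketch below simulates two installed copies of a package with a hypothetical `loadPackageCopy` factory; each copy gets its own `Context` constructor, so `instanceof` only succeeds against the copy that created the instance, while the name comparison suggested above succeeds for both.

```javascript
// Simulates two separately installed copies of the same package: each call
// creates its own, distinct Context constructor.
function loadPackageCopy() {
  function Context(name) { this.name = name; }
  return { Context: Context };
}

var copyA = loadPackageCopy();
var copyB = loadPackageCopy();
var ctx = new copyA.Context('hr');

var sameCopy = ctx instanceof copyA.Context;       // created by this copy
var otherCopy = ctx instanceof copyB.Context;      // different constructor object
var byName = ctx.constructor.name === 'Context';   // name comparison

console.log(sameCopy, otherCopy, byName);
```

The name comparison has its own trade-offs: it also matches unrelated constructors that happen to be called `Context`, and it can break if the code is minified.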

Validation rule doesn't work

Hi,

I'm trying to use the validation rule to validate the commands but it seems that the json files (schemas) are not used. Maybe I forgot something...

Thanks

SagaHandler throwing error

I am creating a PoC with a simple sample. We intend to use this for our new project.

I am facing an issue when creating the Saga. It throws an error from SagaHandlerBase.

Here is my sagaHandler code. This handler lies in sagaHandlers folder.

var EventEmitter = require('events').EventEmitter,
    sagaHandlerBase = require('cqrs-domain').sagaHandlerBase;

var stream = new EventEmitter();
module.exports = sagaHandlerBase.extend({
  events: ['SaleCreated'],
  saga: 'overridden load!'
});

Saga code is in sagas folder and it is :

var EventEmitter = require('events').EventEmitter;
var sagaBase = require('cqrs-domain').sagaBase;

var commandEmitter = new EventEmitter();

module.exports = sagaBase.extend({
SaleCreated: function(evt) {
console.log( 'SaleSaga : ' + evt.id);
var cmd = new ({Id : evt.Id, AccountName : evt.CustomerName});
var command = {id : evt.Id, payload:cmd, command : 'CreateJournal'};
this.sendCommand(command);
commandEmitter.emit('done');
}
});

Here is error callstack :

D:\BizOS\BizosNodeCQRS\node_modules\cqrs-domain\lib\bases\sagaHandlerBase.js:91
saga = new this.Saga(id);
^
TypeError: undefined is not a function
at Object.SagaHandler.loadSaga (D:\BizOS\BizosNodeCQRS\node_modules\cqrs-domain\lib\bases\sagaHandlerBase.js:91:20)

at SagaHandler.defaultHandle (D:\BizOS\BizosNodeCQRS\node_modules\cqrs-domain\lib\bases\sagaHandlerBase.js:40:22)

at async.iterator.fn (D:\BizOS\BizosNodeCQRS\node_modules\async\lib\async.js:517:34)

at async.waterfall.wrapIterator (D:\BizOS\BizosNodeCQRS\node_modules\async\lib\async.js:441:34)

at process.startup.processNextTick.process._tickCallback (node.js:244:9)

domain: SaleCreated

I am unable to debug this in WebStorm. It faces the same issue as events. It runs through SagaHandler in run mode but not in debug mode.

Thanks,

Bhoomi.

Incrementing aggregate revisions

Hi,
I'm trying to figure out where and how the aggregate revision gets incremented. I've created a simple Aggregate, and I have some 'mutation' events that I send to it. When I send several, the revision is always the same. However, if I rerun the process it gets incremented. I just can't figure out exactly where and how this happens from looking at aggregateBase.js, and of course I'd like the revision to be updated with any changes being applied.

domain.on('event'..) not called if eventStore configured

Hi, I have been playing with your library, good stuff! I've noticed that if an eventStore is configured for the domain, the callback for domain.on('event'..) never gets called. Is this the desired behavior? I'm still getting familiar with CQRS in general, but I was thinking that the store (if configured) would just serve as an event log, while .on() would always be called to support event subscribers (that populate the 'read side'), etc.

need eventbus

I think an event bus is needed to manage events and to support restore / retry.

how to deal with side effects?

Is there a best practice on how to deal with side effects (e.g. email sending, external API calls)? If implemented in events, a replay of events would re-trigger them, which is usually not what you want.
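
One common approach, not specific to this library, is to keep event handlers free of side effects and trigger emails or API calls from subscribers that only see newly published events. A minimal self-contained sketch; `commit`, `replay` and the in-memory `bus` are hypothetical stand-ins, not cqrs-domain APIs:

```javascript
var EventEmitter = require('events').EventEmitter;

var bus = new EventEmitter();
var sentMails = [];

// The side effect lives in a subscriber, outside the event handler.
bus.on('personRegistered', function (evt) {
  sentMails.push(evt.payload.email); // stand-in for actually sending an email
});

// Pure state change: safe to run any number of times.
function applyEvent(state, evt) {
  if (evt.name === 'personRegistered') {
    return Object.assign({}, state, { email: evt.payload.email });
  }
  return state;
}

// New events are applied and then published exactly once.
function commit(state, evt) {
  var next = applyEvent(state, evt);
  bus.emit(evt.name, evt);
  return next;
}

// Replaying history rebuilds state without publishing anything.
function replay(events) {
  return events.reduce(applyEvent, {});
}

var evt = { name: 'personRegistered', payload: { email: 'jane@example.com' } };
var live = commit({}, evt);
var rebuilt = replay([evt]);
console.log(sentMails.length, rebuilt.email);
```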

How do business rules work?

I'm trying to understand this as I see the definition API, but I can't figure out how to associate the rule with a given aggregate type

Recommended strategy for disconnects

node-cqrs-domain has various events that designate disconnections in the event store, command bumper and aggregate lock.

In practice, we are using mongo as an event store. In this case, we need to handle all disconnections, since ignoring them can cause the node-cqrs-domain to hang; the backing mongo driver will not call back at all when the connection fails.

What would be your recommended strategy to handle this situation? Do you crash the entire process when a disconnect hits, or do you attempt to reconnect somehow?

Is there a way to utilize more domain instances

I'm using node-cqrs-domain in a multi-tenant project and I need to customize the domain per tenant.

I need to have a domain instance per session. What is the best way to do that? At this time, getting a domain instance is static and I can have only one instance.

How to cause CommandRejected event to be generated?

domain.createCommandRejectedEvent() will not create a CommandRejected event unless its error parameter is one of 5 pre-defined Error types from lib/errors.

But how can my code import any of those types? So far, I have been unable to come up with anything better than:

var ValidationError = require('../../../../../../node_modules/cqrs-domain/lib/errors/validationError.js');

Of course, I am new to node.js, so... :-)

e2e example

@adrai

would it be possible for you to post a simple example that demonstrates end-2-end use of cqrs-domain, cqrs-eventdenormalizer, and viewmodel configured using mongoDb?

Business rule fail doesn't cause save to cancel

This code looks great. I've spent the day digging into it but do have a couple of issues:

I took an exact copy of the sample and changed itemBusinessRules as shown in the code below. All I did was add a rule that you can't add text that already exists.

Also, according to other people, debugging with WebStorm seems to break the functionality. Has anyone else solved this? It's a real pain not to have debugging.

Thanks,
Sean

PS I added the code to init the repository in the domain server.js
PPS I took Udi's course a couple of years ago. Is the idea to have a separate node instance per AC, or is it OK to have multiple aggregates in the same application? I only really have one component that I need to scale like crazy and am OK having the rest on the same node.

var repository = require('viewmodel').read
, itemRepo = repository.extend({collectionName: 'item'});

module.exports = {
itemAggregate: [

function checkForError(changed, previous, events, callback) {

  itemRepo.find({text: changed.text}, function(err, items) {
    if (err){
      //todo
    }

    console.log("There are " + items.length + " items with text= " + changed.text);

    if(items.length > 0){
      return callback('cannot use existing text');
    }
  });

  if (changed.text.toLowerCase().indexOf('error') >= 0) {
    return callback('This is just a sample rule!');
  }

  callback(null);
}

]
};
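
In the rule above, `callback(null)` runs right after `itemRepo.find` is started, so the rule has probably already reported success by the time the duplicate check finishes. A sketch of a restructured rule that calls the callback exactly once, after the lookup. The `itemRepo` object here is an in-memory stand-in so the example runs on its own; it is not the viewmodel API:

```javascript
// Stand-in for the real repository; `find` calls back synchronously with
// canned data so the sketch runs on its own.
var itemRepo = {
  items: [{ text: 'existing' }],
  find: function (query, callback) {
    callback(null, this.items.filter(function (i) { return i.text === query.text; }));
  }
};

function checkForError(changed, previous, events, callback) {
  // cheap synchronous rule first
  if (changed.text.toLowerCase().indexOf('error') >= 0) {
    return callback('This is just a sample rule!');
  }

  itemRepo.find({ text: changed.text }, function (err, items) {
    if (err) { return callback(err); }
    if (items.length > 0) { return callback('cannot use existing text'); }
    callback(null); // only report success after the lookup has finished
  });
}

checkForError({ text: 'existing' }, null, [], function (err) {
  console.log(err);
});
```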

Manually define aggregates

I could not find a way to define aggregates without relying on the structureLoader mechanism.
I like the idea behind it but want to do it slightly different so that I can define multiple artifacts in the same file.

Would there be a way to provide a hook/api to define an aggregate manually somehow without rewriting too much?

I was able to define events and commands manually after init on existing aggregate definitions but for aggregates I didn't find a way.

How to append extra info (like session info, data from async request) to the event?

I want to append some info to my event data, such as session info, authToken, project key, etc.

For this, I made a file that makes async calls to query the database. The result should be appended to the data, and then the aggregate.apply() method should be called.

I want to do this before the event is created, so I think aggregate.apply() is the right place. What do you think? Is this the right place to append data to the event?

Secondly, I was thinking about using meta, but I don't know how to use it.

In domain.defineCommand() you mention meta:

// optional, if defined these values will be copied to the event (can be used to transport information like userId, etc..)
  meta: 'meta'

By this, do you mean that the values of meta should come from the command?

Domain validation/rules design question

During the handling of a given command, if a domain validation or business rule determines there is a problem, is there a way to bubble the violation back to the calling client in the context of the original request?

How to check existence of another aggregate id in preconditions ?

Hi,

Your framework looks nice. I tried it a little and it seems to be quite what I need. However, how is it possible to achieve the following? Given the aggregates:

Person
which has an array of names
the names are a hash of (typeId : number, value : string)
Like

person  = {
    id : 1,
    names : [
        {typeId : 1, value : 'John Cage Inc.'},
        {typeId : 2, value : 'John'},
        {typeId : 3, value : 'Cage'}
    ]
}

PersonNameType
which is a simple string
like Firstname
or Lastname
or PostalName

like :

postalNameType = {
    id : 1,
    value : 'Postal Name' 
}

I defined a command and event for Person "enterNewName"/"enteredNewName"

var domain = require('cqrs-domain');

module.exports = domain.defineEvent({

    name: 'enteredNewName', 

    payload: 'payload' // if not defined it will pass the whole event...

}, function (data, aggregate) {

    aggregate.attributes.names.push(data);

});

and I want to add a precondition "mustBeExistingNameType"

var domain = require('cqrs-domain');

module.exports = domain.definePreCondition({

    description: 'Must specify an existing name type id',

}, function (command, agg, callback) {

    // Check that the given command.payload.typeId exists 
    // as an Id of the PersonNameType aggregate
});

How is it possible to verify that the given typeId exists without having to write a custom command handler? Otherwise, could you indicate how to write this custom command handler?

Thanks

Sagas

Hi, it's me again!

I was going through the saga pattern.

According to the CQRS Journey guide, a Saga is a Process Manager.
http://msdn.microsoft.com/en-us/library/jj591569.aspx

This Process Manager routes the messages between multiple ARs.

I also came across Clemens Vasters' saga explanation:
http://blogs.msdn.com/b/clemensv/archive/2012/08/31/sagas.aspx

The Saga implementation in node-cqrs-domain is that of a Process Manager.
Since it is a distributed transaction, we need to take care of compensation actions if anything fails.

Clemens' solution has a forward and backward messaging mechanism. I actually liked the idea of a RoutingSlip.

How can we best implement compensation actions in the node-cqrs-domain module?

Command validation in .js instead of .json?

Currently command validation is done only in .json files. I strongly feel that pure JSON notation is rather heavy (all the " characters), which makes it more difficult to read and maintain.

What if we also allowed these validations to be written as .js files? I.e., we do

// some_validation.js
module.exports = {
  title: 'command/hr/person',
  allOf: [
    {
      '$ref': '/command/hr'
    },
    {
      properties: {
        aggregate: {
          type: 'object',
          properties: {
            name: {
              type: 'string',
              pattern: 'person' // I can also add comments now
            }
          }
        }
      }
    }
  ]
}

instead of

// some_validation.json (which can't contain this comment or the json is invalid ;-))
 {
  "title": "command/hr/person",
  "allOf": [
    {
      "$ref": "/command/hr"
    },
    {
      "properties": {
        "aggregate": {
          "type": "object",
          "properties": {
            "name": {
              "type": "string",
              "pattern": "person"
            }
          }
        }
      }
    }
  ]
}

Pros:

  • much easier to define and maintain your tv4 files (in my opinion anyway, not sure if you agree)
  • more flexibility: allows for other js features (comments, even function calls for whatever reason, ...)

Cons:

  • the files themselves are more difficult to distinguish from the other definitions (right now the validation files stand out because they are json)
  • you need to use module.exports
  • validation (isSchema()) becomes slightly weaker, but this validation was rather weak in the first place

The changes needed to the lib code are minimal (exactly one line).

What do you think?

Domain Entities

Is there a way to explicitly define Entities in Domain?

I feel a need for Entities that raise and handle their own Events.

How to define domain without 'domainPath'?

I'm trying to integrate this library with my application. One issue I run into is that currently I 'compile' all my code (client and server) into a single JavaScript file (long story). Anyway, this currently prevents me from using this library since the domain definitions are loaded from files located by the 'domainPath' option.

Could you imagine another way to load the definitions? Something not based on loading files? I'm not familiar with the code base yet, so I couldn't find another way myself ...

manually build aggregate state for unit testing

Is there a way to manually set the state of an aggregate instance? Currently I am defining and handling all the commands it takes to put an aggregate instance in a specific state. Instead is there a way to just assume it correctly arrived at that state (like passing in a snapshot) and then handle the event?

think about optimizing with immutable objects

var _ = require('lodash');

var evt = {
  "id": "-",
  "aggregate": {
    "name": "person"
  },
  "name": "changePersonInfo",
  "context": {
    "name": "operations"
  },
  "payload": {
    "gender": "male",
    "firstName": "Rolf",
    "lastName": "Bauer",
    "info": "some string changed",
    "entryDate": "2014-10-01T16:13:39.213Z",
    "exitDate": "2014-10-06T16:13:39.213Z",
    "email": "[email protected]",
    "phoneNumber": "0791231289",
    "pictureId": "C0FF33"
  },
  "meta": {
    "siteId": "someSiteId"
  }
};


var evts = [];

for (var i = 0; i < 1; i++) {
  var clonedEvt = _.cloneDeep(evt);

  clonedEvt.id = i + 1;
  evts.push(clonedEvt);
}

var cloneDeepResult = 0;
var jsonStringifyParseResult = 0;
var deepFreezeResult = 0;

var start = Date.now();
evts.forEach(function (e, i) {
  console.log(i + 1);
  _.cloneDeep(e);
});
var end = Date.now();
cloneDeepResult = end - start;


var start = Date.now();
evts.forEach(function (e, i) {
  console.log(i + 1);
  JSON.parse(JSON.stringify(e));
});
var end = Date.now();
jsonStringifyParseResult = end - start;


function deepFreeze(o) {
  var prop, propKey;
  Object.freeze(o); // First freeze the object.
  for (propKey in o) {
    prop = o[propKey];
    if (!o.hasOwnProperty(propKey) || !(typeof prop === 'object') || Object.isFrozen(prop)) {
      // If the object is on the prototype, not an object, or is already frozen,
      // skip it. Note that this might leave an unfrozen reference somewhere in the
      // object if there is an already frozen object containing an unfrozen object.
      continue;
    }

    deepFreeze(prop); // Recursively call deepFreeze.
  }
}

var start = Date.now();
evts.forEach(function (e, i) {
  console.log(i + 1);
  deepFreeze(e);
});
var end = Date.now();
deepFreezeResult = end - start;


console.log('_.cloneDeep(e);: ' + cloneDeepResult + 'ms');
console.log('JSON.parse(JSON.stringify(e)): ' + jsonStringifyParseResult + 'ms');
console.log('deepFreeze(e): ' + deepFreezeResult + 'ms');
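
The script above measures a single event with `Date.now()` and logs inside the timed loops, so the reported numbers are dominated by `console.log`. A sketch of a small timing helper that repeats the operation and uses Node's `process.hrtime.bigint()` (available in Node 10.7 and later). `timeIt`, the sample object and the iteration count are choices made for illustration:

```javascript
// Hypothetical helper: runs `fn` `iterations` times and returns elapsed milliseconds.
function timeIt(fn, iterations) {
  var start = process.hrtime.bigint();
  for (var i = 0; i < iterations; i++) {
    fn();
  }
  var elapsedNs = process.hrtime.bigint() - start;
  return Number(elapsedNs) / 1e6;
}

var sample = {
  payload: { firstName: 'Rolf', lastName: 'Bauer' },
  meta: { siteId: 'someSiteId' }
};

var jsonMs = timeIt(function () {
  JSON.parse(JSON.stringify(sample));
}, 10000);

console.log('JSON.parse(JSON.stringify(e)): ' + jsonMs.toFixed(3) + 'ms');
```

Note that `deepFreeze` is not directly comparable in a repeated loop: after the first call the object is already frozen, so later iterations do much less work.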

Aggregate Attributes

Hi,

Is there a way to specify explicit attributes in an Aggregate?

In the AggregateBase.js file there is an Aggregate object literal which has an attributes property. This attributes property has 3 more additional properties. Is there a way to extend these attributes of the Aggregate and define my own property bags, instead of always relying on the command and/or eventstore to generate my AR?

'stopping' the domain for unit testing purposes

Hi again, I am in the process of writing some unit tests that do things like, send a command, then validate that the correct events are fired. This is working ok so far, however, I guess with the forking or whatever is going on in the background, the test process never terminates. Is there a way to explicitly kill it via the api (e.g in a 'tearDown')

node-cqrs with Meteor, can 'domainPath' be optional?

Hi, I'm trying to use cqrs-domain in a Meteor app, as it's one of the more mature JS/CQRS implementations. Meteor's build process basically bundles up all the .js files, which effectively renders whatever you set as 'domainPath' invalid. Given that we can now call, say, require('cqrs-domain').defineAggregate() directly, is the domainPath even used in this case? I used this library on a node project some time ago, and I know it would do some 'magic' with the aggregate definitions, etc. in the domain path. But it looks like they can be defined explicitly in code now.

How do I set up fixtures?

Hi!

Is there a guide on how to set up the fixtures required for .init()? What do I put inside the domainPath directory?

Performing pre-lock checks on data before command is executed

Looking for some insight on my problem below, perhaps I missed something.

The issue

I have a situation where it seems necessary to perform checks, much like preconditions, on commands. However, we have a lot of checks in some use cases. Performing these checks completely within the bounds of an aggregate - as a classic approach would require - would force us to redefine the aggregate boundaries to incorporate all necessary data, which would result in aggregates that are too large.

Large aggregates are a problem not only because of the performance cost of loading data, which can mostly be mitigated with snapshots. Locking is the bigger issue. If we want to perform checks using a larger aggregate, for example for the Unique Entity Business Rule defined in #70, we introduce locking on this larger aggregate (in the case of #70 this also introduces other issues, but I will park that for now). This larger aggregate poses a problem in high-volume environments because every time we want to perform a check, the aggregate gets locked, which introduces delays or can cause AggregateConcurrencyExceptions. In summary: as you are well aware, locking is the primary enemy of scaling.

Possible solution

Assuming that most of our checks do not need to be strongly consistent (we can accept stale data for most of them), I ended up looking for a solution that allows me to run certain pre-checks before the aggregate is locked. To do so, I've experimentally extended the default workflow with a PreLoadCondition. The PreLoadCondition runs just after validateCommand, but before aggregateLoad. It accepts a preLoadContext, which is an object you can define however you wish in the PreLoadCondition definition. By using closures, you can make view models (or whatever else you like) available to the validation.

This allows you to use external data sources, such as view models or external services, for a form of command validation after the command is validated syntax-wise, but before the command is allowed to enter the domain.
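The ordering described above can be sketched as follows. This is only an illustration under stated assumptions, not the cqrs-domain implementation: `createCommandPipeline`, `uniqueEmailCondition`, `loadAndLockAggregate` and the shape of the steps object are hypothetical names invented for this example. The point it shows is that the pre-load checks run against (possibly stale) read-side data before any lock is taken.

```javascript
// Hypothetical sketch: none of these names are part of the cqrs-domain API.
function createCommandPipeline(steps) {
  // steps: { validateCommand, preLoadConditions, loadAndLockAggregate, handle }
  return function handleCommand(cmd, callback) {
    // 1. Syntactic validation of the command.
    var validationError = steps.validateCommand(cmd);
    if (validationError) return callback(validationError);

    // 2. Pre-load conditions: no aggregate has been loaded or locked yet.
    for (var i = 0; i < steps.preLoadConditions.length; i++) {
      var rejection = steps.preLoadConditions[i](cmd);
      if (rejection) return callback(rejection);
    }

    // 3. Only now load (and lock) the aggregate and handle the command.
    steps.loadAndLockAggregate(cmd, function (err, aggregate) {
      if (err) return callback(err);
      steps.handle(aggregate, cmd, callback);
    });
  };
}

// A pre-load condition that closes over a view model (assumed shape: { emails: [...] }).
function uniqueEmailCondition(personViewModel) {
  return function (cmd) {
    if (personViewModel.emails.indexOf(cmd.payload.email) !== -1) {
      return new Error('Email already in use: ' + cmd.payload.email);
    }
    return null;
  };
}
```

Because the view model may be stale, a check like this can only reduce the number of commands that reach the aggregate; it does not replace a strongly consistent rule where one is required.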

Alternatives

I realize that there are several alternatives, such as:

  • using sagas to run these checks over various aggregates and domains. This is a valid alternative, but it introduces a lot of event and command chatter, and it leaks and scatters a lot of domain logic into other sections of the application;
  • using external services to (pre)validate data as it enters the domain, before it gets handled by the domain, which has the problem of introducing external dependencies, adding complexity, scattering logic, and losing out on command validation.

Does this make sense? Am I missing a ridiculously simple alternative? Thanks a bunch for your insights everyone!

Domain throws Error if sent command is missing command name

Expected behavior: Domain triggers a "commandRejected" event (behaves similar to an ordinary failed command, e.g. validation, business rule, etc.)

Actual behavior: Domain throws an error: 'Please pass in a command name!'

The problem here is that the consumer of the cqrs-domain module has to handle these malformed commands specifically, unlike other malformed commands, where "only" some validation rule is violated.

@adrai I'm currently working on a fix for this issue
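The expected behavior could look roughly like the wrapper below. It is a minimal sketch and not the fix that went into the library: `createSafeDispatcher`, the `dispatch` method and the shape of the rejection payload are assumptions made for illustration; only the "commandRejected" event name and the error message are taken from the report above.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical wrapper around some command handler function.
function createSafeDispatcher(handleCommand) {
  var emitter = new EventEmitter();

  emitter.dispatch = function (cmd) {
    // Instead of throwing, report a missing command name like any other rejection.
    if (!cmd || typeof cmd.name !== 'string' || cmd.name.length === 0) {
      emitter.emit('commandRejected', {
        command: cmd,
        reason: { name: 'ValidationError', message: 'Please pass in a command name!' }
      });
      return;
    }
    handleCommand(cmd);
  };

  return emitter;
}
```

With this shape, a consumer can attach a single 'commandRejected' listener and treat nameless commands the same way as commands that fail a validation or business rule.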

store commands?

I think there is a lot of value in storing each incoming command as well, not just the resulting events. Do you agree? Would that be outside the scope of the library?

Feature request: Breaking hard dependency on 'node-eventstore'

Adriano,

We have been using node-cqrs-domain intensively in development here for quite a while and like it a lot, but there is one thing that really throws us off: the generic domain framework is coupled to a very specific Event Store implementation (a Node clone of NEventStore, to be precise).

In our case we are using Greg Young's Event Store ("GetEventStore"), and to use it with node-cqrs-domain we have no other choice but to implement the custom "store" (in node-eventstore terms), which acts as an adapter to/from our Node http interface to GetEventStore.

For us, node-eventstore does not add any value, but rather creates extra overhead, which the adapter has to deal with (like event conversion, etc.)

In our opinion, in a perfect world, the domain framework would not know what "Event Store" means. It would be given "something" that knows how, for the given aggregate type and id, to load that aggregate from its history; and another "something" that would know how to deal with snapshots.

But because we don't live in a perfect world, would replacing specific type dependency with interface dependency be something you might consider taking on?

The Domain constructor takes an options argument, which has an eventStore property describing configuration parameters for <node-eventstore>\eventStore. How about checking whether the options.eventStore property is a function and, if it is, treating it as a factory method for creating an event store?

I believe that this is the public interface the custom event store would need to implement to make node-cqrs-domain happy:

getFromSnapshot(query, function(err, snapshot, stream));
createSnapshot(query, function (err));
getNewId(function (err, id));
on(evtName, function ());
init(callback);
setEventToDispatched(evt, function (err));

Yes, node-cqrs-domain would still be dependent on node-eventstore module (because of EventStream), but we would be able to live with that :-)
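The proposal could be sketched like this. It is a hedged illustration of the idea, not existing library behavior: `resolveEventStore` and `createDefaultStore` are hypothetical names, and the method list is simply the one given in this issue.

```javascript
// Method names taken from the interface listed in this issue.
var REQUIRED_METHODS = [
  'getFromSnapshot', 'createSnapshot', 'getNewId',
  'on', 'init', 'setEventToDispatched'
];

// Hypothetical helper: createDefaultStore stands in for whatever the domain
// does today to build a node-eventstore instance from configuration.
function resolveEventStore(options, createDefaultStore) {
  var store = typeof options.eventStore === 'function'
    ? options.eventStore()                      // treat it as a factory method
    : createDefaultStore(options.eventStore);   // treat it as configuration

  // Fail early if the custom store does not implement the expected interface.
  var missing = REQUIRED_METHODS.filter(function (method) {
    return !store || typeof store[method] !== 'function';
  });
  if (missing.length > 0) {
    throw new Error('Event store is missing methods: ' + missing.join(', '));
  }
  return store;
}
```

Checking the interface up front is a design choice of this sketch; it turns a misconfigured adapter into an immediate error rather than a failure on the first command.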

What do you think?

Browser in use

I'd like to use my domain on the server and in an offline scenario. I use PhoneGap for the iOS and Android platforms and WinJS for Windows 8.

What's the best way? I've never used Browserify or similar tools.

Perhaps we could implement AMD compatibility?

Thanks

Unique entity business rule

Hi,
I don't understand how to create a business rule that says: the email address is unique across all persons. I'm not able to access the persons repository.
Thanks
