eventStore.js | |
---|---|
| |
The eventstore is the main module delegating all work to its eventDispatcher, storage implementation, ... Example: | var EventDispatcher = require('./eventDispatcher')
, interfaces = require('./interfaces')
, util = require('./util')
, async = require('async')
, cp = require('child_process')
, root = this
, eventStore
, Store;
if (typeof exports !== 'undefined') {
eventStore = exports;
} else {
eventStore = root.eventStore = {};
}
eventStore.VERSION = '0.5.0'; |
Create new instance of the event store. | eventStore.createStore = function(options) {
return new Store(options);
}; |
EventStore | Store = function(options) {
this.options = options || {};
this.storage = undefined;
this.publisher = undefined;
this.logger = undefined;
this.dispatcher = undefined;
};
Store.prototype = {
|
start: will inject missing modules by using default inMemory or fake instances and finally start the dispatcher to publish undispatched (not yet published) events. | start: function() { |
set default usings if not configured | if (!this.logger && this.options.logger === 'console') {
this.use(require('./logger/consoleLogger'));
}
if (!this.storage) {
this.use(require('./storage/inMemory/storage').createStorage());
}
if (!this.publisher) {
this.use(require('./publisher/fakePublisher').createPublisher());
} |
connect storage if not yet connected | if (!this.storage.isConnected) {
if (typeof this.storage.connect === 'function') this.storage.connect();
} |
if fork enabled, start event dispatcher as child process... | if (this.storage.filename && this.storage.options) {
if (this.logger) {
this.logger.info('Start event dispatcher as child process!');
}
var self = this;
this.dispatcher = cp.fork(__dirname + '/eventDispatcherProcess.js');
this.dispatcher.send({ action: 'use', payload: { options: this.options, storageModulePath: this.storage.filename } });
this.dispatcher.on('message', function(m) {
if (m.action === 'publish') {
self.publisher.publish(JSON.deserialize(m.payload));
}
});
this.dispatcher.send({ action: 'start' }); |
create a handle function on fork | this.dispatcher.addUndispatchedEvents = function(evts) {
self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });
};
} |
else, start event dispatcher in same process... | else {
if (this.logger) {
this.logger.info('Start event dispatcher in same process!');
}
this.dispatcher = new EventDispatcher(this.options);
this.dispatcher.useLogger(this.logger)
.usePublisher(this.publisher)
.useStorage(this.storage)
.start();
}
},
|
configure: configure your eventstore to use wished modules Example: | configure: function(fn) {
fn.call(this);
return this;
},
|
use: use this function to inject your implementation for the following: storage, publisher, logger.
| use: function(module) {
if (!module) return;
if (util.checkInterface(module, interfaces.IStorage, {silent: true})) {
this.storage = module;
}
if (util.checkInterface(module, interfaces.IPublisher, {silent: true})) {
this.publisher = module;
}
if (util.checkInterface(module, interfaces.ILogger, {silent: true})) {
this.logger = module;
}
},
|
log: Just a helper function to shorten logging calls | log: function(msg, level) {
if (!this.logger)
return;
if (level) {
this.logger[level](msg);
} else {
this.logger.info(msg);
}
},
|
hasConfigurationErrors: Another helper function to checks for configuration errors as we need at least a storage implementation at most parts. | hasConfigurationErrors: function(callback) {
var err;
if (!this.storage) {
err = new Error('Configure EventStore to use a storage implementation.');
}
if (err && callback) {
callback(err);
}
return err;
},
|
getFromSnapshot: loads the next snapshot back from given max revision or the latest if you don't pass in a maxRev. In addition you will get the eventstream from snapshot's revision to the given maxRev.
| getFromSnapshot: function(streamId, maxRev, callback) {
if (this.hasConfigurationErrors(callback)) return;
if (typeof maxRev === 'function') {
callback = maxRev;
maxRev = -1;
}
var self = this
, snapshot
, eventStream;
async.waterfall([
function getSnapshot(callback) {
self.storage.getSnapshot(streamId, maxRev, callback);
},
function getEventStream(snap, callback) {
var rev = 0;
if (snap && snap.revision !== undefined) {
rev = snap.revision + 1;
}
self.getEventStream(streamId, rev, maxRev, function(err, stream) {
if (err) callback(err);
snapshot = snap;
eventStream = stream;
callback(null);
});
}],
function (err) {
if (err) {
callback(err);
} else {
callback(null, snapshot, eventStream);
}
}
);
},
|
createSnapshot: stores a new snapshot
| createSnapshot: function(streamId, revision, data, callback) {
if (this.hasConfigurationErrors(callback)) return;
var snapshot = new Snapshot(null, streamId, revision, data);
var self = this;
async.waterfall([
function getNewIdFromStorage(callback) {
self.storage.getId(callback);
},
function commit(id, callback) {
self.log('get new id "' + id + '" from storage');
snapshot.id = id;
self.storage.addSnapshot(snapshot, callback);
}],
function (err) {
if (err) {
if (callback) callback(err);
} else {
self.log('added new snapshot: ' + JSON.stringify(snapshot));
if (callback) callback(null);
}
}
);
},
|
getEventStream: loads the eventstream from revMin to revMax.
| getEventStream: function(streamId, revMin, revMax, callback) {
if (this.hasConfigurationErrors(callback)) return;
if (typeof revMin === 'function') {
callback = revMin;
revMin = 0;
revMax = -1;
} else if (typeof revMax === 'function') {
callback = revMax;
revMax = -1;
}
var self = this;
this.storage.getEvents(streamId, revMin, revMax, function(err, events) {
if (err) {
callback(err);
} else {
callback(null, new EventStream(self, streamId, events));
}
});
},
|
commit: commits all uncommittedEvents in the eventstream. hint: directly use the commit function on eventstream
| commit: function(eventstream, callback) {
if (this.hasConfigurationErrors(callback)) return;
self = this;
async.waterfall([
function getNewCommitId(callback) {
self.storage.getId(callback);
},
function commitEvents(id, callback) { |
start committing. | var event
, currentRevision = eventstream.currentRevision();
for (i = 0, len = eventstream.uncommittedEvents.length; i < len; i++) {
event = eventstream.uncommittedEvents[i];
event.commitId = id;
event.commitSequence = i;
event.commitStamp = new Date();
currentRevision++;
event.streamRevision = currentRevision;
}
self.storage.addEvents(eventstream.uncommittedEvents, function(err) {
if (err) callback(err);
});
|
push to undispatchedQueue | self.dispatcher.addUndispatchedEvents(eventstream.uncommittedEvents);
|
move to events and remove uncommitted events. | eventstream.events = eventstream.events.concat(eventstream.uncommittedEvents);
eventstream.uncommittedEvents = [];
callback(null);
}],
function (err) {
if (err) {
if (callback) callback(err);
} else {
if (callback) callback(null, eventstream);
}
}
);
}, |
getNewIdFromStorage: loads a new id from storage.
| getNewIdFromStorage: function(callback) {
if (this.hasConfigurationErrors(callback)) return;
this.storage.getId(callback);
},
|
getAllEvents: loads the events from given storage. warning: don't use this in production!!!
| getAllEvents: function(callback) {
if (this.hasConfigurationErrors(callback)) return;
this.storage.getAllEvents(function(err, events) {
if (typeof callback === 'function') {
callback(err, events);
}
});
}, |
getLastEventOfStream: loads the last event from the given stream in storage.
| getLastEventOfStream: function(streamId, callback) {
if (this.hasConfigurationErrors(callback)) return;
this.storage.getLastEventOfStream(streamId, function(err, event) {
if (typeof callback === 'function') {
callback(err, event);
}
});
}, |
getEvents: loads the events from given storage.
| getEvents: function(index, amount, callback) {
if (this.hasConfigurationErrors(callback)) return;
this.storage.getEventRange(index, amount, function(err, events) {
if (typeof callback === 'function') {
callback(err, events);
}
});
}
}; |
EventStream: The eventstream is one of the main objects to interact with the eventstore.
| var EventStream = function(store, streamId, events) {
this.store = store;
this.streamId = streamId;
this.events = events || [];
this.uncommittedEvents = [];
this.lastRevision = -1;
};
EventStream.prototype = { |
currentRevision: This helper function returns the current stream revision. | currentRevision: function() {
var rev = this.lastRevision;
for (i = 0, len = this.events.length; i < len; i++) {
if (this.events[i].streamRevision > rev) {
rev = this.events[i].streamRevision;
}
}
return rev;
}, |
addEvent: adds an event to the uncommittedEvents array
| addEvent: function(event) {
var evt = new Event(this.streamId, event);
this.uncommittedEvents.push(evt);
},
|
commit: commits all uncommittedEvents. Usage: `eventstream.commit(callback)`
| commit: function(callback) {
this.store.commit(this, callback);
}
}; |
EventThe event object will be persisted to the storage. The orginal event will be saved in payload. | var Event = function(streamId, event) {
this.streamId = streamId || null;
this.streamRevision = null;
this.commitId = null;
this.commitSequence = null;
this.commitStamp = null;
this.header = null;
this.dispatched = false;
this.payload = event || null;
}; |
SnapshotThe snapshot object will be persisted to the storage. The orginal data will be saved in data. | var Snapshot = function(id, streamId, revision, data) {
this.id = id;
this.streamId = streamId;
this.revision = revision;
this.data = data;
}; |
attach public classes | // Expose the helper constructors as static properties of Store.
Store.Event = Event;
Store.EventStream = EventStream;
Store.Snapshot = Snapshot;
|