storage.js | |
---|---|
| |
An in-memory implementation of storage. Use it for development purposes only. | var uuid = require('./uuid')
, root = this
, inMemoryStorage
, Storage;
if (typeof exports !== 'undefined') {
inMemoryStorage = exports;
} else {
inMemoryStorage = root.inMemoryStorage = {};
}
inMemoryStorage.VERSION = '0.5.0'; |
Create new instance of storage. | inMemoryStorage.createStorage = function(options, callback) {
// Factory wrapper: forwards both arguments unchanged to the Storage
// constructor and returns the newly constructed instance.
return new Storage(options, callback);
}; |
inMemory storage | Storage = function(options, callback) {
this.filename = __filename;
this.isConnected = false;
if (typeof options === 'function')
callback = options;
this.options = options;
this.store = {};
this.snapshots = {};
if (callback) {
this.connect(callback);
}
};
Storage.prototype = { |
connect: connects the underlying database.
| connect: function(callback) {
this.isConnected = true;
if (callback) callback(null, this);
}, |
addEvents: saves all events.
| addEvents: function(events, callback) {
if (!events || events.length === 0) {
callback(null);
return;
}
if (!this.store[events[0].streamId]) {
this.store[events[0].streamId] = [];
}
this.store[events[0].streamId] = this.store[events[0].streamId].concat(events);
callback(null);
},
|
addSnapshot: stores the snapshot
| addSnapshot: function(snapshot, callback) {
if (!this.snapshots[snapshot.streamId]) {
this.snapshots[snapshot.streamId] = [];
}
this.snapshots[snapshot.streamId].push(snapshot);
callback(null);
}, |
getEvents: loads the events of a stream from minRev (inclusive) up to maxRev (exclusive), or up to the end of the stream if maxRev is omitted.
| getEvents: function(streamId, minRev, maxRev, callback) {
if (typeof maxRev === 'function') {
callback = maxRev;
maxRev = -1;
}
if (!this.store[streamId]) {
callback(null, []);
}
else {
if (maxRev === -1) {
callback(null, this.store[streamId].slice(minRev));
}
else {
callback(null, this.store[streamId].slice(minRev, maxRev));
}
}
},
|
getAllEvents: loads the events. warning: don't use this in production!!!
| getAllEvents: function(callback) {
var events = [];
for (var i in this.store) {
events = events.concat(this.store[i]);
}
events.sort(function(a, b){
return a.commitStamp - b.commitStamp;
});
callback(null, events);
}, |
getLastEventOfStream: loads the last event from the given stream in storage.
| getLastEventOfStream: function(streamId, callback) {
if (!this.store[streamId]) {
callback(null, null);
}
else if (this.store[streamId].length) {
callback(null, this.store[streamId][this.store[streamId].length - 1]);
} else {
callback(null, null);
}
}, |
getEventRange: loads a range of events from the storage: at most `amount` events, starting at position `index`.
| getEventRange: function(index, amount, callback) {
var events = [];
for (var i in this.store) {
events = events.concat(this.store[i]);
if (events.length >= (index + amount)) {
break;
}
}
events = events.slice(index, (index + amount));
events.sort(function(a, b){
return a.commitStamp - b.commitStamp;
});
callback(null, events);
}, |
getSnapshot: loads the most recent snapshot whose revision is at or below the given maxRev, or the latest snapshot if you don't pass in a maxRev.
| getSnapshot: function(streamId, maxRev, callback) {
if (typeof maxRev === 'function') {
callback = maxRev;
maxRev = -1;
}
if (!this.snapshots[streamId]) {
callback(null, {});
}
else {
if (maxRev == -1) {
callback(null, this.snapshots[streamId][this.snapshots[streamId].length - 1]);
} else {
var snaps = this.snapshots[streamId];
for (var i = snaps.length -1; i >= 0; i--) {
if (snaps[i].revision <= maxRev) {
callback(null, snaps[i]);
return;
}
}
}
}
},
|
getUndispatchedEvents: loads all undispatched events.
| getUndispatchedEvents: function(callback) {
var array = [];
for (var ele in this.store) {
var elem = this.store[ele];
for (var evt in elem) {
if (elem[evt].dispatched === false) {
array.push(elem[evt]);
}
}
}
callback(null, array);
},
|
setEventToDispatched: sets the given event to dispatched. hint: instead of the whole event object you can pass: {_id: 'commitId'}
| setEventToDispatched: function(evt) {
// Flags the passed-in object itself by setting its dispatched property to
// true. No lookup in this.store is performed and no callback is invoked.
// NOTE(review): because only the argument is mutated, passing a stand-in
// object such as {_id: 'commitId'} would not appear to update any stored
// event here - confirm that callers always pass the stored event object.
evt.dispatched = true;
},
|
getId: loads a new id from storage.
| getId: function(callback) {
callback(null, uuid().toString());
}
};
|