Jump To …

storage.js

lib/storage/inMemory/storage.js v0.4.0
under MIT License
(by) Jan Muehlemann (jamuhl)
   , Adriano Raiano (adrai)

An in-memory implementation for storage. Use only for development purposes.
For production there is a wide range of options (mongoDb, redis, couchDb), or roll your own implementation.

// Module bootstrap: load the id generator and pick the object that will
// carry the public API.
var uuid = require('./uuid');
var root = this;
var inMemoryStorage;
var Storage;

// Attach the API to `exports` when it is defined; otherwise create a fresh
// namespace object on `root`.
inMemoryStorage = (typeof exports !== 'undefined')
    ? exports
    : (root.inMemoryStorage = {});

inMemoryStorage.VERSION = '0.5.0';

Create new instance of storage.

// Factory for Storage instances. Both arguments are forwarded to the
// Storage constructor unchanged and the new instance is returned.
inMemoryStorage.createStorage = function(options, callback) {
    var storage = new Storage(options, callback);
    return storage;
};

inMemory storage

// Constructor of the in-memory storage.
//
//   options:  kept on `this.options` [optional]
//   callback: function(err, storage){} [optional]; when given, connect()
//             is invoked with it right away.
//
// May also be called as Storage(callback).
Storage = function(options, callback) {

    // Shift the arguments when only a callback was passed, so the callback
    // function does not end up stored as the options.
    if (typeof options === 'function') {
        callback = options;
        options = undefined;
    }

    this.filename = __filename;
    this.isConnected = false;
    this.options = options;

    // Both maps are keyed by streamId and hold arrays.
    this.store = {};
    this.snapshots = {};

    if (callback) {
        this.connect(callback);
    }
};

Storage.prototype = {

connect: connects the underlying database.

storage.connect(callback)

  • callback: function(err, storage){}
    connect: function(callback) {
        this.isConnected = true;

        if (callback) callback(null, this);
    },

addEvents: saves all events.

storage.addEvents(events, callback)

  • events: the events array
  • callback: function(err){}
    addEvents: function(events, callback) {
        if (!events || events.length === 0) { 
            callback(null);
            return;
        }
        
        if (!this.store[events[0].streamId]) {
            this.store[events[0].streamId] = [];
        }
        
        this.store[events[0].streamId] =  this.store[events[0].streamId].concat(events);
        callback(null);
    },
    

addSnapshot: stores the snapshot

storage.addSnapshot(snapshot, callback)

  • snapshot: the snapshot to store
  • callback: function(err){} [optional]
    addSnapshot: function(snapshot, callback) {
        if (!this.snapshots[snapshot.streamId]) {
            this.snapshots[snapshot.streamId] = [];
        }
        
        this.snapshots[snapshot.streamId].push(snapshot);
        callback(null);
    },

getEvents: loads the events from minRev to maxRev.

storage.getEvents(streamId, minRev, maxRev, callback)

  • streamId: id for requested stream
  • minRev: revision startpoint
  • maxRev: revision endpoint (hint: -1 = to end) [optional]
  • callback: function(err, events){}
    getEvents: function(streamId, minRev, maxRev, callback) {
        
        if (typeof maxRev === 'function') {
            callback = maxRev;
            maxRev = -1;
        }
        
        if (!this.store[streamId]) {
           callback(null, []);
        }
        else {
            if (maxRev === -1) {
                callback(null, this.store[streamId].slice(minRev));
            }
            else {
                callback(null, this.store[streamId].slice(minRev, maxRev));
            }
        }
    },
    

getAllEvents: loads the events.

warning: don't use this in production!!!

storage.getAllEvents(callback)

  • callback: function(err, events){}
    getAllEvents: function(callback) {
        var events = [];
        for (var i in this.store) {
            events = events.concat(this.store[i]);
        }
        
        events.sort(function(a, b){
             return a.commitStamp - b.commitStamp;
        });
        
        callback(null, events);
    },

getLastEventOfStream: loads the last event from the given stream in storage.

storage.getLastEventOfStream(streamId, callback)

  • streamId: the stream id
  • callback: function(err, event){}
    getLastEventOfStream: function(streamId, callback) {
        
        if (!this.store[streamId]) {
           callback(null, null);
        }
        else if (this.store[streamId].length) {
            callback(null, this.store[streamId][this.store[streamId].length - 1]);
        } else {
            callback(null, null);
        }

    },

getEventRange: loads the range of events from given storage.

storage.getEventRange(index, amount, callback)

  • index: entry index
  • amount: amount of events
  • callback: function(err, events){}
    getEventRange: function(index, amount, callback) {
        var events = [];
        for (var i in this.store) {
            events = events.concat(this.store[i]);

            if (events.length >= (index + amount)) {
                break;
            }
        }
        
        events = events.slice(index, (index + amount));

        events.sort(function(a, b){
             return a.commitStamp - b.commitStamp;
        });

        callback(null, events);
    },

getSnapshot: loads the next snapshot back from given max revision or the latest if you don't pass in a maxRev.

storage.getSnapshot(streamId, maxRev, callback)

  • streamId: id for requested stream
  • maxRev: revision endpoint (hint: -1 = to end) [optional]
  • callback: function(err, snapshot){}
    getSnapshot: function(streamId, maxRev, callback) {
        
        if (typeof maxRev === 'function') {
            callback = maxRev;
            maxRev = -1;
        }
        
        if (!this.snapshots[streamId]) {
           callback(null, {});
        }
        else {
            if (maxRev == -1) {
                callback(null, this.snapshots[streamId][this.snapshots[streamId].length - 1]);
            } else {
                var snaps = this.snapshots[streamId];
                for (var i = snaps.length -1; i >= 0; i--) {
                    if (snaps[i].revision <= maxRev) {
                        callback(null, snaps[i]);
                        return;
                    }
                }
            }
        }
    },
    

getUndispatchedEvents: loads all undispatched events.

storage.getUndispatchedEvents(callback)

  • callback: function(err, events){}
    getUndispatchedEvents: function(callback) {
        var array = [];
        
        for (var ele in this.store) {
            var elem = this.store[ele];
            for (var evt in elem) {
                if (elem[evt].dispatched === false) {
                    array.push(elem[evt]);
                }
            }
        }
        
        callback(null, array);
    },
    

setEventToDispatched: sets the given event to dispatched.

hint: instead of the whole event object you can pass: {_id: 'commitId'}

storage.setEventToDispatched(event, callback)

  • event: the event
  • callback: function(err, events){} [optional]
    setEventToDispatched: function(evt) {
        evt.dispatched = true;
    },
    

getId: loads a new id from storage.

storage.getId(callback)

  • callback: function(err, id){}
    getId: function(callback) {
        callback(null, uuid().toString());
    }
};