Skip to content
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,27 @@ Sessionstore is a node.js module for multiple databases. It can be very useful i
})
}));

## Connecting to elasticsearch
## Connecting to elasticsearch (https://github.com/elastic/elasticsearch)

var sessionstore = require('sessionstore');

var express = require('express');
var app = express();

app.use(express.session({
store: sessionstore.createSessionStore({
type: '@elasticsearch',
node: 'localhost:9200', // optional
prefix: '', // optional
index: 'express', // optional
typeName: 'session', // optional
pingInterval: 1000, // optional
timeout: 10000 // optional
})
}));


## Connecting to elasticsearch legacy (https://github.com/elastic/elasticsearch-js-legacy)

var sessionstore = require('sessionstore');

Expand Down
222 changes: 222 additions & 0 deletions lib/databases/@elasticsearch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
var util = require('util'),
Session = require('../sessionInterface'),
use = require('../use'),
_ = require('lodash'),
// async = require('async'),
jsondate = require('jsondate'),
elasticsearch = use('@elastic/elasticsearch');


var ElasticSearchSessionStore = function (options) {
options = options || {};
Session.Store.call(this, options);

var defaults = {
index: 'express',
typeName: 'session',
prefix: '',
ttl: 60 * 60 * 24 * 14, // 14 days
pingInterval: 1000
};

_.defaults(options, defaults);

if (!options.nodes && !options.node) {
options.node = 'http://localhost:9200';
}

this.options = options;

this.index = this.options.index;
this.typeName = this.options.typeName;
};

util.inherits(ElasticSearchSessionStore, Session.Store);

_.extend(ElasticSearchSessionStore.prototype, {

connect: function (callback) {
var self = this;

this.isConnected = false;

this.client = new elasticsearch.Client(this.options);

var callbacked = false;
this.closeCalled = false;

var interval = setInterval(function () {
if (self.closeCalled) {
clearInterval(interval);
}

self.client.ping(function (err) {
if (err) {
if (self.isConnected) {
self.isConnected = false;
self.emit('disconnect');
}
if (callback && !callbacked) {
callbacked = true;
callback(err, self);
}
return;
}

if (!self.isConnected) {
// Github issue #39 - recover after temp ping error.
if (callbacked) {
// Already callbacked, so only restore isConnected state.
self.isConnected = true;
self.emit('connect');
} else {
// Not callbacked yet, so perform init logic and handle isConnected state.
self.client.indices.create({
index: self.index
}, function(err) {
if (err && err.message.toLowerCase().indexOf('already') >= 0) {
err = null;
}
if (err) {
if (callback && !callbacked) {
callbacked = true;
callback(err, self);
}
return;
}

self.client.indices.putMapping({
index: self.index,
type: self.typeName,
body: {
session: {
_ttl: { enabled: true, default: '14d' }
}
}
}, function(err) {
if (err) {
if (callback && !callbacked) {
callbacked = true;
callback(err, self);
}
return;
}

self.isConnected = true;
self.emit('connect');
if (callback && !callbacked) {
callbacked = true;
callback(err, self);
}
});
});
}
}
});
}, this.options.pingInterval);
},

disconnect: function (callback) {
this.closeCalled = true;
if (this.client) this.client.close();
if (callback) callback(null);
},

set: function (sid, sess, callback) {
var ttl = this.options.ttl * 1000;
if (sess && sess.cookie && sess.cookie.expires) {
ttl = (new Date(sess.cookie.expires)).getTime() - Date.now();
sess.expiresAt = new Date(sess.cookie.expires);
} else {
sess.expiresAt = new Date(Date.now() + this.options.ttl * 1000);
}

// sess._version = sess._version || 1;
// sess._version++;

var self = this;

this.client.exists({
index: this.index,
type: this.typeName,
id: this.options.prefix + sid
}, function (err, exists) {
var methodName = undefined;
if (exists.statusCode === 404) {
methodName = 'create';
}
self.client.index({
index: self.index,
type: self.typeName,
id: self.options.prefix + sid,
opType: methodName,
// version: sess._version > 2 ? sess._version - 1 : undefined,
//ttl: ttl + 'ms',
body: sess,
refresh: true
}, function (err, res) {
if (err && (err.message.toLowerCase().indexOf('version') >= 0)) {
return callback(new Error('ConcurrencyError: Session was updated by someone else!'));
}
callback(err, res);
});
});
},

//touch: function (sid, sess, callback) {
// this.set(sid, sess, callback);
//},

get: function (sid, callback) {
this.client.get({
index: this.index,
type: this.typeName,
id: this.options.prefix + sid
}, function (err, res) {
if (err && err.body && err.body.found === false) {
err = null;
}
if (err) return callback(err);
if (typeof res == 'undefined' || typeof res.body == 'undefined'){
return callback(null, null);
}
if (res.body._source) {
var sess = jsondate.parse(JSON.stringify(res.body._source));
if (sess.expiresAt && sess.expiresAt.getTime() > Date.now()) {
delete sess.expiresAt;
return callback(null, sess);
}
}
callback(null, null);
});
},

destroy: function (sid, callback) {
this.client.delete({
index: this.index,
type: this.typeName,
id: this.options.prefix + sid
}, function (err, res) {
if (err && err.message.toLowerCase().indexOf('not found') >= 0) {
err = null;
}
if (callback) callback(err);
});
},

clear: function (callback) {
var self = this;
this.client.indices.exists({index: this.index}, function (err, result) {
if (result){
self.client.indices.delete({index: self.index}, function (err) {
if (callback) callback(err);
});
} else {
if (callback) callback(err);
}
});
}

});

module.exports = ElasticSearchSessionStore;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"tolerance": "1.0.0"
},
"devDependencies": {
"@elastic/elasticsearch": "^7.5.1",
"cradle": ">=0.2.7",
"elasticsearch": ">= 10.0.0",
"eslint": ">=1.0.0",
Expand Down
2 changes: 1 addition & 1 deletion test/sessionStoreTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe('SessionStore', function() {

describe('with options containing a type property with the value of', function() {

var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'memcached', 'couchdb'];
var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'memcached', 'couchdb', '@elasticsearch'];

types.forEach(function(type) {

Expand Down