-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfactory.js
188 lines (165 loc) · 5.09 KB
/
factory.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
* @fileoverview Manages all of the current subscriptions.
*/
var events = require('events');
var mysql = require('mysql');
var PubHub = require('./lib/pubhub.js').PubHub;
var Subscription = require('./lib/subscription.js').Subscription;
var url = require('url');
var util = require('util');
/**
* @constructor
*/
function Factory() {
var self = this;
events.EventEmitter.call(self);
// TODO - bail if this file doesn't exist.
var optionsFile = require('fs').readFileSync('./local.json', 'utf8');
self.options = JSON.parse(optionsFile);
// Create the MySQL client and connect to the database.
var client = mysql.createClient(self.options.database);
client.useDatabase(self.options.database.database);
self.hubs = [];
// Connect to the db and load up all of the existing hubs.
client.query("SELECT feed FROM subscriptions", function onFind(err, docs, fields) {
if (err) {
self.emit('error', err);
return;
}
for (var x in docs) {
self.addHub(docs[x].feed, null, false);
}
});
// Close the MySQL connection since it's not used by us anymore.
client.end();
}
util.inherits(Factory, events.EventEmitter);
/**
 * Stop all hubs.
 *
 * Calls stop() on every entry of this.hubs, in array order.
 */
Factory.prototype.stop = function() {
  var hubs = this.hubs;
  // Indexed loop: for...in would also visit enumerable properties inherited
  // from Array.prototype, which are not hubs.
  for (var i = 0; i < hubs.length; i++) {
    hubs[i].stop();
  }
};
/**
 * Adds/removes a subscription. The actual action is determined by
 * sub.hub_mode.
 *
 * If a hub for sub.hub_topic already exists, the subscriber is added to or
 * removed from that hub's Subscription. If no hub exists and the mode is
 * 'subscribe', a new hub is created through addHub(). Any other combination
 * is a no-op; in particular an unrecognised hub_mode never removes a
 * subscriber.
 *
 * @param {object} sub
 *   A subscription object as handled by routes/subscribe.js:
 *   {
 *     hub_callback: The subscriber callback.
 *     hub_mode: 'subscribe' or 'unsubscribe'.
 *     hub_topic: The feed URL to subscribe to.
 *     hub_verify: 'sync' or 'async', not needed.
 *     hub_lease_seconds: Length of time to keep the feed active.
 *     hub_secret: Used to compute HMAC digest.
 *     hub_verify_token: Verification token.
 *   }.
 * @see http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5
 */
Factory.prototype.subscribe = function(sub) {
  var self = this;
  var found = false;

  var newSubscriber = {
    callback: sub.hub_callback,
    lease_seconds: sub.hub_lease_seconds,
    created: Date.now(),
    secret: sub.hub_secret,
    verify_token: sub.hub_verify_token
  };

  // Check to see if a hub already exists for this feed.
  // TODO - improve this search.
  // Indexed loop: for...in would also visit enumerable properties inherited
  // from Array.prototype, which are not hubs.
  for (var i = 0; i < self.hubs.length; i++) {
    var hub = self.hubs[i];
    if (hub.getFeed() !== sub.hub_topic) {
      continue;
    }

    if (sub.hub_mode === 'subscribe') {
      hub.Subscription.addSubscriber(newSubscriber);
    }
    else if (sub.hub_mode === 'unsubscribe') {
      // Removal is keyed on the callback only.
      hub.Subscription.removeSubscriber(newSubscriber.callback);
    }
    found = true;
    break;
  }

  // Add a new hub if we didn't find an existing one.
  if (!found && sub.hub_mode === 'subscribe') {
    self.addHub(sub.hub_topic, newSubscriber);
  }
};
/**
 * Handles a publish notification.
 *
 * If a hub exists for the feed it is fetched immediately. The first time a
 * feed notifies us, its subscription is flagged as push-capable, saved, and
 * the hub is stopped before the fetch. If no hub exists for the feed, one is
 * created through addHub() with no subscribers.
 *
 * @param {string} url
 *   The feed URL that notified us of new content.
 */
Factory.prototype.publish = function(url) {
  var self = this;

  // Find the feed that's being published.
  // TODO - improve this search.
  // Indexed loop: for...in would also visit enumerable properties inherited
  // from Array.prototype, which are not hubs.
  for (var i = 0; i < self.hubs.length; i++) {
    var hub = self.hubs[i];
    if (hub.getFeed() !== url) {
      continue;
    }

    // NOTE(review): getFeed(true) presumably returns the feed as a request
    // options object - confirm in lib/pubhub.js. The headers are set on the
    // returned object directly.
    var options = hub.getFeed(true);
    options.headers = {
      'User-Agent': 'PubHub (https://github.com/elliotttf/PubHub)'
    };

    // Update the subscription if we didn't know this feed could publish.
    var record = hub.Subscription.Subscription;
    if (!record.push) {
      record.push = true;
      hub.Subscription.save();
      // Stop polling since we know the feed can talk to us now.
      hub.stop();
    }

    hub.fetch(options);
    return;
  }

  // The feed wasn't found: add an empty feed with no subscribers,
  // we'll notify subscribers when they come in.
  self.addHub(url);
};
/**
 * Registers a hub for a feed.
 *
 * A Subscription is constructed for the feed; nothing is appended to
 * this.hubs until that Subscription emits 'loaded'. At that point the loaded
 * subscription is optionally saved, wrapped in a PubHub, appended to
 * this.hubs, and the hub starts listening.
 *
 * @param {string} url
 *   The feed url.
 * @param {object} sub
 *   (optional) A subscriber object for this feed. When undefined or null the
 *   Subscription is constructed with the url only.
 * @param {boolean} save
 *   (optional) The loaded subscription is saved when this is omitted or
 *   exactly true; any other value skips the save.
 *
 * @see Subscription().
 */
Factory.prototype.addHub = function(url, sub, save) {
  var factory = this;
  var hasSubscriber = (sub !== undefined && sub !== null);
  var pending = hasSubscriber ? new Subscription(url, sub) : new Subscription(url);

  pending.on('loaded', function onLoaded(loadedSub) {
    // Drop our own reference; only loadedSub is used from here on.
    pending = null;

    var shouldSave = (save === undefined || save === true);
    if (shouldSave) {
      loadedSub.save();
    }

    // push() returns the new length, so the hub's slot is length - 1.
    var slot = factory.hubs.push(new PubHub(loadedSub)) - 1;

    // We always start with a polling model until the source publishes to us.
    factory.hubs[slot].listen();

    // On new content: pause, record the data, then notify subscribers.
    factory.hubs[slot].on('changed', function onChanged(data) {
      factory.hubs[slot].stop();
      factory.hubs[slot].Subscription.updateData(data);
      factory.hubs[slot].publish(data);
    });

    // Resume listening once the publish has gone out.
    factory.hubs[slot].on('published', function published(msg) {
      factory.hubs[slot].listen();
    });
  });
};
exports.Factory = Factory;