--- a/client/cloda-completers.js
+++ b/client/cloda-completers.js
@@ -1,13 +1,13 @@
var ContactCompleter = {
type: "contact",
complete: function(aAutocomplete, aText) {
console.log("Contact completer firing on", aText);
- Gloda.dbContacts.view("contact_ids/by_suffix", {
+ Gloda.dbContacts.view("raindrop!contacts!all/by_suffix", {
startkey: aText,
endkey: aText + "\u9999",
include_docs: true,
limit: 10,
success: function(result) {
var seen = {};
var nodes = [];
result.rows.forEach(function (row) {
@@ -26,27 +26,30 @@ var ContactCompleter = {
});
}
};
var TagCompleter = {
type: "tag",
complete: function(aAutocomplete, aText) {
console.log("Tag completer firing on", aText);
- Gloda.dbMessages.view("tags/all_tags", {
+ Gloda.dbMessages.view("raindrop!tags!all/all", {
+ startkey: aText,
+ endkey: aText + "\u9999",
success: function(result) {
+ var nodes = [];
+ if (result.rows.length==0) {
+ console.log("Tag completer can't see any tags starting with", aText);
+ return;
+ }
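+      // Note: with the reduce in schema/tags/all/all-reduce.js (and no 'group'
+      // option) this view collapses to a single row whose value is the sorted
+      // array of tag names within the requested key range.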
var tagNames = result.rows[0].value;
console.log("Tag completer got tag names:", tagNames);
- var nodes = [];
- var textLength = aText.length;
tagNames.forEach(function (tagName) {
- if (tagName.substring(0, textLength) == aText) {
- var node = $("<div/>")[0];
- ElementXBL.prototype.addBinding.call(node, "autocomplete.xml#tag-completion");
- node.setTagName(tagName);
- nodes.push(node);
- }
+ var node = $("<div/>")[0];
+ ElementXBL.prototype.addBinding.call(node, "autocomplete.xml#tag-completion");
+ node.setTagName(tagName);
+ nodes.push(node);
});
aAutocomplete.haveSomeResults(aText, nodes, TagCompleter, 100);
}
});
}
};
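The completers above address views by a combined "design doc/view" name; the view() helper patched later in client/jquery.couch.js splits that name on '/' and builds the corresponding _design URL. A minimal sketch of what a contact lookup amounts to, assuming an illustrative prefix of "bob":

    // Query the by_suffix view in the raindrop!contacts!all design doc for
    // everything starting with "bob".
    Gloda.dbContacts.view("raindrop!contacts!all/by_suffix", {
      startkey: "bob", endkey: "bob\u9999",
      include_docs: true, limit: 10,
      success: function(result) { /* build completion nodes from result.rows */ }
    });
    // which the patched view() issues roughly as:
    //   GET /raindrop/_design/raindrop!contacts!all/_view/by_suffix?startkey="bob"&...&include_docs=true&limit=10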
--- a/client/cloda.js
+++ b/client/cloda.js
@@ -33,40 +33,39 @@ var GlodaConversationProto = {
if (!message._parent) {
topnodes.push(message);
}
}
topnodes.sort(function (a,b) { return a.timestamp - b.timestamp; } );
return topnodes;
},
get subject() {
- return this.messages[0].subject();
+ return this.messages[0]['subject'];
},
};
var GlodaMessageProto = {
- subject: function() {
- return this.headers["Subject"];
- },
_bodyTextHelper: function(aPart) {
var result = "";
if (aPart.parts) {
return aPart.parts.map(this._bodyTextHelper, this).join("");
}
else if (aPart.contentType == "text/plain")
return aPart.data;
else
return "";
},
// something scary happens when these are getters in terms of putting things back
bodyText: function() {
- return this._bodyTextHelper(this.bodyPart);
+ return this['body']
+ // return this._bodyTextHelper(this.bodyPart);
},
bodySnippet: function() {
- return this.bodyText().substring(0, 128);
+ return this['body_preview'].substring(0, 128);
+ // return this.bodyText().substring(0, 128);
},
_rawSetDefault: function(aKey, aDefaultValue) {
var raw = this.__proto__;
console.log("this", this, "raw", raw);
if (aKey in raw) {
var val = raw[aKey];
if (val != null)
return val;
@@ -117,16 +116,17 @@ GlodaConvQuery.prototype = {
this.constraintsPending = aConstraints.length;
this.constraints.forEach(this.dispatchConstraint, this);
},
dispatchConstraint: function(aConstraint) {
var viewName = aConstraint.view;
delete aConstraint.view;
aConstraint["success"] = this.wrappedProcessResults;
+ console.log("executing view", viewName);
Gloda.dbMessages.view(viewName, aConstraint);
},
/**
* Result handling function for constraints issued by queryForConversations.
* Each result set has rows whose values are conversation ids. Once all
* constraints have returned their results, we load the conversations
* by a call to getConversations.
*/
@@ -159,18 +159,33 @@ GlodaConvQuery.prototype = {
* Retrieve conversations by id, also loading any involved contacts and
* reflecting them onto the messages themselves.
*/
getConversations: function(aConversationIds, aCallback, aCallbackThis) {
this.callback = aCallback;
this.callbackThis = aCallbackThis;
var dis = this;
- Gloda.dbMessages.view("by_conversation/by_conversation", {
- keys: aConversationIds, include_docs: true,
+ Gloda.dbMessages.view("raindrop!messages!by/by_conversation", {
+ keys: aConversationIds,
+ success: function(result) {
+ dis.processConversationIDsFetch(result);
+ }
+ });
+ },
+ processConversationIDsFetch: function(result) {
+ // we receive the list of conversations with the value being the ID of
+ // the document holding the message itself. Get all listed messages.
+ var msg_ids = [];
+ var rows = result.rows, iRow, row, contact_id;
+ for (iRow = 0; iRow < rows.length; iRow++) {
+ msg_ids.push(rows[iRow].value);
+ }
+ var dis = this;
+ Gloda.dbMessages.allDocs({ keys: msg_ids, include_docs: true,
success: function(result) {
dis.processConversationFetch(result);
}
});
},
processConversationFetch: function(result) {
// we receive the list of fetched messages. we need to group them by
// conversation (this should be trivially easy because they should come
@@ -191,22 +206,24 @@ GlodaConvQuery.prototype = {
oldest: message.timestamp, newest: message.timestamp,
involves_contact_ids: {}, raw_messages: []
};
conversation.raw_messages.push(message);
if (conversation.oldest > message.timestamp)
conversation.oldest = message.timestamp;
if (conversation.newest < message.timestamp)
conversation.newest = message.timestamp;
+ /**********
for (var iContactId = 0; iContactId < message.involves_contact_ids.length;
iContactId++) {
contact_id = message.involves_contact_ids[iContactId];
conversation.involves_contact_ids[contact_id] = true;
seenContactIds[contact_id] = true;
}
+ ******/
}
console.log("seenContactIds", seenContactIds);
var contact_ids = [];
for (contact_id in seenContactIds)
contact_ids.push(contact_id);
console.log("contact lookup list:", contact_ids);
@@ -250,37 +267,37 @@ GlodaConvQuery.prototype = {
convList.push(conversation);
wrapped_messages = [];
for (var iMsg = 0; iMsg < conversation.raw_messages.length; iMsg++) {
var raw_message = conversation.raw_messages[iMsg];
raw_message.__proto__ = GlodaMessageProto;
var message = {__proto__: raw_message};
message.from = contacts[message.from_contact_id];
- message.to = mapContactList(message.to_contact_ids);
- message.cc = mapContactList(message.cc_contact_ids);
- message.involves = mapContactList(message.involves_contact_ids);
+ message.to = "fix me!!" // mapContactList(message.to_contact_ids);
+ message.cc = "fix me too!!" // mapContactList(message.cc_contact_ids);
+ message.involves = "fix me 3!!!" // mapContactList(message.involves_contact_ids);
wrapped_messages.push(message);
}
conversation.messages = wrapped_messages;
conversation.messages.sort(function (a,b) {return a.timestamp - b.timestamp;});
}
convList.sort(function (a, b) { return b.newest - a.newest; });
console.log("callback with conv list", convList);
this.callback.call(this.callbackThis, convList);
}
};
var MAX_TIMESTAMP = 4000000000;
var Gloda = {
- dbContacts: $.couch.db("contacts"),
- dbMessages: $.couch.db("messages"),
+ dbContacts: $.couch.db("raindrop"),
+ dbMessages: $.couch.db("raindrop"),
_init: function () {
},
queryByStuff: function(aInvolvedContactIds, aTagNames, aDiscussionIds, aCallback, aCallbackThis) {
// -- for each involved person, get the set of conversations they're in
var constraints = [];
@@ -290,18 +307,18 @@ var Gloda = {
view: "by_involves/by_involves",
startkey: [contact._id, 0], endkey: [contact._id, MAX_TIMESTAMP]
};
}, this));
}
if (aTagNames && aTagNames.length) {
constraints = constraints.concat(aTagNames.map(function (tagName) {
return {
- view: "by_tags/by_tags",
- startkey: [tagName, 0], endkey: [tagName, MAX_TIMESTAMP]
+ view: "raindrop!conversations!by/by_tags",
+ key: tagName,
};
}, this));
}
if (aDiscussionIds && aDiscussionIds.length) {
constraints = constraints.concat(aDiscussionIds.map(function (discussion) {
return {
view: "by_mailing_list/by_list_id",
startkey: [discussion.id, 0], endkey: [discussion.id, MAX_TIMESTAMP]
--- a/client/index.xhtml
+++ b/client/index.xhtml
@@ -61,18 +61,18 @@
ru.addStaticMailbox("Trash",":trash");
ru.addStaticMailbox("Junk",":junk");
ru.addDynamicMailbox("People","people-by-frecency");
ru.addDynamicMailbox("Search","searches");
ru.addDynamicMailbox("Discussion","discussion");
- var dbMessages = $.couch.db("messages");
- dbMessages.view("by_list_id/by_mailing_list", {
+ var dbMessages = $.couch.db("raindrop");
+ dbMessages.view("raindrop!mailing_lists!all/by_list_id", {
include_docs: false,
group : true,
success: function(json) {
var parent = $("#discussion");
json.rows.forEach(function(row) {
var li = $(document.createElement("li")).addClass("discussion-list");
parent.append(li
.append($(document.createElement("a"))
--- a/client/jquery.couch.js
+++ b/client/jquery.couch.js
@@ -7,17 +7,35 @@
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
(function($) {
$.couch = $.couch || {};
- $.fn.extend($.couch, {
+ $.extend($.couch, {
+
+ activeTasks: function(options) {
+ options = options || {};
+ $.ajax({
+ type: "GET", url: "/_active_tasks", dataType: "json",
+ complete: function(req) {
+ var resp = $.httpData(req, "json");
+ if (req.status == 200) {
+ if (options.success) options.success(resp);
+ } else if (options.error) {
+ options.error(req.status, resp.error, resp.reason);
+ } else {
+ alert("Active task status could not be retrieved: " +
+ resp.reason);
+ }
+ }
+ });
+ },
allDbs: function(options) {
options = options || {};
$.ajax({
type: "GET", url: "/_all_dbs",
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
@@ -28,27 +46,42 @@
alert("An error occurred retrieving the list of all databases: " +
resp.reason);
}
}
});
},
config: function(options, section, option, value) {
+ options = options || {};
+ var url = "/_config/";
+ if (section) {
+ url += encodeURIComponent(section) + "/";
+ if (option) {
+ url += encodeURIComponent(option);
+ }
+ }
+ if (value === undefined) {
+ var method = "GET";
+ } else {
+ var method = "PUT";
+ var data = toJSON(value);
+ }
$.ajax({
- type: "GET", url: "/_config/",
+ type: method, url: url, contentType: "application/json",
+ dataType: "json", data: toJSON(value), processData: false,
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
- alert("An error occurred retrieving the server configuration: " +
- resp.reason);
+ alert("An error occurred retrieving/updating the server " +
+ "configuration: " + resp.reason);
}
}
});
},
db: function(name) {
return {
name: name,
@@ -71,17 +104,17 @@
}
}
});
},
create: function(options) {
options = options || {};
$.ajax({
type: "PUT", url: this.uri, contentType: "application/json",
- dataType: "json", data: "", processData: false,
+ dataType: "json", data: "", processData: false,
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 201) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
alert("The database could not be created: " + resp.reason);
@@ -154,33 +187,66 @@
options.error(req.status, resp.error, resp.reason);
} else {
alert("An error occurred retrieving a list of all documents: " +
resp.reason);
}
}
});
},
- openDoc: function(docId, options) {
+ allDesignDocs: function(options) {
+ options = options || {};
+ this.allDocs($.extend({startkey:"_design", endkey:"_design0"}, options));
+ },
+ allApps: function(options) {
options = options || {};
- $.ajax({
+ var self = this;
+ if (options.eachApp) {
+ this.allDesignDocs({
+ success: function(resp) {
+ $.each(resp.rows, function() {
+ self.openDoc(this.id, {
+ success: function(ddoc) {
+ var index, appPath, appName = ddoc._id.split('/');
+ appName.shift();
+ appName = appName.join('/');
+ index = ddoc.couchapp && ddoc.couchapp.index;
+ if (index) {
+ appPath = ['', name, ddoc._id, index].join('/');
+ } else if (ddoc._attachments && ddoc._attachments["index.html"]) {
+ appPath = ['', name, ddoc._id, "index.html"].join('/');
+ }
+ if (appPath) options.eachApp(appName, appPath, ddoc);
+ }
+ });
+ });
+ }
+ });
+ } else {
+ alert("please provide an eachApp function for allApps()");
+ }
+ },
+ openDoc: function(docId, options, ajaxOptions) {
+ options = options || {};
+ ajaxOptions = ajaxOptions || {};
+ $.ajax($.extend({
type: "GET",
url: this.uri + encodeURIComponent(docId) + encodeOptions(options),
dataType: "json",
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
alert("The document could not be retrieved: " + resp.reason);
}
}
- });
+ }, ajaxOptions));
},
saveDoc: function(doc, options) {
options = options || {};
if (doc._id === undefined) {
var method = "POST";
var uri = this.uri;
} else {
var method = "PUT";
@@ -230,63 +296,63 @@
}
var body = {language: language, map: mapFun};
if (reduceFun != null) {
if (typeof(reduceFun) != "string")
reduceFun = reduceFun.toSource ? reduceFun.toSource() : "(" + reduceFun.toString() + ")";
body.reduce = reduceFun;
}
$.ajax({
- type: "POST", url: this.uri + "_slow_view" + encodeOptions(options),
+ type: "POST", url: this.uri + "_temp_view" + encodeOptions(options),
contentType: "application/json",
data: toJSON(body), dataType: "json",
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
alert("An error occurred querying the database: " + resp.reason);
}
}
});
},
view: function(name, options) {
options = options || {};
+ name = name.split('/');
if (options.keys) {
$.ajax({
- type: "POST",
- url: this.uri + "_view/" + name + encodeOptions(options),
+ type: "POST", url: this.uri + "_design/" + name[0] + "/_view/" + name[1] + encodeOptions(options),
contentType: "application/json",
data: toJSON({keys: options.keys}), dataType: "json",
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
- alert("An error occurred accessing the view: " + resp.reason);
+ alert("An error occurred accessing the view '" + name + "': " + resp.reason);
}
}
});
return;
}
$.ajax({
- type: "GET", url: this.uri + "_view/" + name + encodeOptions(options),
+ type: "GET", url: this.uri + "_design/" + name[0] + "/_view/" + name[1] + encodeOptions(options),
dataType: "json",
complete: function(req) {
var resp = $.httpData(req, "json");
if (req.status == 200) {
if (options.success) options.success(resp);
} else if (options.error) {
options.error(req.status, resp.error, resp.reason);
} else {
- alert("An error occurred accessing the view: " + resp.reason);
+ alert("An error occurred accessing the view '" + name + "': " + resp.reason);
}
}
});
}
};
},
info: function(options) {
--- a/client/messages.xml
+++ b/client/messages.xml
@@ -124,19 +124,20 @@
<xbl:implementation><![CDATA[
({
conversation: null,
setConversation: function(aConversation) {
this.conversation = aConversation;
this.shadowTree.getElementById("oldestMessageDate").textContent =
makeTimestampFriendly(this.conversation.oldest);
var firstMessage = this.conversation.messages[0];
- this.shadowTree.getElementById("subject").textContent = firstMessage.subject();
- this.shadowTree.getElementById("snippet").textContent = firstMessage.bodySnippet();
- this.shadowTree.getElementById("author").textContent = firstMessage.from.name;
+ console.log("fetching info from first message", firstMessage);
+ this.shadowTree.getElementById("subject").textContent = firstMessage['subject'];
+ this.shadowTree.getElementById("snippet").textContent = firstMessage['body'];
+ //this.shadowTree.getElementById("author").textContent = firstMessage.from.name;
this.shadowTree.getElementById("date").textContent =
makeTimestampFriendly(firstMessage.timestamp);
var tagsNode = this.shadowTree.getElementById("tags");
ElementXBL.prototype.addBinding.call(tagsNode, "tags.xml#tags");
tagsNode.setMessage(firstMessage);
var replyNodes = this.shadowTree.getElementById("replies");
@@ -179,20 +180,20 @@
:bound-element > .author > .date:before { content: "\2014 "; }
:bound-element > .body { padding-left: 1em; color: #666; white-space: nowrap; overflow: hidden; }
]]></xbl:style>
</xbl:resources>
<xbl:implementation><![CDATA[
({
setMessage: function(aMessage) {
this.message = aMessage;
- this.shadowTree.getElementById("from").textContent = aMessage.from.name;
+ //this.shadowTree.getElementById("from").textContent = aMessage.from.name;
this.shadowTree.getElementById("date").textContent =
makeTimestampFriendly(aMessage.timestamp);
- this.shadowTree.getElementById("snippet").textContent = aMessage.bodySnippet();
+ this.shadowTree.getElementById("snippet").textContent = aMessage['body_preview'];
var tagsNode = this.shadowTree.getElementById("tags");
ElementXBL.prototype.addBinding.call(tagsNode, "tags.xml#tags");
tagsNode.setMessage(aMessage);
},
})
]]></xbl:implementation>
</xbl:binding>
@@ -203,18 +204,18 @@
<span id="subject"></span><br/>
<pre id="body"></pre>
</div>
</xbl:template>
<xbl:implementation><![CDATA[
({
setMessage: function(aMessage) {
this._message = aMessage;
- this.shadowTree.getElementById("subject").textContent = this._message.subject();
- this.shadowTree.getElementById("body").textContent = this._message.bodyPart.data;
+ this.shadowTree.getElementById("subject").textContent = this._message['subject'];
+ this.shadowTree.getElementById("body").textContent = this._message['body_preview'];
},
})
]]></xbl:implementation>
<xbl:handlers>
<xbl:handler event="click"><![CDATA[
]]></xbl:handler>
</xbl:handlers>
--- a/docs/INSTALL
+++ b/docs/INSTALL
@@ -1,41 +1,148 @@
-Installation Steps
-
-Have Python 2.5 or later installed.
+Late Breaking News/Known Problems
+=================================
- If not 2.6:
- install setup_tools
- easy_install simplejson
+* Error handling is poor. I'm trying to get a handle on the best way to
+  manage errors in a Twisted environment before adding too much handling
+  that may turn out to be inappropriate.
+
+Installation Steps
+==================
+
+Have Python 2.6 or later installed.
Prereqs:
easy_install couchdb (0.5 or later)
-Install couchdb trunk
+Install CouchDB trunk. This currently means you need to search the web for
+build instructions, and they only work on Linux - Windows doesn't work. If
+you want to work on Windows you must install CouchDB on a Linux box and
+point your Windows ~/.raindrop file at that server.
-Pull asuth's version of gocept-imapapi:
- hg clone http://clicky.visophyte.org/cgi-bin/hgwebdir.cgi/asuth/gocept-imapapi/
- cd gocept-imapapi
- python setup.py develop
+Install Twisted.
-install junius:
+Install 'paisley' from Launchpad.
+
+Either install junius:
cd junius/server/python/junius
python setup.py develop
+*or* just add it to your PYTHONPATH
+ set PYTHONPATH=c:\src\path\to\junius\server\python
-Configure IMAP account w/ some data in it, and edit bootstrap.py to point to it.
+Install the modules required by the 'protocols' - note that these are
+technically optional; if a module isn't installed, a warning will be generated
+and the corresponding protocol will simply be disabled:
+
+* imap - PyOpenSSL (link?)
+* skype - Skype4Py (link?)
+* twitter - python-twitter (link?)
+
+Configure raindrop:
+ * edit ~/.raindrop
+
+ * if your 'local' couchdb isn't on localhost, add a section along the lines of:
+ [couch-local]
+ host=hostname
+ port=port
-With Couchdb running:
+ * Add imap accounts along the lines of
+ [account-some-account-name] # the 'account-' prefix is important!
+ kind=imap
+ host=imap.gmail.com
+ port=993
+ username=username@gmail.com
+ password=topsecret
+ ssl=True
+
+ * Add a 'test' account - for the 'test suite' - along the lines of:
+ [account-test]
+ kind=test
+ username=test # not used, but we die without it!
+ num_test_docs=1 # this defaults to 5 - but 1 is useful sometimes!
+
+** NOTE: if you edit this file, you must run 'run-raindrop.py install-accounts'
+to get the new data into the raindrop database (accounts are added
+automatically when the database is first created, but we don't detect when
+the accounts change afterwards).
+
+
+With the CouchDB server running...
-Setup the tables:
- python bootstrap.py
+Set up the database and upload views and other content:
+
+ % run-raindrop.py
+
+You should see output similar to:
+
+ INFO:model:created new database
+ INFO:junius.bootstrap:Adding account 'test'
+ INFO:junius.bootstrap:client files are different - updating doc
+ INFO:junius.bootstrap:design doc '_design/raindrop!accounts!all' has changed - updating
+ ...
+ Nothing left to do - terminating.
+
+
+Go to http://127.0.0.1:5984/_utils/index.html to check that the database and
+design documents are set up.
+
+Test everything using the 'test suite' (one day it *will* be a test suite :)
+
+ % run-raindrop.py -p test sync-messages process
-Go to
-to check that the tables are setup
+ THEN repeat the above - sorry about that - the first run didn't quite
+ do everything it was supposed to.
+
+ % run-raindrop.py -p test sync-messages process
+
+ Note the '-p' says to only load the 'test' protocol; without it we
+ will attempt to load all protocols (eg, imap, skype, twitter, etc).
+ But further note that we don't attempt a protocol - even the 'test'
+ protocol - until we have an account of that 'type' set up.
-Load the mail:
- python getmail.py (loads IMAP data)
+ Add '-l debug' to the above command-line for debug log messages. This
+ can get quite verbose; you can also say -l log.name=debug to set a specific
+ log to debug.
+
+ See --help for more.
+
+
+Get *real* messages:
+
+ % run-raindrop.py sync-messages
+ % run-raindrop.py process
(reload http://127.0.0.1:5984/_utils/index.html to see stuff in the messages view)
-Go to http://127.0.0.1:5984/junius/files/index.xhtml
+Go to http://127.0.0.1:5984/junius/files/index.xhtml and use the autocomplete
+to do searches. If you have many documents, please be patient while the views
+are generated for the first time.
+
+You can edit any of the files in the 'schema' or 'client' directories, and
+when run-raindrop is next started, the changes to these files will be detected
+and they will be sent to the database. We don't detect the changes while we
+are running though, only at startup.
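+
+For example, the map function at schema/tags/all/all-map.js ends up as the
+view 'all' inside the design document '_design/raindrop!tags!all' (the same
+name the client queries as 'raindrop!tags!all/all'), so after an upload you
+can sanity-check it with something like:
+
+ % curl 'http://127.0.0.1:5984/raindrop/_design/raindrop!tags!all/_view/all'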
+
+Unprocessing:
+
+To delete all the intermediate messages in the DB, execute:
+
+ % run-raindrop.py unprocess
-and do autocomplete to do searches.
+The next time you execute a 'process' command, all your messages will be
+processed from the start.
+
+
+Error handling:
+If there is an error during a 'process' operation (ie, an exception in the
+converter), we will write an 'error document' and continue processing the
+work queue. We don't make a very loud noise at the end when this happens -
+you need to notice it in the log output as it is running. At any time you
+can execute:
+
+ % run-raindrop.py retry-errors
+
+to re-process those messages - but this will generally result in exactly the
+same error unless the converter has been fixed to deal with the condition
+causing the error. Alternatively, execute 'unprocess' to force reprocessing
+of all messages, not only those which previously caused an error.
+
new file mode 100644
--- /dev/null
+++ b/schema/accounts/all/all-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+ if (doc.type == "account") {
+ emit(null, doc);
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/contacts/all/by_identity-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+ if (doc.type == "contact") {
+ for each (var identity in doc.identities) {
+ emit([identity.kind, identity.value], doc);
+ }
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/contacts/all/by_suffix-map.js
@@ -0,0 +1,16 @@
+function(doc) {
+ if (doc.type == "contact") {
+ var i, suffix;
+ for (i = 0; i < doc.name.length; i++) {
+ suffix = doc.name.substring(i);
+ if (suffix && suffix[0] != " ")
+ emit(suffix, null);
+ }
+ for each (var identity in doc.identities) {
+      for (i = 0; i < identity.value.length; i++) {
+        suffix = identity.value.substring(i);
+        if (suffix && suffix[0] != " ") emit(suffix, null);
+      }
+ }
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_involves-map.js
@@ -0,0 +1,6 @@
+function(doc) {
+ if (doc.type == "message") {
+ for each (var contact_id in doc.involves_contact_ids)
+ emit([contact_id, doc.timestamp], doc.conversation_id);
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_mailing_list-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+ if (doc.type == 'message') {
+ if (doc.headers && doc.headers["List-Id"]) {
+ var parts = doc.headers["List-Id"].match(/[\W\w\s]*<(.+)>.*/);
+ emit([parts[1], doc.timestamp], doc.conversation_id);
+ }
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_tags-map.js
@@ -0,0 +1,6 @@
+function(doc) {
+ if (doc.type && doc.type=='anno/tags' && doc.tags) {
+ for (var i = 0; i < doc.tags.length; i++)
+ emit(doc.tags[i], doc.conversation_id || '');
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/mailing_lists/all/by_list_id-map.js
@@ -0,0 +1,18 @@
+function(doc) {
+ if (doc.type == 'message') {
+ if (doc.headers && doc.headers["List-Id"]) {
+ var parts = doc.headers["List-Id"].match(/([\W\w]*)\s*<(.+)>.*/);
+ var values = {
+ "List-Id" : doc.headers["List-Id"],
+ "id" : parts[2],
+ "name" : parts[1]
+ };
+ for each (var headerId in ["List-Post","List-Archive","List-Help",
+ "List-Subscribe","List-Unsubscribe"]) {
+ if (doc.headers[headerId])
+ values[headerId] = doc.headers[headerId];
+ }
+ emit(parts[2], values);
+ }
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/mailing_lists/all/by_list_id-reduce.js
@@ -0,0 +1,10 @@
+function(keys, values, rereduce) {
+ var output = {};
+ output.count = values.length;
+ for (var idx in values) {
+ for (var elm in values[idx]) {
+ output[elm] = values[idx][elm];
+ }
+ }
+ return output;
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_conversation-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+ if (doc.type && doc.type=='anno/tags' && doc.conversation_id) {
+ var id = doc._id;
+ var emit_id = id.substr(0, id.indexOf('!')) + '!message';
+ emit(doc.conversation_id, emit_id);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_doc_type-map.js
@@ -0,0 +1,5 @@
+function(doc)
+{
+ if (doc.type)
+ emit(doc.type, doc._rev);
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_header_message_id-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+ if (doc.type == "message" && doc.subtype == "rfc822") {
+ emit(doc.header_message_id, null);
+ }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_storage-map.js
@@ -0,0 +1,10 @@
+// This is intended to be a generic view, usable by all accounts for all
+// *raw* message types.
+// The key is always [subtype, account_id, storage_key], so 'startkey' and
+// 'endkey' can be used to select specific message subtypes for a
+// particular account.
+// The storage key is any value determined by each message subtype.
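+// For example (illustrative only - 'proto/foo' stands in for a real subtype),
+// all stored raw messages of one subtype for one account could be selected
+// with roughly:
+//   startkey: ['proto/foo', account_id], endkey: ['proto/foo', account_id, {}]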
+function(doc) {
+ if (doc.type == 'rawMessage' && doc.subtype && doc.account_id && doc.storage_key)
+ emit([doc.subtype, doc.account_id, doc.storage_key], null);
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/README.txt
@@ -0,0 +1,9 @@
+These are the views for the protocol implementations.
+
+Note that each protocol doesn't get its own design document - instead, the
+design documents are structured so that ones likely to be used together are
+grouped together, so they all get built at once.
+
+For example, the 'seen' design document includes one view per protocol - the
+expectation is that, in general, each protocol will need to process the same set of
+"new" documents to determine what they need to fetch.
new file mode 100644
--- /dev/null
+++ b/schema/proto/errors/errors-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+ if (doc.type=='core/error/msg')
+ // we use include_docs on this view, so get the ID magically...
+ emit(doc.raindrop_seq, null);
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/imap-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+ if (doc.type=='proto/imap') {
+ // imap uses a storage_key field which is already [foldername, uid]
+ // we include [flags, _rev] as the value so the protocol impl can see
+ // if things have changed.
+ emit(doc.storage_key, [doc.imap_flags, doc._rev]);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/skype-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+ if (doc.type && doc.type=='proto/skype-chat') {
+ emit([doc.skype_chatname, null], null);
+ } else if (doc.type && doc.type=='proto/skype-msg') {
+ emit([doc.skype_chatname, doc.skype_id], null);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+ if (doc.type=='proto/twitter') {
+ // Twitter uses an incrementing ID per user, and allows us to fetch
+ // only tweets since that ID. This allows us to use map/reduce to
+ // find the exact ID we need per user.
+ emit(doc.twitter_user.toString(), doc.twitter_id);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-reduce.js
@@ -0,0 +1,10 @@
+function(keys, values)
+{
+ // we just need the largest ID we've ever seen.
+ // *sob* - where is max()?
+  var ret = 0;
+ for each (var v in values)
+ if (v > ret)
+ ret = v;
+ return ret;
+}
new file mode 100644
--- /dev/null
+++ b/schema/tags/all/all-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+ if (doc.type && doc.type=='anno/tags' && doc.tags) {
+ var id = doc._id;
+ var emit_id = id.substr(0, id.indexOf('!')) + '!message';
+ for (var i = 0; i < doc.tags.length; i++)
+ emit(doc.tags[i], null);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/schema/tags/all/all-reduce.js
@@ -0,0 +1,19 @@
+function(keys, values, rereduce) {
+ var keySet = {}, i, j;
+ if (!rereduce) {
+ for (i = 0; i < keys.length; i++)
+ keySet[keys[i][0]] = true;
+ }
+ else {
+ for (i = 0; i < values.length; i++) {
+ var inSet = values[i];
+ for (j = 0; j < inSet.length; j++)
+ keySet[inSet[j]] = true;
+ }
+ }
+ var out = [];
+ for (var key in keySet)
+ out.push(key);
+ out.sort();
+ return out;
+}
deleted file mode 100644
--- a/server/python/junius/getmail.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-
-import base64, datetime, email.utils, email.header
-import pprint
-
-from gocept.imapapi.account import Account
-
-import junius.model as model
-
-'''
-Fetch new mail.
-'''
-
-class JuniusAccount(object):
- def __init__(self, dbs, account_def):
- self.dbs = dbs
- self.account_def = account_def
- self.imap_account = Account(account_def.host, account_def.port,
- account_def.username, account_def.password,
- account_def.ssl)
-
- def sync(self):
- self.considerFolders(self.imap_account.folders)
-
- def considerFolders(self, folders):
- for folder in folders.values():
- self.syncFolder(folder)
- self.considerFolders(folder.folders)
-
- def syncFolder(self, folder):
- print '***** Sync-ing', folder.path
-
- folderStatus = self.account_def.folderStatuses.get(folder.path, {})
- #curValidity = folder._validity()
- #if curValidity != folderStatus['validity']:
- # pass
-
- # -- find out about what message id's we already know about
- # this is really just the most primitive synchronization logic
- # available! but that's okay.
- known_uids = set()
- startkey=[self.account_def.id, folder.path, 0]
- endkey=[self.account_def.id, folder.path, 4000000000]
- for row in model.Message.by_storage(self.dbs.messages, startkey=startkey, endkey=endkey).rows:
- known_uids.add(row.key[2])
-
- processed = 0
- skipped = 0
- for message in folder.messages.values():
- uid = int(message.UID)
- if uid not in known_uids:
- try:
- self.grok_message(message)
- except:
- print "ERROR groking messages"
- # http://trac.mozillamessaging.com/raindrop/ticket/3
- processed += 1
- else:
- skipped += 1
- print ' processed', processed, 'skipped', skipped
-
- def grok_email_addresses(self, *address_strings):
- seen_contacts = {}
- result_lists = []
- involved_list = []
- for address_string in address_strings:
- cur_results = []
- cur_addresses = email.utils.getaddresses((address_string,))
- for name, address in cur_addresses:
- # XXX TODO: we can use 'keys' instead of just key.
- contacts = model.Contact.by_identity(self.dbs.contacts,
- key=['email', address])
- if len(contacts):
- # the contact exists, use it
- contact = list(contacts)[0]
- if contact.id in seen_contacts:
- contact = seen_contacts[contact.id]
- else:
- involved_list.append( (address,contact) )
- seen_contacts[contact.id] = contact
- else:
- # the contact does't exist, create it
- if not name:
- name = address
- else:
- try:
- pieces = email.header.decode_header(name)
- encoded_pieces = [piece.decode(encoding or "utf-8") for piece, encoding in pieces]
- name = u"".join(encoded_pieces)
- except (LookupError, UnicodeError):
- name = u""+name
-
- contact = model.Contact(
- name=name,
- identities=[{'kind': 'email', 'value': address}]
- )
- contact.store(self.dbs.contacts)
- involved_list.append( (address,contact) )
- seen_contacts[contact.id] = contact
- cur_results.append( (address,contact) )
- result_lists.append(cur_results)
- result_lists.append(involved_list)
- return result_lists
-
- def extract_message_id(self, message_id_string, acceptNonDelimitedReferences):
- # this is a port of my fix for bug 466796, the comments should be ported
- # too if we keep this logic...
- whitespaceEndedAt = None
- firstMessageIdChar = None
- foundLessThan = False
- message_len = len(message_id_string)
- i = 0
- while i < message_len:
- char = message_id_string[i]
- # do nothing on whitespace
- if char in r' \r\n\t':
- pass
- else:
- if char == '<':
- i += 1 # skip over the '<'
- firstMessageIdChar = i
- foundLessThan = True
- break
- if whitespaceEndedAt is None:
- whitespaceEndedAt = i
- i += 1
-
- # if we hit a '<', keep going until we hit a '>' or the end
- if foundLessThan:
- while i < message_len:
- char = message_id_string[i]
- if char == '>':
- # it's valid, update reference, making sure to stop before the '>'
- return [message_id_string[firstMessageIdChar:i],
- message_id_string[i+1:]]
- i += 1
-
- # if we are at the end of the string, we found some non-whitespace,
- # and the caller requested that we accept non-delimited whitespace,
- # give them that as their reference. (otherwise, leave it empty)
- if acceptNonDelimitedReferences and whitespaceEndedAt:
- return [message_id_string[whitespaceEndedAt:], '']
- return [None, '']
-
- def extract_message_ids(self, message_id_string):
- references = []
- while message_id_string:
- ref, message_id_string = self.extract_message_id(message_id_string,
- not references)
- if ref:
- references.append(ref)
- return references
-
- def grok_message_conversation(self, imsg):
- self_header_message_id = imsg.headers['Message-Id'][1:-1]
- refs_str = imsg.headers.get('References') or imsg.headers.get('In-Reply-To') or ''
- conversation_id = None
- conversations = {}
- self_message = None
- header_message_ids = self.extract_message_ids(refs_str)
- unseen = set(header_message_ids)
-
- # save off the list of referenced messages
- references = header_message_ids[:]
- # see if the self-message already exists...
- header_message_ids.append(self_header_message_id)
-
- messages = model.Message.by_header_id(self.dbs.messages,
- keys=header_message_ids)
- for message in messages:
- if message.header_message_id == self_header_message_id:
- self_message = message
- else:
- unseen.remove(message.header_message_id)
- conversation_id = message.conversation_id
-
- if conversation_id is None:
- # we need to allocate a conversation_id...
- conversation_id = self_header_message_id
-
- # create dudes who are missing
- if unseen:
- missing_messages = []
- for header_message_id in unseen:
- missing_messages.append(model.Message(
- conversation_id=conversation_id,
- header_message_id=header_message_id,
- ))
- self.dbs.messages.update(missing_messages)
-
- return conversation_id, self_message, references
-
- def grok_message(self, imsg):
- attachments = {}
- bodyPart = self.grok_part(imsg, imsg.body, attachments)
-
- # XXX the gocept header logic unfortunately is case-sensitive...
- # XXX also, doesn't support repeated values...
- # (but we can live with these limitations for now)
-
- from_contacts, to_contacts, cc_contacts, involves_contacts = self.grok_email_addresses(
- imsg.headers.get('From', ''), imsg.headers.get('To', ''),
- imsg.headers.get('Cc', ''))
-
- conversation_id, existing_message, references = self.grok_message_conversation(imsg)
-
- timestamp = email.utils.mktime_tz(email.utils.parsedate_tz(imsg.headers['Date']))
-
- cmsg = model.Message(
- account_id=self.account_def.id,
- storage_path=imsg.parent.path,
- storage_id=int(imsg.UID),
- #
- conversation_id=conversation_id,
- header_message_id=imsg.headers.get('Message-Id')[1:-1],
- references=references,
- #
- from_contact_id=from_contacts[0][1].id,
- to_contact_ids=[c.id for k,c in to_contacts],
- cc_contact_ids=[c.id for k,c in cc_contacts],
- involves_contact_ids=[c.id for k,c in involves_contacts],
- #
- from_contact=dict([ (from_contacts[0][1].id, dict([ ("name",from_contacts[0][1].name),
- ("email",from_contacts[0][0]) ]) ) ]),
- to_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in to_contacts]),
- cc_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in cc_contacts]),
- involves_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in involves_contacts]),
- #
- date=datetime.datetime.utcfromtimestamp(timestamp),
- timestamp=timestamp,
- #
- read=r'\Seen' in imsg.flags,
- #
- headers=dict(imsg.headers),
- bodyPart=bodyPart,
- _attachments=attachments
- )
-
- if existing_message:
- cmsg.id = existing_message.id
- # this is ugly, we should really just have the logic above use a
- # style that allows it to work with new or existing...
- cmsg._data['_rev'] = existing_message.rev
-
- cmsg.store(self.dbs.messages)
-
-
- def grok_part(self, msg, part, attachments, depth=0):
- contentType = part['content_type']
- partNumber = part['partnumber']
- me = {'contentType': contentType,
- 'partNumber': partNumber}
- if contentType.startswith('multipart/'):
- parts = me['parts'] = []
- for subpart in part.parts:
- parts.append(self.grok_part(msg, subpart, attachments, depth+1))
- else:
- me['parameters'] = part['parameters']
- data = part.fetch()
- # XXX perhaps we should recursively process the nested part dude?
- # (if contentType == 'message/rfc822')
- if contentType.startswith('text/'):
- me['data'] = data
- else:
- attachments[partNumber] = {'content_type': contentType,
- 'data': base64.b64encode(data)}
- return me
-
-class Grabber(object):
- def __init__(self, dbs):
- self.dbs = dbs
-
- def syncAccounts(self):
- for account in model.Account.all(self.dbs.accounts):
- if account.kind == 'imap':
- junius_account = JuniusAccount(self.dbs, account)
- junius_account.sync()
-
-if __name__ == '__main__':
- import os
- #acct = JuniusAccount('localhost', 8143, os.environ['USER'], 'pass')
- dbs = model.fab_db()
- grabber = Grabber(dbs)
- grabber.syncAccounts()
deleted file mode 100644
--- a/server/python/junius/getskype.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-'''
-Fetch skype contacts and chats.
-'''
-
-import base64, datetime, email.utils
-import pprint
-import time
-import re
-from urllib2 import urlopen
-
-import Skype4Py
-global skype
-import junius.model as model
-
-
-
-class SkypeAccount(object):
- def __init__(self, dbs, account_def, skype):
- self.dbs = dbs
- self.account_def = account_def
- self.skype = skype
- self.re_tags = re.compile(r'#(\w+)')
- self._knownContactsByHandle= {}
-
- def create_account_if_necessary(self):
- self.author = self.create_contact_if_necessary()
-
- def create_contact_if_necessary(self, handle=None):
- #print "skype_user = " + skype_user.FullName
- if handle in self._knownContactsByHandle:
- return self._knownContactsByHandle[handle]
- if not handle:
- skype_user = self.skype.CurrentUser
- handle = skype_user.Handle
- else:
- if isinstance(handle, Skype4Py.user.IUser):
- skype_user = handle
- else:
- print "getting user for handle", handle
- skype_user = skype.User(handle)
- contacts = model.Contact.by_identity(self.dbs.contacts,
- key=['skype', skype_user.Handle])
-
- if len(contacts) == 0:
- # the contact does't exist, create it
-
- attachments = {}
- #fname = '/tmp/foo.png' # os.getcwd() + '/' + 'skypeavatar' + skype_user.Handle + '.jpg'
- #open(fname,'w').close();
- #skype_user.SaveAvatarToFile(fname) - fails, don't know why.
- # XXXX - skype_user.SaveAvatarToFile -> get the image!
- if False: ######
- try:
- response = urlopen(account.profile_image_url)
- attachments['default'] = { 'content_type': response.info()['Content-Type'],
- 'data': base64.b64encode(response.read()) }
- except:
- pass
-
- identities = [{'kind': 'skype', 'value': skype_user.Handle}]
- if skype_user.Homepage:
- identities.append({'kind': 'url' , 'value' : skype_user.Homepage })
-
- contact = model.Contact(
- name=skype_user.FullName,
- identities=identities,
- #location=account.location, xxxxxx ???????? what is this?
- _attachments=attachments
- )
- #print "XXX adding contact: ", skype_user.FullName
-
- contact.store(self.dbs.contacts)
- else:
- contact = [model.Contact.load(self.dbs.contacts,contact.value['_id']) for contact in contacts.rows][0]
-
- self._knownContactsByHandle[handle] = contact
- return contact
-
- def sync(self):
-
- print '***** Finding existing messages:',
- # -- find out about what message id's we already know about
- # this is really just the most primitive synchronization logic
- # available! but that's okay.
- known_uids = set()
- for row in model.Message.by_header_id(self.dbs.messages).rows:
- known_uids.add(row.key)
-
- print len(known_uids)
- processed = 0
- skipped = 0
-
- print '***** Fetching skype messages'
- for i, chat in enumerate(self.skype.Chats):
- messages = chat.Messages
- involves = self.grok_involves(self.author, chat)
- print "chat %d of %d '%s' (%d messages, %d contacts)" % \
- (i, len(self.skype.Chats), chat.Name, len(messages), len(involves))
-
- for msg in messages:
- if str(msg.Id) not in known_uids:
- self.grok_message(self.author, chat, msg, involves)
- processed += 1
- else:
- skipped += 1
- print ' processed', processed, 'skipped', skipped
-
- print '***** Fetching contacts'
-
- for f in self.skype.Friends:
- self.create_contact_if_necessary(f)
-
- print 'synchronization of contacts completed'
-
-
- def grok_message(self, author, chat, msg, involves):
- # Its possible we should make the first message in a chat the "parent"
- # and have all other messages reference that parent??
-
- # The status of a message includes ones that only make sense for sent
- # (eg, sending, sent) as well as for received. An explicit status of
- # 'read' exists - so we assume if it is 'received' it isn't read yet.
- is_read = msg.Status != Skype4Py.cmsReceived
- msgauthor = self.create_contact_if_necessary(msg.FromHandle)
- cmsg = model.Message(
- account_id=self.account_def.id,
- # Is this expected to be a real URL?
- storage_path='http://skype.com/%s' % (chat.Name,),
- storage_id=str(msg.Id),
- #
- conversation_id=chat.Name.replace('/', '').replace('#', ''),
- header_message_id=msg.Id,
- references=[], # should reference the 'parent' (ie, the chat iself?)
- # XXX - fixup 'from' and 'to' handling.
- from_contact_id=str(msgauthor.id),
- from_contact={ str(msgauthor.id) : { "name" : msg.FromDisplayName} },
- to_contact_ids=[],
- cc_contact_ids=[],
- involves_contact_ids=[involved for involved in involves],
- involves_contacts=involves,
- date=msg.Datetime,
- timestamp=time.mktime(msg.Datetime.timetuple()),
- #
- read=is_read,
- tags=self.re_tags.findall(msg.Body),
- headers={ "Subject" : chat.FriendlyName },
- bodyPart={"data":msg.Body, "contentType":"text/plain"},
- _attachments={}
- )
-
- cmsg.store(self.dbs.messages)
-
- def grok_involves(self, author, chat):
- involves = { author.id : { 'name' : author.name } }
- for m in chat.Members:
- account = self.create_contact_if_necessary(m)
- user = skype.User(m.Handle)
- involves[account.id] = { 'username' : m.Handle,
- 'name' : user.FullName}
- return involves
-
-
-class Grabber(object):
- def __init__(self, dbs):
- self.dbs = dbs
-
- def syncAccounts(self):
- global skype
- skype = Skype4Py.Skype()
- print "attaching to skype..."
- skype.Attach()
- print "Synching..."
- for account in model.Account.all(self.dbs.accounts):
- if account.kind == 'skype':
- # XXX - something needs to check we are currently logged
- # in to this account.
- junius_account = SkypeAccount(self.dbs, account, skype)
- junius_account.create_account_if_necessary();
- junius_account.sync()
-
-if __name__ == '__main__':
- import os
- dbs = model.fab_db()
- grabber = Grabber(dbs)
- grabber.syncAccounts()
deleted file mode 100644
--- a/server/python/junius/gettweets.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python
-
-import base64, datetime, email.utils
-import pprint
-import re
-import sys
-from urllib2 import urlopen, HTTPError
-
-import twitter
-
-import junius.model as model
-
-'''
-Fetch contacts, and tweets.
-'''
-
-
-class JuniusAccount(object):
- def __init__(self, dbs, account_def):
- self.dbs = dbs
- self.account_def = account_def
- self.twitter_account = twitter.Api(username=account_def.username, password=account_def.password)
- self.re_involves = re.compile(r'@(\w+)')
- self.re_tags = re.compile(r'#(\w+)')
- self._contactsByHandle = {}
-
- # -- find out about what message id's we already know about
- # this is really just the most primitive synchronization logic
- # available! but that's okay.
- self.known_uids = set()
- #startkey=0
- #endkey=4000000000
- #for row in model.Message.by_header_id(self.dbs.messages, startkey=startkey, endkey=endkey).rows:
- ## XXX optimize this to only look for tweets from this user
- for row in model.Message.by_header_id(self.dbs.messages).rows:
- self.known_uids.add(row.key)
- try:
- self.twitter_user = self.twitter_account.GetUser(self.account_def.username)
- except HTTPError, e:
- print e.url
- raise e
- self.author = self.create_contact_if_necessary(self.account_def.username)
- self.sync(self.twitter_user)
-
- def create_contact_if_necessary(self, username):
- if username in self._contactsByHandle:
- return self._contactsByHandle[username]
- print "looking for contact with username ", username
- contacts = model.Contact.by_identity(self.dbs.contacts,
- key=['twitter', username])
- try:
- twitter_account = self.twitter_account.GetUser(username)
- except HTTPError, e:
- if e.code == 404:
- return None
- raise e
-
- if len(contacts) == 0:
- # the contact does't exist, create it
-
- attachments = {}
- if twitter_account.profile_image_url:
- try:
- response = urlopen(twitter_account.profile_image_url)
- attachments['default'] = { 'content_type': response.info()['Content-Type'],
- 'data': base64.b64encode(response.read()) }
- except:
- pass
-
- identities = [{'kind': 'twitter', 'value': username }]
- if twitter_account.url:
- identities.append({'kind': 'url' , 'value' : twitter_account.url })
-
- contact = model.Contact(
- name=twitter_account.name,
- identities=identities,
- location=twitter_account.location,
- _attachments=attachments
- )
-
- contact.store(self.dbs.contacts)
- else:
- contact = [model.Contact.load(self.dbs.contacts,contact.value['_id']) for contact in contacts.rows][0]
- self._contactsByHandle[username] = contact
- return contact
-
- def find_contacts(self):
- """for all of my contacts w/ email addresses, look for twitter accounts,
- and create them if they are found"""
- twitter_contacts = model.Contact.by_identity(self.dbs.contacts, startkey=['twitter', ''],
- endkey=['twitter', 'ZZZZ'])
- twitter_ids = {}
- for contact in twitter_contacts.rows:
- for identity in contact.value['identities']:
- twitter_ids[identity['value']] = contact
-
- contacts = model.Contact.by_identity(self.dbs.contacts, startkey=['email', ''],
- endkey=['email', 'ZZZZ'])
- print "contacts =", contacts
- for c in contacts.rows:
- for identity in c.value['identities']:
- email = identity['value']
- print "looking for twitter account for " + email,
- try:
- twitteruser = self.twitter_account.GetUserByEmail(email)
- except HTTPError, e:
- if e.code == 404:
- print "nope"
- continue
- raise e
- print '=>', twitteruser.id
- if twitteruser.screen_name not in twitter_ids:
- contact = self.create_contact_if_necessary(twitteruser.screen_name)
-# if contact: # this will easily get past the twitter API hourly limits.
-# self.sync(twitteruser)
-
- sys.exit(0)
-
- def sync(self, twitter_account):
- print '***** Fetching tweets for', twitter_account.screen_name
- processed = 0
- skipped = 0
-
- for message in self.twitter_account.GetUserTimeline(twitter_account.screen_name):
- if str(message.id) not in self.known_uids:
- self.grok_message(self.author, message)
- processed += 1
- else:
- skipped += 1
- print ' processed', processed, 'skipped', skipped
-
-
-
- def grok_message(self, author, imsg):
- involves = self.grok_involves(author, imsg)
- print "grokking message from " + author.name
- cmsg = model.Message(
- account_id=self.account_def.id,
- storage_path='http://twitter.com/' + imsg.user.screen_name + '/status/' + str(imsg.id),
- storage_id=str(imsg.id),
- #
- conversation_id=imsg.GetInReplyToStatusId() if imsg.GetInReplyToStatusId() else str(imsg.id),
- header_message_id=str(imsg.id),
- references=[],
- #
- from_contact_id=str(author.id),
- from_contact={ str(author.id) : { "name" : author.name } },
- to_contact_ids=[],
- cc_contact_ids=[],
- involves_contact_ids=[involved for involved in involves],
- involves_contacts=involves,
- #
- date=datetime.datetime.utcfromtimestamp(imsg.created_at_in_seconds),
- timestamp=int(imsg.created_at_in_seconds),
- #
- read=False,
- tags=self.re_tags.findall(imsg.text),
- headers={ "Subject" : "" },
- bodyPart={"data":imsg.text, "contentType":"text/plain"},
- _attachments={}
- )
-
- cmsg.store(self.dbs.messages)
-
- def grok_involves(self, author, imsg):
- involves = { author.id : { 'name' : author.name } }
- usernames = self.re_involves.findall(imsg.text)
- for username in usernames:
- account = self.create_contact_if_necessary(username)
- if account:
- involves[account.id] = { 'username' : username,
- 'name' : account.name }
- return involves
-
-class Grabber(object):
- def __init__(self, dbs):
- self.dbs = dbs
-
- def syncAccounts(self):
- for account in model.Account.all(self.dbs.accounts):
- if account.kind == 'twitter':
- junius_account = JuniusAccount(self.dbs, account)
-
- for friend in junius_account.twitter_account.GetFriends():
- junius_account.create_contact_if_necessary(friend.screen_name)
- print 'got friend accounts'
-
-
- junius_account.find_contacts()
-
-
-if __name__ == '__main__':
- import os
- dbs = model.fab_db()
- grabber = Grabber(dbs)
- grabber.syncAccounts()
rename from server/python/junius/__init__.py
rename to server/python/raindrop/__init__.py
rename from server/python/junius/bootstrap.py
rename to server/python/raindrop/bootstrap.py
--- a/server/python/junius/bootstrap.py
+++ b/server/python/raindrop/bootstrap.py
@@ -1,159 +1,268 @@
#!/usr/bin/env python
'''
Setup the CouchDB server so that it is fully usable and what not.
'''
-import model
-
import sys
+import twisted.web.error
+from twisted.internet import defer
import os, os.path, mimetypes, base64, pprint
-
-import junius.model as model
-
-def setup_account(dbs):
- if len(model.Account.all(dbs.accounts)):
- print 'Account presumably already exists, not adding it!'
- return
-
- # we want a file of the form:
- # hostname,port,username,password,ssl?
- # example:
- # mail.example.com,993,bob@example.com,sekret,True
- import os, os.path
- configPath = os.path.join(os.environ['HOME'], ".junius")
- f = open(configPath, 'r')
- data = f.read()
- f.close()
- host, portstr, username, password, sslstr = data.split(',')
- ssl = not (sslstr.strip().lower() in ['false', 'f', 'no', '0'])
-
- account = model.Account(
- kind='imap', host=host, port=int(portstr), ssl=ssl,
- username=username, password=password,
- )
- account.store(dbs.accounts)
+import model
+import hashlib
-def setup_twitter_account(dbs):
- # we want a file of the form:
- # username,password
- # example:
- # davidascher,sekret
- import os, os.path
- configPath = os.path.join(os.environ['HOME'], ".junius_twitter")
- if not os.path.isfile(configPath):
- print "Skipping twitter - no config file '%s'" % (configPath,)
- return
- f = open(configPath, 'r')
- data = f.read()
- f.close()
- username, password = data.strip().split(',')
-
- accts = [acct for acct in model.Account.all(dbs.accounts) if acct.kind=='twitter' and acct.username==username]
- if accts and len(accts) > 0:
- print 'Twitter account for %s already exist, not automatically adding a new one!' % username
- return
-
- account = model.Account(
- kind='twitter', username=username, password=password,
- )
- account.store(dbs.accounts)
+from .config import get_config
+from .model import get_db
-def setup_skype_account(dbs):
- # For now we just use whoever is currently logged into skype (but we
- # probably do want a config file and should skip things if that user
- # isn't current.)
-
- # XXX - paramaterized views?
- accts = [acct for acct in model.Account.all(dbs.accounts) if acct.kind=='skype']
- if accts:
- print 'Skype accounts for %r already exist, not automatically adding a new one!' % \
- [acct.username for a in accts]
- return
-
- try:
- import Skype4Py
- except ImportError:
- print "skipping skype as the Skype4Py module isn't installed"
- else:
- skype = Skype4Py.Skype()
- skype.Timeout = 10000 # default of 30 seconds is painful...
- print 'Connecting to Skype (please check skype; you may need to authorize us)'
- try:
- skype.Attach()
- except Skype4Py.SkypeAPIError, exc:
- print 'Failed to attach to Skype (is it installed and running? Is authorization pending?)'
- print ' error was:', exc
- return
- print 'Creating skype account for current user', skype.CurrentUser.Handle
- account = model.Account(
- kind='skype', username=skype.CurrentUser.Handle,
- )
- account.store(dbs.accounts)
+import logging
+logger = logging.getLogger(__name__)
def path_part_nuke(path, count):
for i in range(count):
path = os.path.dirname(path)
return path
FILES_DOC = 'files' #'_design/files'
-def install_client_files(dbs):
- '''
- cram everyone in 'client' into the 'junius' app database
+# Updating design documents when not necessary can be expensive as all views
+# in that doc are reset. So we've created a helper - a simple 'fingerprinter'
+# which creates a dict, each entry holding the fingerprint of a single item
+# (usually a file), used when files on the file-system are the 'source' of a
+# couchdb document. The 'fingerprint' is stored with the document, so later we
+# can build the fingerprint of the file-system, and compare them to see if we
+# need to update the document. Simple impl - doesn't use the 'stat' of the
+# file at all, but always calculates the md5 checksum of the content.
+class Fingerprinter:
+ def __init__(self):
+ self.fs_hashes = {}
+
+ def get_finger(self, filename):
+ # it doesn't make sense to calc a file's fingerprint twice
+ assert filename not in self.fs_hashes
+ ret = self.fs_hashes[filename] = hashlib.md5()
+ return ret
+
+ def get_prints(self):
+ return dict((n,h.hexdigest()) for (n, h) in self.fs_hashes.iteritems())
+
+
+def install_client_files(whateva, options):
'''
- if FILES_DOC in dbs.junius:
- print 'Design doc already exists, will be updating/overwriting files'
- design_doc = dbs.junius[FILES_DOC]
- attachments = design_doc['_attachments'] = {}
- else:
- design_doc = {}
+ cram everyone in 'client' into the app database
+ '''
+ d = get_db()
+
+ def _opened_ok(doc):
+ logger.debug("document '%(_id)s' already exists at revision %(_rev)s",
+ doc)
+ return doc
+
+ def _open_not_exists(failure, *args, **kw):
+ failure.trap(twisted.web.error.Error)
+ if failure.value.status != '404': # not found.
+ failure.raiseException()
+ return {} # return an empty doc.
+
+ def _maybe_update_doc(design_doc):
+ fp = Fingerprinter()
attachments = design_doc['_attachments'] = {}
-
- # we cannot go in a zipped egg...
- junius_root_dir = path_part_nuke(model.__file__, 4)
- client_dir = os.path.join(junius_root_dir, 'client')
- print 'listing contents of', client_dir
-
- for filename in os.listdir(client_dir):
- path = os.path.join(client_dir, filename)
- if filename.endswith("~") or filename.startswith("."): continue;
- if os.path.isfile(path):
- f = open(path, 'rb')
- ct = mimetypes.guess_type(filename)[0]
- if ct is None and sys.platform=="win32":
- # A very simplistic check in the windows registry.
- import _winreg
- try:
- k = _winreg.OpenKey(_winreg.HKEY_CLASSES_ROOT,
- os.path.splitext(filename)[1])
- ct = _winreg.QueryValueEx(k, "Content Type")[0]
- except EnvironmentError:
- pass
- assert ct, "can't guess the content type for '%s'" % filename
- attachments[filename] = {
- 'content_type': ct,
- 'data': base64.b64encode(f.read())
- }
- f.close()
- print 'filename "%s" (%s)' % (filename, ct)
-
- dbs.junius[FILES_DOC] = design_doc
+ # we cannot go in a zipped egg...
+ root_dir = path_part_nuke(model.__file__, 4)
+ client_dir = os.path.join(root_dir, 'client')
+ logger.debug("listing contents of '%s' to look for client files", client_dir)
+
+ for filename in os.listdir(client_dir):
+ path = os.path.join(client_dir, filename)
+ if os.path.isfile(path):
+ f = open(path, 'rb')
+ ct = mimetypes.guess_type(filename)[0]
+ if ct is None and sys.platform=="win32":
+ # A very simplistic check in the windows registry.
+ import _winreg
+ try:
+ k = _winreg.OpenKey(_winreg.HKEY_CLASSES_ROOT,
+ os.path.splitext(filename)[1])
+ ct = _winreg.QueryValueEx(k, "Content Type")[0]
+ except EnvironmentError:
+ pass
+ assert ct, "can't guess the content type for '%s'" % filename
+ data = f.read()
+ fp.get_finger(filename).update(data)
+ attachments[filename] = {
+ 'content_type': ct,
+ 'data': base64.b64encode(data)
+ }
+ f.close()
+ logger.debug("filename '%s' (%s)", filename, ct)
+ new_prints = fp.get_prints()
+ if options.force or design_doc.get('fingerprints') != new_prints:
+ logger.info("client files are different - updating doc")
+ design_doc['fingerprints'] = new_prints
+ return d.saveDoc(design_doc, FILES_DOC)
+ logger.debug("client files are identical - not updating doc")
+ return None
+
+ defrd = d.openDoc(FILES_DOC)
+ defrd.addCallbacks(_opened_ok, _open_not_exists)
+ defrd.addCallback(_maybe_update_doc)
+ return defrd
-def main():
- import sys
- if 'nuke' in sys.argv:
- print 'NUKING DATABASE!!!'
- model.nuke_db()
+def install_accounts(whateva):
+ db = get_db()
+ config = get_config()
+
+ def _opened_ok(doc):
+ logger.info("account '%(_id)s' already exists, will be updating existing account",
+ doc)
+ return doc
+
+ def _open_not_exists(failure, doc_id, *args, **kw):
+ failure.trap(twisted.web.error.Error)
+ if failure.value.status != '404': # not found.
+ failure.raiseException()
+ return {'_id': doc_id} # return an empty doc for the account.
+
+ def _update_acct(doc, info):
+ logger.debug("updating %s with %s", doc, info)
+ doc.update(info)
+ doc['type'] = 'account'
+ return db.saveDoc(doc, doc['_id'])
+
+ def _open_doc(whateva, key):
+ return db.openDoc(key)
+
+ d = defer.Deferred()
+
+ for acct_name, acct_info in config.accounts.iteritems():
+ acct_id = acct_info['_id']
+ logger.info("Adding account '%s'", acct_id)
+ d.addCallback(_open_doc, acct_id)
+ d.addCallbacks(_opened_ok, _open_not_exists, errbackArgs=(acct_id,))
+ d.addCallback(_update_acct, acct_info)
+
+ # Start the chain and return.
+ d.callback(None)
+ return d
+
- dbs = model.fab_db(update_views='updateviews' in sys.argv)
+# Functions working with design documents holding views.
+def install_views(whateva, options):
+ def _doc_not_found(failure):
+ return None
+
+ def _got_existing_docs(results, docs):
+ put_docs = []
+ for (whateva, existing), doc in zip(results, docs):
+ if existing:
+ assert existing['_id']==doc['_id']
+ assert '_rev' not in doc
+ if not options.force and \
+ doc['fingerprints'] == existing.get('fingerprints'):
+ logger.debug("design doc %r hasn't changed - skipping",
+ doc['_id'])
+ continue
+ existing.update(doc)
+ doc = existing
+ logger.info("design doc %r has changed - updating", doc['_id'])
+ put_docs.append(doc)
+ return get_db().updateDocuments(put_docs)
+
+ schema_src = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ "../../../schema"))
+
+ docs = [d for d in generate_view_docs_from_filesystem(schema_src)]
+ logger.debug("Found %d documents in '%s'", len(docs), schema_src)
+ assert docs, 'surely I have *some* docs!'
+ # ack - I need to open existing docs first to get the '_rev' property.
+ dl = []
+ for doc in docs:
+ deferred = get_db().openDoc(doc['_id']).addErrback(_doc_not_found)
+ dl.append(deferred)
+
+ return defer.DeferredList(dl
+ ).addCallback(_got_existing_docs, docs)
+
- setup_account(dbs)
- setup_twitter_account(dbs)
- setup_skype_account(dbs)
- install_client_files(dbs)
-
+def _build_views_doc_from_directory(ddir):
+ # all we look for is the views.
+ ret = {}
+ fprinter = Fingerprinter()
+ ret_views = ret['views'] = {}
+ # The '-map.js' file is the 'trigger' for creating a view...
+ tail = "-map.js"
+ rtail = "-reduce.js"
+ files = os.listdir(ddir)
+ for f in files:
+ fqf = os.path.join(ddir, f)
+ if f.endswith(tail):
+ view_name = f[:-len(tail)]
+ try:
+ with open(fqf) as f:
+ data = f.read()
+ ret_views[view_name] = {'map': data}
+ fprinter.get_finger(view_name+tail).update(data)
+ except (OSError, IOError):
+ logger.warning("can't open map file %r - skipping this view", fqf)
+ continue
+ fqr = os.path.join(ddir, view_name + rtail)
+ try:
+ with open(fqr) as f:
+ data = f.read()
+ ret_views[view_name]['reduce'] = data
+ fprinter.get_finger(view_name+rtail).update(data)
+ except (OSError, IOError):
+ # no reduce - no problem...
+ logger.debug("no reduce file %r - skipping reduce for view '%s'",
+ fqr, view_name)
+ else:
+ # avoid noise...
+ if not f.endswith(rtail) and not f.startswith("."):
+ logger.info("skipping non-map/reduce file %r", fqf)
-if __name__ == '__main__':
- main()
+ ret['fingerprints'] = fprinter.get_prints()
+ logger.debug("Document in directory %r has views %s", ddir, ret_views.keys())
+ if not ret_views:
+ logger.warning("Document in directory %r appears to have no views", ddir)
+ return ret
+
+
+def generate_view_docs_from_filesystem(root):
+ # We use the same file-system layout as 'CouchRest' does:
+ # http://jchrisa.net/drl/_design/sofa/_show/post/release__couchrest_0_9_0
+    # note however that we don't create design documents in exactly the same
+ # way - the view is always named as specified, and currently no 'map only'
+ # view is created (and if/when it is, only it will have a "special" name)
+ # See http://groups.google.com/group/raindrop-core/web/maintaining-design-docs
+
+ # This is pretty dumb (but therefore simple).
+ # root/* -> directories used purely for a 'namespace'
+ # root/*/* -> directories which hold the contents of a document.
+ # root/*/*-map.js and maybe *-reduce.js -> view content with name b4 '-'
+ logger.debug("Starting to build design documents from %r", root)
+ for top_name in os.listdir(root):
+ fq_child = os.path.join(root, top_name)
+ if not os.path.isdir(fq_child):
+ logger.debug("skipping non-directory: %s", fq_child)
+ continue
+ # so we have a 'namespace' directory.
+ num_docs = 0
+ for doc_name in os.listdir(fq_child):
+ fq_doc = os.path.join(fq_child, doc_name)
+ if not os.path.isdir(fq_doc):
+ logger.info("skipping document non-directory: %s", fq_doc)
+ continue
+ # have doc - build a dict from its dir.
+ doc = _build_views_doc_from_directory(fq_doc)
+ # XXX - note the artificial 'raindrop' prefix - the intent here
+ # is that we need some way to determine which design documents we
+ # own, and which are owned by extensions...
+ # XXX - *sob* - and that we shouldn't use '/' in the doc ID at the
+ # moment (well - we probably could if we ensured we quoted all the
+ # '/' chars, but that seems too much burden for no gain...)
+ doc['_id'] = '_design/' + ('!'.join(['raindrop', top_name, doc_name]))
+ yield doc
+ num_docs += 1
+
+ if not num_docs:
+ logger.info("skipping sub-directory without child directories: %s", fq_child)
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/config.py
@@ -0,0 +1,74 @@
+import ConfigParser, logging, os, os.path
+
+__all__ = ['get_config']
+
+class Config(object):
+ COUCH_DEFAULTS = {'host': 'localhost', 'port': 5984, 'name': 'raindrop'}
+ def __init__(self):
+ self.parser = ConfigParser.SafeConfigParser()
+
+ self.couches = {'local': self.COUCH_DEFAULTS.copy()}
+ self.accounts = {}
+
+ # XXX - this seems wrong: most of the time the 'config' - particularly the
+ # list of accounts etc - will come from the DB. The config file should only
+ # be used during bootstrapping.
+ self.load()
+
+
+ def dictifySection(self, section_name, defaults=None, name=None):
+ '''
+ Given a config section name, suck up its contents into a dictionary. Poor
+ man's type detection turns lowercase true/false into the boolean of that
+ type, things that can be int()ed into ints, and otherwise things get to
+ stay strings. Defaults are applied before dictification, and the name is
+ an optional default for 'name' if specified (which overrides the defaults
+ dict.)
+ '''
+ results = {}
+ if defaults:
+ results.update(defaults)
+ if name:
+ results['name'] = name
+ for name, value in self.parser.items(section_name):
+ if value.lower() in ('true', 'false'):
+ value = (value.lower() == 'true')
+ else:
+ try:
+ value = int(value)
+            except ValueError:
+ pass
+
+ results[name] = value
+ return results
+
+ def load(self):
+ self.parser.read([os.path.expanduser('~/.raindrop')])
+
+ COUCH_PREFIX = 'couch-'
+ ACCOUNT_PREFIX = 'account-'
+ for section_name in self.parser.sections():
+ if section_name.startswith(COUCH_PREFIX):
+ couch_name = section_name[len(COUCH_PREFIX):]
+ self.couches[couch_name] = self.dictifySection(section_name,
+ self.COUCH_DEFAULTS)
+
+ if section_name.startswith(ACCOUNT_PREFIX):
+ account_name = section_name[len(ACCOUNT_PREFIX):]
+ acct = self.accounts[account_name] = \
+ self.dictifySection(section_name, None, account_name)
+ if 'id' in acct:
+ acct['_id'] = acct['id']
+ del acct['id']
+ else:
+ acct['_id'] = acct['username']
+
+ self.local_couch = self.couches['local']
+ self.remote_couch = self.couches.get('remote') # may be None
+
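+# For illustration, a minimal ~/.raindrop might look like the following;
+# the section names follow the 'couch-'/'account-' prefixes handled by
+# load() above, while the keys inside an account section are protocol
+# specific (these ones are just placeholders):
+#
+#   [couch-local]
+#   host = localhost
+#   port = 5984
+#
+#   [account-example]
+#   username = someone@example.com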
+CONFIG = None
+def get_config():
+ global CONFIG
+ if CONFIG is None:
+ CONFIG = Config()
+ return CONFIG
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core extensions
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core message processor extensions.
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/email.py
@@ -0,0 +1,16 @@
+# This is an extension which converts a message/raw/email to a 'message'
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class EmailConverter(base.ConverterBase):
+ def convert(self, doc):
+ # for now, the email repr has all we need.
+ ret = doc.copy()
+ for n in ret.keys():
+ if n.startswith('_') or n.startswith('raindrop'):
+ del ret[n]
+ del ret['type']
+ return ret
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/message.py
@@ -0,0 +1,30 @@
+# This is an extension which hacks some annotations onto a 'message', creating
+# an annotated message.
+import re
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+# *sob* this re still fails to get unicode.
+# | >>> re.compile(r"[^\w]+", re.UNICODE).split(u"\xa9opyright me")
+# | [u'', u'opyright', u'me']
+re_tags = re.compile(r"[^\w]+", re.UNICODE)
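+# A rough sketch of what convert() below ends up doing with a tag-less body:
+#   >>> sorted(set(b.lower() for b in re_tags.split(u"Hello hello, raindrop world!") if len(b) > 3))
+#   [u'hello', u'raindrop', u'world']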
+
+
+class MessageAnnotator(base.ConverterBase):
+ def convert(self, doc):
+ # for now, if a 'proto' couldn't detect tags by itself, all words in
+ # the body become tags.
+ try:
+ tags = doc['tags']
+ except KeyError:
+ bits = re_tags.split(doc['body'])
+ tags = list(set(b.lower() for b in bits if len(b)>3))
+ conversation_id = doc.get('conversation_id')
+ if conversation_id is None:
+ conversation_id = doc.get('subject')
+ return {'tags': tags,
+ 'conversation_id' : conversation_id,
+ }
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/rfc822.py
@@ -0,0 +1,101 @@
+# This is an extension which converts a message/raw/rfc822 to a
+# message/raw/email
+from __future__ import absolute_import # stop 'email' import finding our ext
+
+import logging
+from email import message_from_string
+from email.utils import mktime_tz, parsedate_tz
+from twisted.internet import defer
+
+from ...proc import base
+
+
+logger = logging.getLogger(__name__)
+
+
+def _safe_convert_bytes(val, charset):
+ # Convert a byte string to *some* unicode object, ignoring (but logging)
+ # unicode errors we may see in the wild...
+    # TODO: This sucks; we need to mimic what firefox does in such cases...
+ charset = charset or 'ascii'
+ try:
+ ret = val.decode(charset)
+ except UnicodeError, exc:
+ logger.error("Failed to decode mail from %r: %s", charset, exc)
+        # the declared charset failed to decode - fall back to utf-8
+ try:
+ ret = val.decode('utf-8')
+ except UnicodeError, exc:
+ logger.error("Failed to fallback decode mail from utf8: %s", exc)
+ ret = val.decode('utf-8', 'ignore')
+ return ret
+
+
+# an 'rfc822' message stores the unpacked version of the rfc822 stream, a-la
+# the interface provided by the 'email' package. IOW, we never bother storing
+# the raw stream, just a 'raw' unpacked version of it.
+# This helper function takes a raw rfc822 string and returns a 'document'
+# suitable for storing as an rfc822 message.
+def doc_from_bytes(b):
+ msg = message_from_string(b)
+ doc = {}
+ mp = doc['multipart'] = msg.is_multipart()
+ headers = doc['headers'] = {}
+ # Given we have no opportunity to introduce an object which can ignore
+ # the case of headers, we lowercase the keys
+ # We can see all kinds of stuff in the wild. It's not uncommon to see
+    # ascii body but a utf8 name encoded in the header. It's also possible
+    # to see a header encoded the same as the declared content.
+    # Who wins? We try the content encoding first, then fall back to utf-8.
+ charset = msg.get_charset() or msg.get_content_charset()
+ for hn, hv in msg.items():
+ headers[hn.lower()] = _safe_convert_bytes(hv, charset)
+
+ # XXX - technically msg objects are recursive; handling that requires
+ # more thought. For now, assume they are flat.
+    # Unlikely, but if we *aren't* text based, we also store the payload as attachments.
+ if mp or msg.get_content_maintype() != 'text':
+ # a multi-part message - flatten it here by walking the list, but
+ # only looking at the 'leaf' nodes.
+ attachments = doc['_attachments'] = {}
+ i = 1
+ for attach in msg.walk():
+ if not attach.is_multipart():
+ name = attach.get_filename()
+ if not name:
+ name = "subpart %d" % i
+ attachments[name] = {'content_type': attach.get_content_type(),
+ 'data': attach.get_payload(decode=True),
+ }
+ i += 1
+ else:
+ body_bytes = msg.get_payload(decode=True)
+ body = _safe_convert_bytes(body_bytes, msg.get_content_charset())
+ doc['body'] = body
+ return doc
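+# For illustration, with a trivial single-part message (assumed input):
+#   doc = doc_from_bytes("Subject: hi\r\n\r\nhello")
+#   # doc['multipart'] == False, doc['headers'] == {'subject': u'hi'},
+#   # doc['body'] == u'hello'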
+
+
+class RFC822Converter(base.ConverterBase):
+ def convert(self, doc):
+ # a 'rfc822' stores 'headers' as a dict
+ headers = doc['headers']
+ # for now, 'from' etc are all tuples of [identity_type, identity_id]
+ # XXX - todo - find one of the multi-part bits to use as the body.
+ try:
+ body = doc['body']
+ except KeyError:
+ assert doc['multipart']
+ body = 'This is a multipart message - todo - find the body!'
+ ret = {'from': ['email', headers['from']],
+ 'subject': headers['subject'],
+ 'body': body,
+ 'body_preview': body[:128], # good enuf for now...
+ }
+ try:
+            dval = headers['date']
+ except KeyError:
+ pass
+ else:
+ if dval:
+ ret['timestamp'] = mktime_tz(parsedate_tz(dval))
+ return ret
rename from server/python/junius/model.py
rename to server/python/raindrop/model.py
--- a/server/python/junius/model.py
+++ b/server/python/raindrop/model.py
@@ -1,323 +1,463 @@
-import os, os.path
-from couchdb import schema, design
+import sys
+import logging
+import time
+from urllib import urlencode, quote
+import base64
+
+import twisted.web.error
+from twisted.internet import defer
+from twisted.python.failure import Failure
-class WildField(schema.Field):
- '''
- Allows us to have dictionaries without schemas.
- '''
- def _to_python(self, value):
- return value
-
- def _to_json(self, value):
- return value
+try:
+ import simplejson as json
+except ImportError:
+ import json # Python 2.6
+
+import paisley
+from .config import get_config
+
+
+config = get_config()
+
+class _NotSpecified:
+ pass
-class Account(schema.Document):
- kind = schema.TextField()
- host = schema.TextField(default='')
- port = schema.IntegerField(default=0)
- username = schema.TextField()
- password = schema.TextField()
- ssl = schema.BooleanField(default=False)
-
- folderStatuses = WildField(default={})
-
- # could we just do _all_docs? I don't want the damn design docs though...
- # (ironically, this is the first one :)
- all = schema.View('all', '''\
- function(doc) {
- emit(null, doc);
- }''')
+logger = logging.getLogger('model')
+
+DBs = {}
+
+# XXXX - this relies on couch giving us some kind of 'sequence id'. For
+# now we use a timestamp, but that obviously sucks in many scenarios.
+if sys.platform == 'win32':
+ # woeful resolution on windows and equal timestamps are bad.
+ clock_start = time.time()
+    time.clock() # time.clock starts counting from zero the first time it's called.
+ def get_seq():
+ return clock_start + time.clock()
+else:
+ get_seq = time.time
+
+
+def encode_proto_id(proto_id):
+ # a 'protocol' gives us a 'blob' used to identify the document; we create
+ # a real docid from that protocol_id; we base64-encode what was given to
+    # us to avoid the possibility of a '!' char, and also to better accommodate
+ # truly binary strings (eg, a pickle or something bizarre)
+ return base64.encodestring(proto_id).replace('\n', '')
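+# e.g. encode_proto_id('123') == 'MTIz' (and base64 output can never contain
+# the '!' we use as a separator in document IDs).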
+
-class Contact(schema.Document):
- name = schema.TextField()
- identities = schema.ListField(schema.DictField(schema.Schema.build(
- kind = schema.TextField(),
- value = schema.TextField()
- )))
- location = schema.TextField()
- _attachments = WildField(default={})
- all = schema.View('contacts', '''\
- function(doc) {
- emit(doc._id, null);
- }''')
- #: expose contacts by their identities
- by_identity = schema.View('contacts', '''\
- function(doc) {
- for each (var identity in doc.identities) {
- emit([identity.kind, identity.value], doc);
- }
- }''')
- #: expose all suffixes of the contact name and identity values
- by_suffix = schema.View('contact_ids', '''\
- function(doc) {
- var i;
- for (i = 0; i < doc.name.length; i++)
- emit(doc.name.substring(i), null);
- for each (var identity in doc.identities) {
- for (i = 0; i < identity.value.length; i++)
- emit(identity.value.substring(i), null);
- }
- }''', include_docs=True)
- #: expose contacts with pictures
- with_pictures = schema.View('contacts', '''\
- function(doc) {
- if (doc._attachments['default'])
- emit(doc._id, null);
- }''')
+# from the couchdb package; not sure what makes these names special...
+def _encode_options(options):
+ retval = {}
+ for name, value in options.items():
+ if name in ('key', 'startkey', 'endkey', 'include_docs') \
+ or not isinstance(value, basestring):
+ value = json.dumps(value, allow_nan=False, ensure_ascii=False)
+ retval[name] = value
+ return retval
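+# e.g. _encode_options({'startkey': 'foo', 'limit': 10}) gives
+# {'startkey': '"foo"', 'limit': '10'} - values couch expects as JSON are
+# dumped to JSON before being put on the query-string.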
+
+
+class CouchDB(paisley.CouchDB):
+ def postob(self, uri, ob):
+ # This seems to not use keep-alives etc where using twisted.web
+ # directly does?
+ body = json.dumps(ob, allow_nan=False,
+ ensure_ascii=False).encode('utf-8')
+ return self.post(uri, body)
+
+ #def openView(self, *args, **kwargs):
+ # paisley doesn't handle encoding options...
+ #return super(CouchDB, self).openView(*args, **_encode_options(kwargs)
+ # )
+ # Ack - couch 0.9 view syntax...
+ def openView(self, dbName, docId, viewId, **kwargs):
+ #uri = "/%s/_view/%s/%s" % (dbName, docId, viewId)
+ uri = "/%s/_design/%s/_view/%s" % (dbName, docId, viewId)
+
+ if kwargs:
+ uri += "?%s" % (urlencode(_encode_options(kwargs)),)
+
+ return self.get(uri
+ ).addCallback(self.parseResult)
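+    # e.g. openView('raindrop', 'raindrop!messages!by', 'by_doc_type',
+    #               key='message') ends up requesting
+    #   /raindrop/_design/raindrop!messages!by/_view/by_doc_type?key=%22message%22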
+
+
+ def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
+ # paisley appears to use an old api for attachments?
+ if attachment:
+ uri = "/%s/%s/%s" % (dbName, docId, attachment)
+ return self.get(uri)
+ return super(CouchDB, self).openDoc(dbName, docId, revision, full)
+
+ # This is a potential addition to the paisley API; It is hard to avoid
+ # a hacky workaround due to the use of 'partial' in paisley...
+ def saveAttachment(self, dbName, docId, name, data,
+ content_type="application/octet-stream",
+ revision=None):
+ """
+ Save/create an attachment to a document in a given database.
+
+ @param dbName: identifier of the database.
+ @type dbName: C{str}
+
+ @param docId: the identifier of the document.
+ @type docId: C{str}
-class Message(schema.Document):
- account_id = schema.TextField()
- storage_path = schema.TextField()
- storage_id = schema.IntegerField()
-
- conversation_id = schema.TextField()
- header_message_id = schema.TextField()
- references = WildField()
-
- # canonical contacts
- from_contact_id = schema.TextField()
- to_contact_ids = schema.ListField(schema.TextField())
- cc_contact_ids = schema.ListField(schema.TextField())
- # convenience contacts with enough semantics to not just map it (for now)
- involves_contact_ids = schema.ListField(schema.TextField())
+    @param name: name of the attachment
+    @type name: C{str}
+
+    @param data: content of the attachment.
+    @type data: C{sequence}
+
+    @param content_type: content type of the attachment
+    @type content_type: C{str}
- # actual contact objects, a little duplication; but for good, not evil
- from_contact = WildField()
- to_contacts = WildField(default={})
- cc_contacts = WildField(default={})
- involves_contacts = WildField(default={})
-
- date = schema.DateTimeField()
- timestamp = schema.IntegerField()
-
- # general attribute info...
- read = schema.BooleanField()
-
- # user-added meta-information
- tags = WildField()
-
- headers = WildField()
- bodyPart = WildField()
- _attachments = WildField(default={})
+ @param revision: if specified, the revision of the attachment this
+ is updating
+ @type revision: C{str}
+ """
+ # Responses: ???
+ # 409 Conflict, 500 Internal Server Error
+ url = "/%s/%s/%s" % (dbName, docId, name)
+ if revision:
+ url = url + '?rev=' + revision
+ # *sob* - and I can't use put as it doesn't allow custom headers :(
+ # and neither does _getPage!!
+ # ** start of self._getPage clone setup...** (plus an import or 2...)
+ from twisted.web.client import HTTPClientFactory
+ kwargs = {'method': 'PUT',
+ 'postdata': data}
+ kwargs["headers"] = {"Accept": "application/json",
+ "Content-Type": content_type,
+ }
+ factory = HTTPClientFactory(url, **kwargs)
+ from twisted.internet import reactor
+ reactor.connectTCP(self.host, self.port, factory)
+ d = factory.deferred
+ # ** end of self._getPage clone **
+ d.addCallback(self.parseResult)
+ return d
- # -- conversation views
- # no ghosts!
- conversation_info = schema.View('conversations', '''\
- function(doc) {
- if (doc.timestamp)
- emit(doc.conversation_id,
- {oldest: doc.timestamp, newest: doc.timestamp, count: 1,
- involves: doc.involves_contact_ids});
- }''', '''\
- function(keys, values, rereduce) {
- out = values[0];
- out_involves = {};
- function involve_fuse(l) {
- for (var il = 0; il < l.length; il++)
- out_involves[l[il]] = true;
- }
- involve_fuse(out.involves);
- for (var i = 1; i < values.length; i++) {
- var cur = values[i];
- if (cur.oldest < out.oldest)
- out.oldest = cur.oldest;
- if (cur.newest > out.newest)
- out.newest = cur.newest;
- out.count += cur.count;
- involve_fuse(cur.involves);
- }
- out.involves = [];
- for (var contact_id in out_involves)
- out.involves.push(contact_id);
- return out;
- }''', group=True, group_level=1)
- # no ghosts!
- by_conversation = schema.View('by_conversation', '''\
- function(doc) {
- if (doc.timestamp)
- emit(doc.conversation_id, null);
- }''', include_docs=True)
+ def updateDocuments(self, dbName, user_docs):
+ # update/insert/delete multiple docs in a single request using
+ # _bulk_docs
+ # from couchdb-python.
+ docs = []
+ for doc in user_docs:
+ if isinstance(doc, dict):
+ docs.append(doc)
+ elif hasattr(doc, 'items'):
+ docs.append(dict(doc.items()))
+ else:
+ raise TypeError('expected dict, got %s' % type(doc))
+ url = "/%s/_bulk_docs" % dbName
+ body = json.dumps({'docs': docs})
+ return self.post(url, body
+ ).addCallback(self.parseResult
+ )
+
+ def listDocsBySeq(self, dbName, reverse=False, startKey=0, limit=-1, **kw):
+ """
+ List all documents in a given database by the document's sequence number
+ """
+ # Response:
+ # {"total_rows":1597,"offset":0,"rows":[
+ # {"id":"test","key":1,"value":{"rev":"4104487645"}},
+ # {"id":"skippyhammond","key":2,"value":{"rev":"121469801"}},
+ # ...
+ uri = "/%s/_all_docs_by_seq" % (dbName,)
+ args = {}
+ if reverse:
+ args["reverse"] = "true"
+ if startKey > 0:
+ args["startkey"] = int(startKey)
+ if limit >= 0:
+ args["limit"] = int(limit)
+ # suck the rest of the kwargs in
+ args.update(_encode_options(kw))
+ if args:
+ uri += "?%s" % (urlencode(args),)
+ return self.get(uri
+ ).addCallback(self.parseResult)
+
+ # Hack so our new bound methods work.
+ def bindToDB(self, dbName):
+ super(CouchDB, self).bindToDB(dbName)
+ partial = paisley.partial # it works hard to get this!
+ for methname in ["saveAttachment", "updateDocuments",
+ "listDocsBySeq"]:
+ method = getattr(self, methname)
+ newMethod = partial(method, dbName)
+ setattr(self, methname, newMethod)
+
+
+# XXX - get_db should die as a global/singleton - only our DocumentModel
+# instance should care about that. Sadly, bootstrap.py is a (small; later)
+# problem here...
+def get_db(couchname="local", dbname=_NotSpecified):
+ dbinfo = config.couches[couchname]
+ if dbname is _NotSpecified:
+ dbname = dbinfo['name']
+ key = couchname, dbname
+ try:
+ return DBs[key]
+ except KeyError:
+ pass
+ logger.info("Connecting to couchdb at %s", dbinfo)
+ db = CouchDB(dbinfo['host'], dbinfo['port'], dbname)
+ DBs[key] = db
+ return db
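+# e.g. get_db() with no arguments uses the 'local' couch section and the
+# database name from the config (default 'raindrop'); nuke_db()/fab_db()
+# below pass dbname=None as they operate on the server rather than on a
+# particular database.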
- # -- message (id) views
- # ghosts are okay!
- by_header_id = schema.View('by_header_id', '''\
- function(doc) {
- emit(doc.header_message_id, null);
- }''', include_docs=True)
+def quote_id(doc_id):
+    # A '/' is still quite possible (the doc-type part of an ID usually
+    # contains one, and base64 output can too), so it needs quoting.
+ # Note the '!' character seems to work fine with couch (ie, we use it
+ # unquoted when constructing views), so we allow that for no better
+ # reason than the logs etc are clearer...
+ return quote(doc_id, safe="!")
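+# e.g. quote_id('msg!MTIz!proto/test') == 'msg!MTIz!proto%2Ftest' - the '!'
+# survives while the '/' (from the doc type) is escaped.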
+
+class DocumentModel(object):
+ """The layer between 'documents' and the 'database'. Responsible for
+ creating the unique ID for each document (other than the raw document),
+ for fetching documents based on an ID, etc
+ """
+ def __init__(self, db):
+ self.db = db
+
+ @classmethod
+ def build_docid(cls, category, doc_type, provider_id):
+ """Builds a docid from (category, doc_type, provider_id)
+
+ The exact order of the fields may depend on who can best take
+ advantage of a specific order, so is subject to change in the
+        prototype. This method gives the code a consistent ordering
+ regardless of the actual impl."""
+ return "!".join([category, provider_id, doc_type])
+
+ @classmethod
+ def split_docid(cls, docid):
+ """Splits a docid into (category, doc_type, provider_id)
+
+ Is likely to raise ValueError on an 'invalid' docid"""
+ cat, prov_id, doc_type = docid.split("!")
+ return cat, doc_type, prov_id
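+    # e.g. build_docid('msg', 'proto/test', encode_proto_id('123')) returns
+    # 'msg!MTIz!proto/test', and split_docid() turns that back into
+    # ('msg', 'proto/test', 'MTIz').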
+
+ def open_view(self, *args, **kwargs):
+ # A convenience method for completeness so consumers don't hit the
+ # DB directly (and to give a consistent 'style'). Is it worth it?
+ return self.db.openView(*args, **kwargs)
- # no ghosts!
- by_timestamp = schema.View('by_timestamp', '''\
- function(doc) {
- if (doc.timestamp)
- emit(doc.timestamp, null);
- }''', include_docs=True)
+ def open_attachment(self, doc_id, attachment, **kw):
+ """Open an attachment for the given docID. As this is generally done
+ when processing the document itself, so the raw ID of the document
+ itself is known. For this reason, a docid rather than the parts is
+ used.
+
+ Unlike open_document, this never returns None, but raises an
+ exception if the attachment doesn't exist.
+ """
+ logger.debug("attempting to open attachment %s/%s", doc_id, attachment)
+ return self.db.openDoc(quote_id(doc_id), attachment=attachment, **kw)
+
+ def open_document(self, category, proto_id, ext_type, **kw):
+ """Open the specific document, returning None if it doesn't exist"""
+ docid = self.build_docid(category, ext_type, encode_proto_id(proto_id))
+ return self.open_document_by_id(docid, **kw)
+
+ def open_document_by_id(self, doc_id, **kw):
+ """Open a document by the already constructed docid"""
+ logger.debug("attempting to open document %r", doc_id)
+ return self.db.openDoc(quote_id(doc_id), **kw
+ ).addBoth(self._cb_doc_opened)
+
+ def _cb_doc_opened(self, result):
+ if isinstance(result, Failure):
+ result.trap(twisted.web.error.Error)
+ if result.value.status != '404': # not found
+ result.raiseException()
+ result = None # indicate no doc exists.
+ logger.debug("no document of that ID exists")
+ else:
+ logger.debug("opened document %(_id)r at revision %(_rev)s",
+ result)
+ return result
+
+ def _prepare_raw_doc(self, account, docid, doc, doc_type):
+ assert docid
+ assert doc_type
+ assert '_id' not in doc, doc # that isn't how you specify the ID.
+ doc['_id'] = docid
+ assert 'raindrop_account' not in doc, doc # we look after that!
+ doc['raindrop_account'] = account.details['_id']
+
+ assert 'type' not in doc, doc # we look after that!
+ doc['type'] = doc_type
+
+ assert 'raindrop_seq' not in doc, doc # we look after that!
+ doc['raindrop_seq'] = get_seq()
- # the key includes the timestamp so we can use it to limit our queries plus
- # pick up where we left off if we need to page/chunk.
- # we expose the conversation id as the value because set intersection
- # on a conversation-basis demands it, and it would theoretically be too
- # expensive to just return the whole document via include_docs.
- # (no ghosts!)
- by_involves = schema.View('by_involves', '''\
- function(doc) {
- for each (var contact_id in doc.involves_contact_ids)
- emit([contact_id, doc.timestamp], doc.conversation_id);
- }''')
-
- # -- user provided meta-info junk
- tagmap_func = '''\
- function(doc) {
- if (doc.tags) {
- for (var i = 0; i < doc.tags.length; i++)
- emit([doc.tags[i], doc.timestamp], doc.conversation_id);
- }
- }'''
- by_tags = schema.View('by_tags', tagmap_func)
-
- # by reusing tagmap_func, we are able to consume its output from the
- # previous view without introducing additional storage needs
- all_tags = schema.View('tags', tagmap_func, '''\
- function(keys, values, rereduce) {
- var keySet = {}, i, j;
- if (!rereduce) {
- for (i = 0; i < keys.length; i++)
- keySet[keys[i][0][0]] = true;
- }
- else {
- for (i = 0; i < values.length; i++) {
- var inSet = values[i];
- for (j = 0; j < inSet.length; j++)
- keySet[inSet[j]] = true;
- }
- }
- var out = [];
- for (var key in keySet)
- out.push(key);
- out.sort();
- return out;
- }''', group=False, group_level=0)
-
- # -- storage info views
- # so, this key is theoretically just wildly expensive
- # no ghosts!
- by_storage = schema.View('by_storage', '''\
- function(doc) {
- if (doc.timestamp)
- emit([doc.account_id, doc.storage_path, doc.storage_id], null);
- }''', include_docs=False)
-
- by_mailing_list = schema.View('by_list_id', '''\
- function(doc) {
- if (doc.headers && doc.headers["List-Id"]) {
- var parts = doc.headers["List-Id"].match(/([\\W\\w]*)\\s*<(.+)>.*/);
- var values = {"List-Id" : doc.headers["List-Id"],
- "id" : parts[2],
- "name" : parts[1] };
- for each (var headerId in ["List-Post","List-Archive","List-Help",
- "List-Subscribe","List-Unsubscribe"]) {
- if (doc.headers[headerId])
- values[headerId] = doc.headers[headerId];
- }
- emit(parts[2], values);
- }
- }''', '''\
- function(keys, values, rereduce) {
- var output = {};
- output.count = values.length;
- for (var idx in values) {
- for (var elm in values[idx]) {
- output[elm] = values[idx][elm];
- }
- }
- return output;
- }''', include_docs=False, group=True, group_level=1)
+ def create_raw_documents(self, account, doc_infos):
+ """Entry-point to create raw documents. The API reflects that
+ creating multiple documents in a batch is faster; if you want to
+ create a single doc, just put it in a list
+ """
+ docs = []
+ dids = [] # purely for the log :(
+ logger.debug('create_raw_documents preparing %d docs', len(doc_infos))
+ for (proto_id, doc, doc_type) in doc_infos:
+ docid = self.build_docid('msg', doc_type, encode_proto_id(proto_id))
+ self._prepare_raw_doc(account, docid, doc, doc_type)
+ # XXX - this hardcoding is obviously going to be a problem when
+ # we need to add 'contacts' etc :(
+ docs.append(doc)
+ dids.append(docid)
+ attachments = self._prepare_attachments(docs)
+ logger.debug('create_raw_documents saving docs %s', dids)
+ return self.db.updateDocuments(docs
+ ).addCallback(self._cb_saved_docs, attachments
+ )
+
+ def _prepare_attachments(self, docs):
+ # called internally when creating a batch of documents. Returns a list
+ # of attachments which should be saved separately.
+
+        # The intent is that later we can optimize this - if the attachment
+        # is small, we can keep it in the document base64 encoded and save
+        # an http connection. For now though we just do all attachments
+        # separately.
+
+        # attachment processing still needs more thought - ultimately we need
+        # to be able to 'push' them via a generator or similar to avoid
+        # reading them entirely into memory. Further, we need some way for the
+        # document to know if the attachment failed (or vice-versa) given we
+        # have no transactional semantics.
+ all_attachments = []
+ for doc in docs:
+ assert '_id' in doc # should have called prepare_ext_document!
+ try:
+ all_attachments.append(doc['_attachments'])
+ # nuke attachments specified
+ del doc['_attachments']
+ except KeyError:
+ # It is important we put 'None' here so the list of
+ # attachments is exactly parallel with the list of docs.
+ all_attachments.append(None)
+ assert len(all_attachments)==len(docs)
+ return all_attachments
+
+ def prepare_ext_document(self, prov_id, doc_type, doc):
+ """Called by extensions to setup the raindrop maintained attributes
+ of the documents, including the document ID
+ """
+ assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
+ assert 'type' not in doc, doc # we manage this too!
+ assert 'raindrop_seq' not in doc, doc # we look after that!
+ doc['raindrop_seq'] = get_seq()
+ doc['type'] = doc_type
+ doc['_id'] = self.build_docid("msg", doc['type'], prov_id)
- by_list_id = schema.View('by_mailing_list', '''\
- function(doc) {
- if (doc.headers && doc.headers["List-Id"]) {
- var parts = doc.headers["List-Id"].match(/[\\W\\w\\s]*<(.+)>.*/);
- emit([parts[1], doc.timestamp], doc.conversation_id);
- }
- }''', include_docs=True)
-
-DATABASES = {
- # the app database proper, no real data
- 'junius': None,
- #
- 'accounts': Account,
- 'contacts': Contact,
- 'messages': Message,
-}
+ def create_ext_documents(self, docs):
+ for doc in docs:
+ assert '_id' in doc # should have called prepare_ext_document!
+ attachments = self._prepare_attachments(docs)
+ # save the document.
+ logger.debug('saving %d extension documents', len(docs))
+ return self.db.updateDocuments(docs,
+ ).addCallback(self._cb_saved_docs, attachments
+ )
-AVOID_REPLICATING = {
- 'accounts': 'Private info perhaps',
-}
+ def _cb_saved_docs(self, result, attachments):
+ # result: [{'rev': 'xxx', 'id': '...'}, ...]
+ logger.debug("saved %d documents", len(result))
+ ds = []
+ for dinfo, dattach in zip(result, attachments):
+ if dattach:
+ ds.append(self._cb_save_attachments(dinfo, dattach))
+ def return_orig(ignored_result):
+ return result
+ # XXX - the result set will *not* have the correct _rev if there are
+ # attachments :(
+ return defer.DeferredList(ds
+ ).addCallback(return_orig)
-class DBS(object):
- def __init__(self, server):
- self.server = server
-
-DEFAULT_COUCH_SERVER = 'http://localhost:5984/'
+ def _cb_save_attachments(self, saved_doc, attachments):
+ if not attachments:
+ return saved_doc
+ # Each time we save an attachment the doc gets a new revision number.
+ # So we need to do them in a chain, passing the result from each to
+ # the next.
+ remaining = attachments.copy()
+ # This is recursive, but that should be OK.
+ return self._cb_save_next_attachment(saved_doc, remaining)
-def get_remote_host_info():
- remoteinfo_path = os.path.join(os.environ['HOME'], '.junius.remoteinfo')
-
- if os.path.exists(remoteinfo_path):
- f = open(remoteinfo_path, 'r')
- data = f.read()
- f.close()
- info = data.strip()
- if info[-1] != '/':
- info += '/'
- return info
- else:
- raise Exception("You need a ~/.junius.remoteinfo file")
+ def _cb_save_next_attachment(self, result, remaining):
+ if not remaining:
+ return result
+ revision = result['rev']
+ docid = result['id']
+ name, info = remaining.popitem()
+ logger.debug('saving attachment %r to doc %r', name, docid)
+ return self.db.saveAttachment(quote_id(docid),
+ quote_id(name), info['data'],
+ content_type=info['content_type'],
+ revision=revision,
+ ).addCallback(self._cb_saved_attachment, (docid, name)
+ ).addErrback(self._cb_save_failed, (docid, name)
+ ).addCallback(self._cb_save_next_attachment, remaining
+ )
-def get_local_host_info():
- localinfo_path = os.path.join(os.environ['HOME'], '.junius.localinfo')
- if os.path.exists(localinfo_path):
- f = open(localinfo_path, 'r')
- data = f.read()
- f.close()
- info = data.strip()
- if info[-1] != '/':
- info += '/'
- return info
- else:
- return DEFAULT_COUCH_SERVER
-
+ def _cb_saved_attachment(self, result, ids):
+ logger.debug("Saved attachment %s", result)
+ # XXX - now what?
+ return result
+
+ def _cb_save_failed(self, failure, ids):
+ logger.error("Failed to save attachment (%r): %s", ids, failure)
+ failure.raiseException()
+
+
+_doc_model = None
+
+def get_doc_model():
+ global _doc_model
+ if _doc_model is None:
+ _doc_model = DocumentModel(get_db())
+ return _doc_model
def nuke_db():
- import couchdb
- server = couchdb.Server(get_local_host_info())
+ couch_name = 'local'
+ db = get_db(couch_name, None)
+ dbinfo = config.couches[couch_name]
- for dbname in DATABASES.keys():
- if dbname in server:
- print "!!! Deleting database", dbname
- del server[dbname]
+ def _nuke_failed(failure, *args, **kwargs):
+ if failure.value.status != '404':
+ failure.raiseException()
+ logger.info("DB doesn't exist!")
+
+ def _nuked_ok(d):
+ logger.info("NUKED DATABASE!")
+
+ deferred = db.deleteDB(dbinfo['name'])
+ deferred.addCallbacks(_nuked_ok, _nuke_failed)
+ return deferred
+
-def fab_db(update_views=False):
- import couchdb
- server = couchdb.Server(get_local_host_info())
-
- dbs = DBS(server)
-
- for db_name, doc_class in DATABASES.items():
- if not db_name in server:
- print 'Creating database', db_name
- db = server.create(db_name)
- update_views = True
- else:
- db = server[db_name]
-
- if update_views and doc_class:
- print 'Updating views'
- views = [getattr(doc_class, k) for k, v in doc_class.__dict__.items() if isinstance(v, schema.View)]
- print 'Views:', views
- if views:
- design.ViewDefinition.sync_many(db, views)
+def fab_db(whateva):
+ couch_name = 'local'
+ db = get_db(couch_name, None)
+ dbinfo = config.couches[couch_name]
- setattr(dbs, db_name, db)
+ def _create_failed(failure, *args, **kw):
+ failure.trap(twisted.web.error.Error)
+ if failure.value.status != '412': # precondition failed.
+ failure.raiseException()
+ logger.info("couch database %(name)r already exists", dbinfo)
+ return False
- return dbs
+ def _created_ok(d):
+ logger.info("created new database")
+ return True
+
+ return db.createDB(dbinfo['name']
+ ).addCallbacks(_created_ok, _create_failed
+ )
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/pipeline.py
@@ -0,0 +1,379 @@
+""" This is the raindrop pipeline; it moves messages from their most raw
+form to their most useful form.
+"""
+from twisted.internet import defer, task
+from twisted.python.failure import Failure
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Simple forward chaining.
+chain = [
+    # (from_type, to_type, transformer)
+ ('proto/test', 'raw/message/rfc822',
+ 'raindrop.proto.test.TestConverter'),
+ # skype goes directly to 'message' for now...
+ ('proto/skype-msg', 'message',
+ 'raindrop.proto.skype.SkypeConverter'),
+ # skype-chat is 'terminal' for now.
+ ('proto/skype-chat', None, None),
+ ('proto/imap', 'raw/message/rfc822',
+ 'raindrop.proto.imap.IMAPConverter'),
+ ('proto/twitter', 'message',
+ 'raindrop.proto.twitter.TwitterConverter'),
+ ('raw/message/rfc822', 'raw/message/email',
+ 'raindrop.ext.message.rfc822.RFC822Converter'),
+ ('raw/message/email', 'message',
+ 'raindrop.ext.message.email.EmailConverter'),
+ ('message', 'anno/tags',
+ 'raindrop.ext.message.message.MessageAnnotator'),
+ # anno/tags is 'terminal'
+ ('anno/tags', None, None),
+]
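+# So, for example, an IMAP message moves forward through the chain as:
+#   proto/imap -> raw/message/rfc822 -> raw/message/email -> message -> anno/tags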
+
+
+class Pipeline(object):
+ def __init__(self, doc_model, options):
+ self.doc_model = doc_model
+ self.options = options
+ # use a cooperator to do the work via a generator.
+ # XXX - should we own the cooperator or use our parents?
+ self.coop = task.Cooperator()
+ self.forward_chain = {}
+ for from_type, to_type, xformname in chain:
+ if xformname:
+ root, tail = xformname.rsplit('.', 1)
+ try:
+ mod = __import__(root, fromlist=[tail])
+ klass = getattr(mod, tail)
+ inst = klass(doc_model)
+ except:
+ logger.exception("Failed to load extension %r", xformname)
+ else:
+ self.forward_chain[from_type] = (to_type, inst)
+ else:
+ assert not to_type, 'no xformname must mean no to_type'
+ self.forward_chain[from_type] = None
+
+ def unprocess(self):
+ # A bit of a hack that will suffice until we get better dependency
+ # management. Determines which doc-types are 'derived', then deletes
+ # them all.
+ def delete_a_doc(doc, rid):
+ if doc is None:
+ logger.debug("can't delete document %r - it doesn't exist", rid)
+ return None
+ else:
+ logger.info("Deleting document %(_id)r (rev %(_rev)s)", doc)
+ return self.doc_model.db.deleteDoc(doc['_id'], doc['_rev'])
+
+ def delete_docs(result, doc_type):
+ docs = []
+ to_del = [(row['id'], row['value']) for row in result['rows']]
+ for id, rev in to_del:
+ docs.append({'_id': id, '_rev': rev, '_deleted': True})
+ logger.info('deleting %d messages of type %r', len(docs), doc_type)
+ return self.doc_model.db.updateDocuments(docs)
+
+ def gen_deleting_docs(doc_types):
+ for dt in doc_types:
+ yield self.doc_model.open_view('raindrop!messages!by',
+ 'by_doc_type',
+ key=dt,
+ ).addCallback(delete_docs, dt)
+ # and our work-queue docs - they aren't seen by the view, so
+ # just delete them by ID.
+ docs = []
+ for rid in ('workqueue!msg',):
+ yield self.doc_model.open_document_by_id(rid
+ ).addCallback(delete_a_doc, rid)
+
+ derived = set()
+ for _, to_info in self.forward_chain.iteritems():
+ if to_info is not None:
+ to_type, inst = to_info
+ derived.add(to_type)
+ # error records are always 'derived'
+ derived.add('core/error/msg')
+ logger.info("deleting documents with types %r", derived)
+ return self.coop.coiterate(gen_deleting_docs(derived))
+
+ def start(self):
+ return self.coop.coiterate(self.gen_wq_tasks())
+
+ def start_retry_errors(self):
+ """Attempt to re-process all messages for which our previous
+ attempt generated an error.
+ """
+ # Later we may need a 'state doc' just for errors??
+ # For now use this dict to save the state during processing but we
+ # don't persist it yet.
+ # Also note that although
+ def gen_work():
+ state_doc = {'raindrop_seq': 0}
+ while True:
+ self.num_errors_found = 0
+ # error docs are quite small - fetch 50 at a time...
+ logger.debug("starting error processing at %(raindrop_seq)r",
+ state_doc)
+ yield self.doc_model.open_view(
+ 'raindrop!proto!errors', 'errors',
+ startkey=state_doc['raindrop_seq'],
+ include_docs=True,
+ ).addCallback(self._cb_errorq_opened, state_doc)
+ if not self.num_errors_found:
+ break
+ return self.coop.coiterate(gen_work())
+
+ def _cb_errorq_opened(self, result, state_doc):
+ def gen_work():
+ for row in result['rows']:
+ self.num_errors_found += 1
+ err_doc = row['doc']
+ logger.debug("processing error document %(_id)r", err_doc)
+ # Open original source doc
+ source_infos = err_doc['raindrop_sources']
+ assert len(source_infos)==1, "only simple fwd chaining!"
+ source_id = source_infos[0][0]
+ yield self.doc_model.open_document_by_id(source_id
+ ).addCallback(self._cb_got_error_source, err_doc)
+ state_doc['raindrop_seq'] = row['key']
+
+ # XXX - see below - should we reuse self.coop, or is that unsafe?
+ coop = task.Cooperator()
+ return coop.coiterate(gen_work())
+
+ def _cb_got_error_source(self, result, err_doc):
+ # build the infos dict used by the sub-generator.
+ try:
+ _, doc_type, proto_id = self.doc_model.split_docid(err_doc['_id'])
+ except ValueError:
+ logger.warning("skipping malformed ID %(_id)r", err_doc)
+ return
+
+ infos = {proto_id: [result]}
+ # Although we only have 1 item to process, the queue is setup to
+ # handle many, so we need to use a generator
+ def gen_my_doc():
+ new_docs = []
+ for whateva in self.gen_work_tasks(infos, new_docs):
+ yield whateva
+ # should only be 1 new doc, and even on error there should be a
+ # new error doc.
+ assert len(new_docs)==1, new_docs
+ # either way, we are writing the new record replacing the one
+ # we have.
+ new_docs[0]['_rev'] = err_doc['_rev']
+ yield self.doc_model.create_ext_documents(new_docs)
+
+ # XXX - see below - should we reuse self.coop, or is that unsafe?
+ coop = task.Cooperator()
+ return coop.coiterate(gen_my_doc())
+
+ def gen_wq_tasks(self):
+ """generate deferreds which will determine where our processing is up
+ to and walk us through the _all_docs_by_seq view from that point,
+ creating and saving new docs as it goes. When the generator finishes
+ the queue is empty. """
+ # first open our 'state' document
+ def _cb_update_state_doc(result, d):
+ if result is not None:
+ assert d['_id'] == result['_id'], result
+ d.update(result)
+ # else no doc exists - the '_id' remains in the default though.
+
+ state_doc = {'_id': 'workqueue!msg',
+ 'doc_type': u"core/workqueue",
+ 'seq': 0}
+ yield self.doc_model.open_document_by_id(state_doc['_id'],
+ ).addCallback(_cb_update_state_doc, state_doc)
+
+ logger.info("Work queue starting with sequence ID %d",
+ state_doc['seq'])
+
+ logger.debug('opening by_seq view for work queue...')
+ num_per_batch = 15 # configurable????
+ # We process num_per_batch records at a time, and fetch all those
+        # documents in the same request; this seems a reasonable compromise
+ # between efficiency and handling large docs (no attachments come
+ # back...) Another alternative worth benchmarking; use a much larger
+ # limit without returning the documents, in the hope we can deduce
+ # more from the view, avoiding the fetch of many docs at all...
+ self.queue_finished = False
+ self.state_doc_dirty = False
+ while not self.queue_finished:
+ yield self.doc_model.db.listDocsBySeq(limit=num_per_batch,
+ startkey=state_doc['seq'],
+ include_docs=True,
+ ).addCallback(self._cb_by_seq_opened, state_doc)
+ if self.state_doc_dirty:
+ logger.debug("flushing state doc at end of run...")
+ yield self.doc_model.create_ext_documents([state_doc]
+ ).addCallback(self._cb_created_docs, state_doc
+ )
+ else:
+ logger.debug("no need to flush state doc")
+
+ def gen_work_tasks(self, doc_infos, new_docs):
+        # A generator which takes each doc in the list to its "next" type,
+        # noting that one of the docs ahead of us in the list may already
+        # satisfy that next step.
+ # ultimately we may need to generate lots of new docs from this one -
+ # but for now we have a single, simple chain moving forwards...
+
+ # Results aren't written - that is the caller's job - new docs are
+ # appended to the passed list.
+
+ # doc_infos is a dict keyed by proto_id. Each value is a list, in
+ # sequence order, of the document itself.
+ for proto_id, infos in doc_infos.iteritems():
+ for sq, doc in enumerate(infos):
+ doc_type = doc['type']
+ try:
+ xform_info = self.forward_chain[doc_type]
+ except KeyError:
+ logger.warning("Can't find transformer for message type %r - skipping %r",
+ doc_type, proto_id)
+ continue
+ if xform_info is None:
+ logger.debug("Document %r is at its terminal type of %r",
+ proto_id, doc_type)
+ continue
+
+ next_type, xformer = xform_info
+ # See if the next_type is in the rows ahead of us in by_seq
+ for check_doc in infos[sq+1:]:
+ if next_type == check_doc['type']:
+ logger.debug("cool - _by_seq lookahead tells us the doc is already %r",
+ next_type)
+ continue
+ # OK - need to create this new type.
+ logger.debug("calling %r to create a %s from %s", xformer,
+ next_type, doc['type'])
+ yield defer.maybeDeferred(xformer.convert, doc
+ ).addBoth(self._cb_converted_or_not,
+ next_type, proto_id, doc, new_docs)
+
+ def _cb_by_seq_opened(self, result, state_doc):
+ rows = result['rows']
+ logger.debug('work queue has %d items to check.', len(rows))
+ if not rows:
+ # no rows left. There is no guarantee our state doc will be
+ # the last one...
+ logger.info("work queue ran out of rows...")
+ # either way, we are done!
+ self.queue_finished = True
+ return
+ if len(rows)==1 and rows[0]['id'] == state_doc['_id']:
+ logger.info("Work queue got to the end (at sequence %(seq)s)",
+ state_doc)
+ self.queue_finished = True
+ return
+
+ # Build a map of what we can deduce from the entire result set (eg,
+ # a single msg may have a number of doc-types in the rows)
+ known = {}
+ for row in rows:
+ rid = row['id']
+ last_seq = row['key']
+ if not rid.startswith("msg!"):
+ continue
+ value = row['value']
+ if value.get('deleted'):
+ logger.debug("skipping deleted message %r", rid)
+ continue
+ try:
+ _, doc_type, proto_id = self.doc_model.split_docid(rid)
+ except ValueError:
+ logger.warning("skipping malformed message ID %r", rid)
+ continue
+ doc = row['doc']
+ real_type = doc.get('type')
+ if real_type != doc_type: # probably a core/error/msg
+ logger.info("message isn't of expected type (expected %r "
+ "but got %r) - skipping", doc_type, real_type)
+ continue
+ known.setdefault(proto_id, []).append(doc)
+
+ state_doc['seq'] = last_seq # only takes effect once saved...
+ logger.debug("Our %d rows gave info about %d messages",
+ len(rows), len(known))
+
+ def gen_my_work_tasks():
+ new_docs = []
+ for whateva in self.gen_work_tasks(known, new_docs):
+ yield whateva
+
+ # It isn't unusual to see zero docs to process, and writing our
+ # new state doc each time is going to hurt perf a little. The
+ # worst case is we die before flushing our state doc, but that's
+ # OK - we should be able to re-skip these messages quickly next
+ # time...
+ if new_docs:
+ # Now write the docs we created, plus our queue 'state' doc.
+ logger.info("work queue finished at sequence %d - %d new documents"
+ , state_doc['seq'], len(new_docs))
+ new_docs.append(state_doc)
+ self.state_doc_dirty = False # it is about to be written.
+ yield self.doc_model.create_ext_documents(new_docs
+ ).addCallback(self._cb_created_docs, state_doc
+ )
+ else:
+ # don't bother writing the state doc now, but record it is
+ # dirty so if we get to the end we *do* write it.
+ self.state_doc_dirty = True
+
+ # I *think* that if we share the same cooperator as the task itself,
+ # we risk having the next chunk of sequence IDs processed before we
+ # are done here.
+ # OTOH, I'm not sure about that.....
+ # As we are really only using a coiterator as a handy way of managing
+ # twisted loops, using our own should be ok though.
+ coop = task.Cooperator()
+ return coop.coiterate(gen_my_work_tasks())
+
+ def _cb_created_docs(self, new_revs, state_doc):
+ # XXX - note that the _ids in this result can't be trusted if there
+ # were attachments :( fortunately we don't care here...
+ # XXXXXXX - *sob* - we do care once we start hitting conflicts in
+ # messages ahead of us...
+ # XXX - this might not be necessary...
+ last_rev = new_revs[-1]
+ assert last_rev['id'] == state_doc['_id'], last_rev
+ state_doc['_rev'] = last_rev['rev']
+
+ def _cb_converted_or_not(self, result, dest_type, rootdocid, existing_doc,
+ new_docs):
+ # This is both a callBack and an errBack. If a converter fails to
+ # create a document, we can't just fail, or no later messages in the
+ # DB will ever get processed!
+ # So we write a "dummy" record - it has the same docid that the real
+ # document would have - but the 'type' attribute in the document
+ # reflects it is actually an error marker.
+ if isinstance(result, Failure):
+ logger.warn("Failed to convert a document: %s", result)
+ if self.options.stop_on_error:
+ logger.info("--stop-on-error specified - re-throwing error")
+ result.raiseException()
+ # and make a dummy doc.
+ new_doc = {'error_details': unicode(result)}
+ self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+ # and patch the 'type' attribute to reflect its really an error.
+ new_doc['type'] = 'core/error/msg'
+ # In theory, the source ID of each doc that contributed is
+ # redundant as it could be deduced once we get backward chaining.
+ # However, we probably will always need to track the _rev of those
+ # docs so we can detect 'stale' errors (XXX - probably not - the
+ # 'chain' doesn't go any further on error)
+ # Also in theory, we don't need this in the non-error case, as
+ # our _by_seq processing will ensure the right thing happens.
+ # A list of sources as one day we will support that!
+ new_doc['raindrop_sources'] = [[existing_doc['_id'],
+ existing_doc['_rev']]
+ ]
+ else:
+ new_doc = result
+ self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+ logger.debug("converter returned new document type %r for %r: %r",
+ dest_type, rootdocid, new_doc['_id'])
+ new_docs.append(new_doc)
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/base.py
@@ -0,0 +1,104 @@
+import logging
+
+__all__ = ['Rat', 'AccountBase']
+
+logger = logging.getLogger("accounts")
+
+class Rat(object):
+ '''
+ Account reasons rationale... this is here to make typing easier...
+ '''
+ #: all whats for this account
+ EVERYTHING = 'everything'
+ #: the problem is with the server (or the network)
+ SERVER = 'server'
+ #: the problem is with the account
+ ACCOUNT = 'account'
+
+ UNREACHABLE = 'unreachable'
+ PASSWORD = 'password'
+ MAINTENANCE = 'maintenance'
+ BUSY = 'busy'
+ #: something is up with the crypto; this needs to be exploded
+ CRYPTO = 'crypto'
+
+ #: good indicates that all-is-well
+ GOOD = 'good'
+ '''
+ Neutral indicates an expected transient lack of success (maintenance,
+ overloaded servers, etc.) It is tracked (rather than silently not updating
+ good) because it potentially allows for higher-level logic to escalate
+ continued inability to connect to something user-visible.
+
+ For example, twitter being down for short periods of time (at least in the
+ past) was business as usual; there would be no reason to notify the user.
+    However, if twitter is down for an extended period of time, we want to let
+ the user know (in an ambient sort of way) that there's a problem with
+ twitter, and that's why they're not getting any messages.
+
+ The primary difference between a TEMPORARY BAD thing and a TEMPORARY NEUTRAL
+ thing is that we will let the user know about a TEMPORARY BAD thing
+ when it happens.
+ '''
+ NEUTRAL = 'neutral'
+ '''
+ Bad indicates an unexpected problem which may be TEMPORARY or PERMANENT.
+ Temporary problems are expressed to the user in an ambient fashion when
+ they happen, but may not require any action. If a temporary problem stays
+ a problem for an extended period of time, it will be escalated to a
+ more explicit notification. A permanent problem requires user action and
+ the user will be immediately notified.
+
+ For example, bad passwords and suspended accounts are permanent problems. The
+ former is actionable within the UI, whereas the latter is not. However, it
+ is important that the user be notified at the earliest opportunity so they
+ can take real-world action promptly. A server being inaccessible is a
+ TEMPORARY BAD problem rather than a TEMPORARY NEUTRAL thing because a user
+ may benefit from knowing their connection or server is flakey. (Note:
+ temporarily lacking an internet connection is different from a flakey one;
+ we don't want to bother the user if we know they don't have a connection.)
+ '''
+ BAD = 'bad'
+
+ #: temporary implies it may fix itself without user intervention
+ TEMPORARY = 'temporary'
+ #: permanent implies the user must take some action to correct the problem
+ PERMANENT = 'permanent'
+ #: unknown means it either doesn't matter or it could be temporary but the
+ #: user should potentially still be informed
+ UNKNOWN = 'unknown'
+
+
+class AccountBase(Rat):
+ def __init__(self, doc_model, details):
+ self.doc_model = doc_model
+ self.details = details
+
+ def reportStatus(self, what, state, why=Rat.UNKNOWN,
+ expectedDuration=Rat.UNKNOWN):
+ '''
+ Report status relating to this account.
+
+ Everything is peachy: EVERYTHING GOOD
+ Wrong password: ACCOUNT BAD PASSWORD PERMANENT
+ (may be temporary if a bad password can mean many things)
+ Can't contact server: SERVER BAD UNREACHABLE TEMPORARY
+ Server maintenance: SERVER NEUTRAL MAINTENANCE TEMPORARY
+ (indicates a temporary lapse in service but there's not much we can do)
+ Server busy: SERVER NEUTRAL BUSY TEMPORARY
+ (for example, last.fm will sometimes refuse submission requests)
+ '''
+ logger.debug("ReportStatus: %s %s (why=%s, duration=%s)",
+ what, state, why, expectedDuration)
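+ # A purely illustrative example: the 'wrong password' case described in the
+ # docstring above would be reported by a subclass as
+ # self.reportStatus(Rat.ACCOUNT, Rat.BAD, Rat.PASSWORD, Rat.PERMANENT)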
+
+ def sync(self):
+ pass
+
+ def verify(self):
+ '''
+ Verify that the account details are usable - not yet implemented.
+ '''
+ pass
+
+class ConverterBase(object):
+ def __init__(self, doc_model):
+ self.doc_model = doc_model
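+ # Subclasses provide a convert(doc) method which returns either the new
+ # document dict or a Deferred firing with it (see eg IMAPConverter,
+ # SkypeConverter, TwitterConverter).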
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/contact.py
@@ -0,0 +1,6 @@
+class ContactProcessor(object):
+ def resolveOrCreateContacts(self, kind, values):
+ # stub - not yet implemented.
+ pass
+
+ def processContactIdentity(self, kind, value):
+ pass
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/message.py
@@ -0,0 +1,6 @@
+
+class MessageProcessor(object):
+ '''
+ Common message-processing superclass for e-mail messages, twitter messages,
+ skype messages and so on.
+ '''
+
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/__init__.py
@@ -0,0 +1,28 @@
+# this needs to become a 'plugin' mechanism...
+
+_protocol_infos = [
+ ('imap', 'raindrop.proto.imap', 'IMAPAccount'),
+ ('skype', 'raindrop.proto.skype', 'SkypeAccount'),
+ ('twitter', 'raindrop.proto.twitter', 'TwitterAccount'),
+]
+if __debug__:
+ _protocol_infos.append(('test', 'raindrop.proto.test', 'TestAccount'))
+
+protocols = {}
+def init_protocols():
+ import sys, logging
+ logger = logging.getLogger('raindrop.proto')
+ for name, mod, factname in _protocol_infos:
+ try:
+ logger.debug("attempting import of '%s' for '%s'", mod, factname)
+ __import__(mod)
+ mod = sys.modules[mod]
+ fact = getattr(mod, factname)
+ except ImportError, why:
+ logger.error("Failed to import '%s' factory: %s", name, why)
+ except:
+ logger.exception("Error creating '%s' factory", name)
+ else:
+ protocols[name] = fact
+
+__all__ = ['protocols', 'init_protocols']
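+
+# Typical use (a sketch based on how raindrop.sync drives this module): call
+# init_protocols() once at startup, then look up an account factory by kind,
+# eg:
+# factory = protocols['imap']
+# account = factory(doc_model, account_details)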
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/imap.py
@@ -0,0 +1,224 @@
+from twisted.internet import protocol, ssl, defer, error
+from twisted.mail import imap4
+import logging
+
+from ..proc import base
+from ..ext.message.rfc822 import doc_from_bytes
+
+brat = base.Rat
+
+logger = logging.getLogger(__name__)
+
+
+class ImapClient(imap4.IMAP4Client):
+ '''
+ Much of our logic here should perhaps be in another class that holds a
+ reference to the IMAP4Client instance subclass. Consider refactoring if we
+ don't turn out to benefit from the subclassing relationship.
+ '''
+ def serverGreeting(self, caps):
+ logger.debug("IMAP server greeting: capabilities are %s", caps)
+ return self._doAuthenticate(
+ ).addCallback(self._reqList
+ ).addCallback(self.deferred.callback)
+
+ def _doAuthenticate(self):
+ if self.account.details.get('crypto') == 'TLS':
+ d = self.startTLS(self.factory.ctx)
+ d.addErrback(self.accountStatus,
+ brat.SERVER, brat.BAD, brat.CRYPTO, brat.PERMANENT)
+ d.addCallback(self._doLogin)
+ else:
+ d = self._doLogin()
+ d.addErrback(self.accountStatus,
+ brat.ACCOUNT, brat.BAD, brat.PASSWORD, brat.PERMANENT)
+ return d
+
+
+ def _doLogin(self, *args, **kwargs):
+ return self.login(self.account.details['username'],
+ self.account.details['password'])
+
+ def _reqList(self, *args, **kwargs):
+ self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
+ return self.list('', '*').addCallback(self._procList)
+
+ def _procList(self, result, *args, **kwargs):
+ return self.conductor.coop.coiterate(self.gen_folder_list(result))
+
+ def gen_folder_list(self, result):
+ for flags, delim, name in result:
+ logger.debug('Processing folder %s (flags=%s)', name, flags)
+ if r"\Noselect" in flags:
+ logger.debug("'%s' is unselectable - skipping", name)
+ continue
+
+ # XXX - sob - markh sees:
+ # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
+ # although we obviously have seen them already in the other folders.
+ if name and name.startswith('['):
+ logger.info("'%s' appears special - skipping", name)
+ continue
+
+ yield self.examine(name
+ ).addCallback(self._examineFolder, name
+ ).addErrback(self._cantExamineFolder, name)
+ logger.debug('imap processing finished.')
+
+ def _examineFolder(self, result, folder_path):
+ logger.debug('Looking for messages already fetched for folder %s', folder_path)
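+ # The 'seen' view emits [folder_path, uid] keys, so the open-ended {} endkey
+ # covers every UID already recorded for this folder.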
+ return self.doc_model.open_view('raindrop!proto!seen', 'imap',
+ startkey=[folder_path], endkey=[folder_path, {}]
+ ).addCallback(self._fetchAndProcess, folder_path)
+
+ def _fetchAndProcess(self, result, folder_path):
+ # XXX - we should look at the flags and update the message if it's not
+ # the same - later.
+ rows = result['rows']
+ logger.debug('_FetchAndProcess says we have seen %d items for %r',
+ len(rows), folder_path)
+ seen_uids = set(row['key'][1] for row in rows)
+ # now build a list of all messages currently in the folder.
+ allMessages = imap4.MessageSet(1, None)
+ return self.fetchUID(allMessages, True).addCallback(
+ self._gotUIDs, folder_path, seen_uids)
+
+ def _gotUIDs(self, uidResults, name, seen_uids):
+ all_uids = set(int(result['UID']) for result in uidResults.values())
+ remaining = all_uids - seen_uids
+ logger.info("Folder %s has %d messages, %d new", name,
+ len(all_uids), len(remaining))
+ def gen_remaining(folder_name, remaining_uids):
+ for uid in remaining_uids:
+ # XXX - we need something to make this truly unique.
+ did = "%s#%d" % (folder_name, uid)
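+ # (eg, folder 'INBOX' and uid 1234 give a docid of 'INBOX#1234')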
+ logger.debug("new imap message %r - fetching content", did)
+ # grr - we have to get the rfc822 body and the flags in separate requests.
+ to_fetch = imap4.MessageSet(uid)
+ yield self.fetchMessage(to_fetch, uid=True
+ ).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
+ )
+ return self.conductor.coop.coiterate(gen_remaining(name, remaining))
+
+ def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
+ _, result = result.popitem()
+ content = result['RFC822']
+ # grr - get the flags
+ logger.debug("message %r has %d bytes; fetching flags", did, len(content))
+ return self.fetchFlags(to_fetch, uid=True
+ ).addCallback(self._cb_got_flags, uid, did, folder_name, content
+ ).addErrback(self._cantGetMessage
+ )
+
+ def _cb_got_flags(self, result, uid, did, folder_name, content):
+ logger.debug("flags are %s", result)
+ assert len(result)==1, result
+ _, result = result.popitem()
+ flags = result['FLAGS']
+ # put the 'raw' document object together and save it.
+ doc = dict(
+ storage_key=[folder_name, uid],
+ imap_flags=flags,
+ )
+ attachments = {'rfc822' : {'content_type': 'message',
+ 'data': content,
+ }
+ }
+ doc['_attachments'] = attachments
+ return self.doc_model.create_raw_documents(self.account,
+ [(did, doc, 'proto/imap')],
+ ).addCallback(self._cb_saved_message)
+
+ def _cantGetMessage(self, failure):
+ logger.error("Failed to fetch message: %s", failure)
+
+ def _cb_saved_message(self, result):
+ logger.debug("Saved message %s", result)
+
+ def _cantSaveDocument(self, failure):
+ logger.error("Failed to save message: %s", failure)
+
+ def _cantExamineFolder(self, failure, name, *args, **kw):
+ logger.warning("Failed to examine folder '%s': %s", name, failure)
+
+ def accountStatus(self, result, *args):
+ return self.account.reportStatus(*args)
+
+
+class ImapClientFactory(protocol.ClientFactory):
+ protocol = ImapClient
+
+ def __init__(self, account, conductor):
+ # base-class has no __init__
+ self.account = account
+ self.conductor = conductor
+ self.doc_model = account.doc_model # this is a little confused...
+
+ self.ctx = ssl.ClientContextFactory()
+ self.backoff = 8 # magic number
+
+ def buildProtocol(self, addr):
+ p = self.protocol(self.ctx)
+ p.factory = self
+ p.account = self.account
+ p.conductor = self.conductor
+ p.doc_model = self.account.doc_model
+ p.deferred = self.deferred # this isn't going to work in reconnect scenarios
+ return p
+
+ def connect(self):
+ details = self.account.details
+ logger.debug('attempting to connect to %s:%d (ssl: %s)',
+ details['host'], details['port'], details.get('ssl'))
+ reactor = self.conductor.reactor
+ self.deferred = defer.Deferred()
+ if details.get('ssl'):
+ reactor.connectSSL(details['host'], details['port'], self, self.ctx)
+ else:
+ reactor.connectTCP(details['host'], details['port'], self)
+ return self.deferred
+
+ def clientConnectionLost(self, connector, reason):
+ # the flaw in this is that we start from scratch every time; which is why
+ # most of the logic in the client class should really be pulled out into
+ # the account logic, probably. this class itself may have issues too...
+ # XXX - also note that a simple exception in other callbacks can trigger
+ # this - meaning we retry just to hit the same exception.
+ if reason.check(error.ConnectionDone):
+ # only an error if premature
+ if not self.deferred.called:
+ self.deferred.errback(reason)
+ else:
+ #self.deferred.errback(reason)
+ logger.debug('lost connection to server, going to reconnect in a bit')
+ self.conductor.reactor.callLater(2, self.connect)
+
+ def clientConnectionFailed(self, connector, reason):
+ self.account.reportStatus(brat.SERVER, brat.BAD, brat.UNREACHABLE,
+ brat.TEMPORARY)
+ logger.warning('Failed to connect, will retry after %d secs',
+ self.backoff)
+ # It occurs to me that some "account manager" should be notified of the
+ # error, and *it* should ask us to retry later - eg, how would one say
+ # 'ignore the backoff - try again *now*'?
+ self.conductor.reactor.callLater(self.backoff, self.connect)
+ self.backoff = min(self.backoff * 2, 600) # magic number
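+ # ie, retries happen 8, 16, 32, ... seconds apart, capped at 10 minutes.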
+
+
+# A 'converter' - takes a proto/imap as input and creates a
+# 'raw/message/rfc822' as output
+class IMAPConverter(base.ConverterBase):
+ def convert(self, doc):
+ # I need the binary attachment.
+ return self.doc_model.open_attachment(doc['_id'], "rfc822",
+ ).addCallback(self._cb_got_attachment, doc)
+
+ def _cb_got_attachment(self, content, doc):
+ # the 'rfc822' module knows what to do...
+ return doc_from_bytes(content)
+
+
+class IMAPAccount(base.AccountBase):
+ def startSync(self, conductor):
+ self.factory = ImapClientFactory(self, conductor)
+ return self.factory.connect()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/skype.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+'''
+Fetch skype contacts and chats.
+'''
+import logging
+
+import twisted.python.log
+from twisted.internet import defer, threads
+
+from ..proc import base
+
+import Skype4Py
+
+logger = logging.getLogger(__name__)
+
+# These are the raw properties we fetch from skype.
+CHAT_PROPS = [
+ ('ACTIVEMEMBERS', list),
+ ('ACTIVITY_TIMESTAMP', float),
+ ('ADDER', unicode),
+ ('APPLICANTS', list),
+ #'BLOB'??
+ ('BOOKMARKED', bool),
+ ('DESCRIPTION', unicode),
+ #'DIALOG_PARTNER',
+ ('FRIENDLYNAME', unicode),
+ ('GUIDELINES', unicode),
+ #'MEMBEROBJECTS',
+ ('MEMBERS', list),
+ ('MYROLE', unicode),
+ ('MYSTATUS', unicode),
+ ('OPTIONS', int),
+ ('PASSWORDHINT', unicode),
+ ('POSTERS', list),
+ ('STATUS', unicode),
+ ('TIMESTAMP', float),
+ ('TOPICXML', unicode),
+ ('TOPIC', unicode),
+ ('TYPE', unicode),
+]
+
+MSG_PROPS = [
+ ('BODY', unicode),
+ ('CHATNAME', unicode),
+ ('EDITED_BY', unicode),
+ ('EDITED_TIMESTAMP', float),
+ ('FROM_DISPNAME', unicode),
+ ('FROM_HANDLE', unicode),
+ ('IS_EDITABLE', bool),
+ ('LEAVEREASON', unicode),
+ ('STATUS', unicode),
+ ('TIMESTAMP', float),
+ ('TYPE', unicode),
+ ('USERS', list),
+]
+
+
+def simple_convert(str_val, typ):
+ if typ is list:
+ return str_val.split()
+ if typ is bool:
+ return str_val == "TRUE"
+ # all the rest are callables which 'do the right thing'
+ return typ(str_val)
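+# For example (illustrative values only): simple_convert('alice bob', list)
+# gives ['alice', 'bob'] and simple_convert('TRUE', bool) gives True.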
+
+
+class TwistySkype(object):
+ def __init__(self, account, conductor):
+ self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
+ self.conductor = conductor
+ self.skype = Skype4Py.Skype()
+
+ def get_docid_for_chat(self, chat):
+ return chat.Name.encode('utf8') # hrmph!
+
+ def get_docid_for_msg(self, msg):
+ return "%s-%d" % (self.account.details['username'], msg._Id)
+
+ def attach(self):
+ logger.info("attaching to skype...")
+ d = threads.deferToThread(self.skype.Attach)
+ d.addCallback(self.attached)
+ return d
+
+ def attached(self, status):
+ logger.info("attached to skype - getting chats")
+ return threads.deferToThread(self.skype._GetChats
+ ).addCallback(self._cb_got_chats
+ )
+
+ def _cb_got_chats(self, chats):
+ logger.debug("skype has %d chat(s) total", len(chats))
+ # fetch all the messages (future optimization; all we need are the
+ # IDs, but we suffer from creating an instance wrapper for each item.
+ # Sadly the skype lib doesn't offer a clean way of doing this.)
+ def gen_chats(chats):
+ for chat in chats:
+ yield threads.deferToThread(chat._GetMessages
+ ).addCallback(self._cb_got_messages, chat,
+ )
+ logger.info("skype has finished processing all chats")
+
+ return self.conductor.coop.coiterate(gen_chats(chats))
+
+ def _cb_got_messages(self, messages, chat):
+ logger.debug("chat '%s' has %d message(s) total; looking for new ones",
+ chat.Name, len(messages))
+
+ # Finally got all the messages for this chat. Execute a view to
+ # determine which we have seen (note that we obviously could just
+ # fetch the *entire* chats+msgs view once - but we do it this way on
+ # purpose to ensure we remain scalable...)
+ return self.doc_model.open_view('raindrop!proto!seen', 'skype',
+ startkey=[chat.Name],
+ endkey=[chat.Name, {}]
+ ).addCallback(self._cb_got_seen, chat, messages
+ )
+
+ def _cb_got_seen(self, result, chat, messages):
+ msgs_by_id = dict((m._Id, m) for m in messages)
+ chatname = chat.Name
+ # The view gives us a list of [chat_name, msg_id], where msg_id is
+ # None if we've seen the chat itself. Create a set of messages we
+ # *haven't* seen - including the [chat_name, None] entry if applic.
+ all_keys = [(chatname, None)]
+ all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
+ seen_chats = set([tuple(row['key']) for row in result['rows']])
+ add_bulk = [] # we bulk-update these at the end!
+ remaining = set(all_keys)-set(seen_chats)
+ # we could just process the empty list as normal, but the logging of
+ # an info when we do have items is worthwhile...
+ if not remaining:
+ logger.debug("Chat %r has no new items to process", chatname)
+ return None
+ # we have something to do...
+ logger.info("Chat %r has %d items to process", chatname,
+ len(remaining))
+ logger.debug("we've already seen %d items from this chat",
+ len(seen_chats))
+ return self.conductor.coop.coiterate(
+ self.gen_items(chat, remaining, msgs_by_id))
+
+ def gen_items(self, chat, todo, msgs_by_id):
+ tow = [] # documents to write.
+ for _, msgid in todo:
+ if msgid is None:
+ # we haven't seen the chat itself - do that.
+ logger.debug("Creating new skype chat %r", chat.Name)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(chat._Property, p)
+ for p, _ in CHAT_PROPS]
+ yield defer.DeferredList(ds
+ ).addCallback(self._cb_got_chat_props, chat, tow)
+ else:
+ msg = msgs_by_id[msgid]
+ # A new msg in this chat.
+ logger.debug("New skype message %d", msg._Id)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+ yield defer.DeferredList(ds
+ ).addCallback(self._cb_got_msg_props, chat, msg, tow)
+
+ if tow:
+ yield self.doc_model.create_raw_documents(self.account, tow)
+ logger.debug("finished processing chat %r", chat.Name)
+
+ def _cb_got_chat_props(self, results, chat, pending):
+ logger.debug("got chat %r properties: %s", chat.Name, results)
+ docid = self.get_docid_for_chat(chat)
+ doc = {}
+ for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
+ if ok:
+ doc['skype_'+name.lower()] = simple_convert(val, typ)
+
+ # 'Name' is a special case that doesn't come via a prop. We use
+ # 'chatname' as that is the equiv attr on the messages themselves.
+ doc['skype_chatname'] = chat.Name
+ pending.append((docid, doc, 'proto/skype-chat'))
+
+ def _cb_got_msg_props(self, results, chat, msg, pending):
+ logger.debug("got message properties for %s", msg._Id)
+ doc = {}
+ for (name, typ), (ok, val) in zip(MSG_PROPS, results):
+ if ok:
+ doc['skype_'+name.lower()] = simple_convert(val, typ)
+ doc['skype_id'] = msg._Id
+ # we include the skype username with the ID as message IDs are only unique per user.
+ docid = self.get_docid_for_msg(msg)
+ pending.append((docid, doc, 'proto/skype-msg'))
+
+
+# A 'converter' - takes a proto/skype-msg as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appropriate)
+class SkypeConverter(base.ConverterBase):
+ def convert(self, doc):
+ # We need to open the 'chat' for this Message. Clearly this is
+ # going to be inefficient...
+ proto_id = doc['skype_chatname'].encode('utf8') # hrmph!
+ return self.doc_model.open_document('msg', 'proto/skype-chat',
+ proto_id
+ ).addCallback(self.finish_convert, doc)
+
+ def finish_convert(self, chat_doc, doc):
+ if chat_doc is None:
+ subject = "<failed to fetch skype chat!>"
+ else:
+ subject = chat_doc['skype_friendlyname']
+ return {'from': ['skype', doc['skype_from_handle']],
+ 'subject': subject,
+ 'body': doc['skype_body'],
+ 'body_preview': doc['skype_body'][:128],
+ 'conversation_id': doc['skype_chatname'],
+ 'timestamp': doc['skype_timestamp'], # skype's works ok here?
+ }
+
+
+class SkypeAccount(base.AccountBase):
+ def startSync(self, conductor):
+ return TwistySkype(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/test/__init__.py
@@ -0,0 +1,106 @@
+# This is an implementation of a 'test' protocol.
+import logging
+from twisted.internet import defer, error
+from email.message import Message
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class TestMessageProvider(object):
+ def __init__(self, account, conductor):
+ self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
+ self.conductor = conductor
+
+ def sync_generator(self):
+ num_docs = int(self.account.details.get('num_test_docs', 5))
+ logger.info("Creating %d test documents", num_docs)
+ for i in xrange(num_docs):
+ yield self.check_test_message(i)
+ if self.bulk_docs:
+ yield self.doc_model.create_raw_documents(self.account,
+ self.bulk_docs
+ ).addCallback(self.saved_bulk_messages, len(self.bulk_docs),
+ )
+
+ def attach(self):
+ logger.info("preparing to synch test messages...")
+ self.bulk_docs = [] # anything added here will be done in bulk
+ # use the cooperator for testing purposes.
+ return self.conductor.coop.coiterate(self.sync_generator())
+
+ def check_test_message(self, i):
+ logger.debug("seeing if message with ID %d exists", i)
+ return self.doc_model.open_document("msg", str(i), "proto/test"
+ ).addCallback(self.process_test_message, i)
+
+ def process_test_message(self, existing_doc, doc_num):
+ if existing_doc is None:
+ # make an attachment for testing purposes.
+ attachments = {"raw-attach" : {"content_type" : 'application/octet-stream',
+ "data" : 'test\0blob'
+ }
+ }
+ doc = dict(
+ storage_key=doc_num,
+ _attachments=attachments,
+ )
+ self.bulk_docs.append((str(doc_num), doc, 'proto/test'))
+ else:
+ logger.info("Skipping test message with ID %d - already exists",
+ doc_num)
+ # we are done.
+
+ def saved_bulk_messages(self, result, n):
+ logger.debug("Finished saving %d test messages in bulk", n)
+ # done
+
+# A 'converter' - takes a proto/test as input and creates a
+# 'raw/message/rfc822' as output.
+class TestConverter(base.ConverterBase):
+ num_converted = 0
+ def convert(self, doc):
+ # for the sake of testing the error queue, we cause an error on
+ # every 3rd message we process.
+ self.num_converted += 1
+ if self.num_converted % 3 == 0:
+ raise RuntimeError("This is a test failure")
+
+ # for the sake of testing, we fetch the raw attachment just to compare
+ # its value.
+ return self.doc_model.open_attachment(doc['_id'], "raw-attach",
+ ).addCallback(self._cb_got_attachment, doc)
+
+ def _cb_got_attachment(self, attach_content, doc):
+ if attach_content != 'test\0blob':
+ raise RuntimeError(attach_content)
+
+ me = doc['storage_key']
+ # Use the email package to construct a synthetic rfc822 stream.
+ msg = Message()
+ headers = {'from': 'from%(storage_key)d@test.com',
+ 'subject' : 'This is test document %(storage_key)d',
+ }
+ for h in headers:
+ msg[h] = headers[h] % doc
+
+ body = u"Hello, this is test message %(storage_key)d (with extended \xa9haracter!)" % doc
+ msg.set_payload(body.encode('utf-8'), 'utf-8')
+ #attachments = {"rfc822" : {"content_type" : 'message',
+ # "data" : msg.get_payload(),
+ # }
+ # }
+ new_doc = {}
+ new_doc['body'] = body
+ #new_doc['_attachments'] = attachments
+ new_doc['multipart'] = False
+ new_doc['headers'] = {}
+ for hn, hv in msg.items():
+ new_doc['headers'][hn.lower()] = hv
+ return new_doc
+
+
+class TestAccount(base.AccountBase):
+ def startSync(self, conductor):
+ return TestMessageProvider(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/twitter.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+'''
+Fetch twitter raw* objects
+'''
+
+# prevent 'import twitter' finding this module!
+from __future__ import absolute_import
+
+import logging
+import re
+import twisted.python.log
+from twisted.internet import defer, threads
+
+from ..proc import base
+
+# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
+# about getting twisted support into the twitter package.
+# Sadly, the twisty-twitter package has issues of its own (like calling
+# delegates outside the deferred mechanism meaning you can't rely on callbacks
+# being completed when they say they are.)
+
+# So for now we are going with the blocking twitter package used via
+# deferToThread for all blocking operations...
+import twitter
+
+logger = logging.getLogger(__name__)
+
+
+class TwitterProcessor(object):
+ def __init__(self, account, conductor):
+ self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
+ self.conductor = conductor
+ self.twit = None
+ self.seen_tweets = None
+
+ def attach(self):
+ logger.info("attaching to twitter...")
+ username = self.account.details['username']
+ pw = self.account.details['password']
+ return threads.deferToThread(twitter.Api,
+ username=username, password=pw
+ ).addCallback(self.attached
+ )
+
+ def attached(self, twit):
+ logger.info("attached to twitter - fetching friends")
+ self.twit = twit
+
+ # build the list of all users we will fetch tweets from.
+ return threads.deferToThread(self.twit.GetFriends
+ ).addCallback(self._cb_got_friends)
+
+ def _cb_got_friends(self, friends):
+ # Our 'seen' view is a reduce view, so we only get one result per
+ # friend we pass in. It is probably safe to assume we don't have
+ # too many friends that we can't get them all in one hit...
+ return self.doc_model.open_view('raindrop!proto!seen', 'twitter',
+ group=True
+ ).addCallback(self._cb_got_seen, friends)
+
+ def _cb_got_seen(self, result, friends):
+ # result -> [{'key': '12449', 'value': 1371372980}, ...]
+ # turn it into a map.
+ rows = result['rows']
+ last_seen_ids = dict((int(r['key']), int(r['value'])) for r in rows)
+ return self.conductor.coop.coiterate(
+ self.gen_friends_info(friends, last_seen_ids))
+
+ def gen_friends_info(self, friends, last_seen_ids):
+ logger.info("apparently I've %d friends", len(friends))
+ def do_friend(friend):
+ last_this = last_seen_ids.get(friend.id)
+ logger.debug("friend %r (%s) has latest tweet id of %s",
+ friend.screen_name, friend.id, last_this)
+ return threads.deferToThread(
+ self.twit.GetUserTimeline, friend.id, since_id=last_this
+ ).addCallback(self._cb_got_friend_timeline, friend
+ ).addErrback(self.err_friend_timeline, friend
+ )
+
+ # None means 'me' - but we don't really want to use 'None'. This is
+ # the only way I can work out how to get ourselves!
+ me = self.twit.GetUser(self.twit._username)
+ yield do_friend(me)
+ for f in friends:
+ yield do_friend(f)
+
+ logger.debug("Finished friends")
+
+ def err_friend_timeline(self, failure, friend):
+ logger.error("Failed to fetch timeline for '%s': %s",
+ friend.screen_name, failure)
+
+ def _cb_got_friend_timeline(self, timeline, friend):
+ return self.conductor.coop.coiterate(
+ self.gen_friend_timeline(timeline, friend))
+
+ def gen_friend_timeline(self, timeline, friend):
+ for tweet in timeline:
+ tid = tweet.id
+ # put the 'raw' document object together and save it.
+ logger.info("New tweet '%s...' (%s)", tweet.text[:25], tid)
+ # create the couch document for the tweet itself.
+ doc = {}
+ for name, val in tweet.AsDict().iteritems():
+ val = getattr(tweet, name)
+ # simple hacks - users just become the user ID.
+ if isinstance(val, twitter.User):
+ val = val.id
+ doc['twitter_'+name.lower()] = val
+ # not clear if the user ID is actually needed.
+ proto_id = "%s#%s" % (tweet.user.id, tid)
+ yield self.doc_model.create_raw_documents(
+ self.account, [(proto_id, doc, 'proto/twitter')])
+
+
+# A 'converter' - takes a proto/twitter as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appropriate)
+class TwitterConverter(base.ConverterBase):
+ re_tags = re.compile(r'#(\w+)')
+ def convert(self, doc):
+ # for now, if a 'proto' can detect tags, it writes them directly
+ # to a 'tags' attribute.
+ body = doc['twitter_text']
+ tags = self.re_tags.findall(body)
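+ # eg, an invented tweet "loving #raindrop on #couchdb" would give tags of
+ # ['raindrop', 'couchdb'].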
+ return {'from': ['twitter', doc['twitter_user']],
+ 'body': body,
+ 'body_preview': body[:128],
+ 'tags': tags,
+ # We don't have 'in seconds' since we came from AsDict() -
+ # but are 'seconds' usable for a timestamp?
+ #'timestamp': int(doc['twitter_created_at_in_seconds'])
+ }
+
+
+class TwitterAccount(base.AccountBase):
+ def startSync(self, conductor):
+ return TwitterProcessor(self, conductor).attach()
rename from server/python/junius/replicate.py
rename to server/python/raindrop/replicate.py
--- a/server/python/junius/replicate.py
+++ b/server/python/raindrop/replicate.py
@@ -1,20 +1,14 @@
#!/usr/bin/env python
-import os, os.path
-
-try:
- import simplejson as json
-except ImportError:
- import json # Python 2.6
-
-
-import os, os.path, httplib2
-from junius import model
+import os
+import json
+import httplib2
+from . import model
class Replicator(object):
def __init__(self):
self.local_host = model.get_local_host_info()
self.remote_host = model.get_remote_host_info()
self.http = httplib2.Http()
def create_remote_dbs(self):
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/sync.py
@@ -0,0 +1,104 @@
+import logging
+
+from twisted.internet import reactor, defer, task
+from twisted.python.failure import Failure
+import twisted.web.error
+import paisley
+
+from . import proto as proto
+
+logger = logging.getLogger(__name__)
+
+_conductor = None
+from .model import get_doc_model
+
+def get_conductor(options=None):
+ global _conductor
+ if _conductor is None:
+ proto.init_protocols()
+ _conductor = SyncConductor(options)
+ else:
+ assert options is None, 'can only set this at startup'
+ return _conductor
+
+
+# XXX - rename this to plain 'Conductor' and move to a different file.
+# This 'conducts' synchronization, the work queues and the interactions with
+# the extensions and database.
+class SyncConductor(object):
+ def __init__(self, options):
+ self.log = logger
+ self.options = options
+ # apparently it is now considered 'good form' to pass reactors around, so
+ # a future of multiple reactors is possible.
+ # We capture it here, and all the things we 'conduct' use this reactor
+ # (but later it should be passed to our ctor too)
+ self.reactor = reactor
+ self.coop = task.Cooperator()
+ reactor.addSystemEventTrigger("before", "shutdown", self._kill_coop)
+ self.doc_model = get_doc_model()
+
+ self.active_accounts = []
+
+ def _kill_coop(self):
+ logger.debug('stopping the coordinator')
+ self.coop.stop()
+ logger.debug('coordinator stopped')
+
+ def _ohNoes(self, failure, *args, **kwargs):
+ self.log.error('OH NOES! failure! %s', failure)
+
+ def _getAllAccounts(self):
+ return self.doc_model.open_view('raindrop!accounts!all', 'all'
+ ).addCallback(self._gotAllAccounts
+ ).addErrback(self._ohNoes)
+
+ def _gotAllAccounts(self, result):
+ # we don't use the cooperator here as we want them all to run in parallel.
+ return defer.DeferredList([d for d in self._genAccountSynchs(result['rows'])])
+
+ def _genAccountSynchs(self, rows):
+ self.log.info("Have %d accounts to synch", len(rows))
+ to_synch = []
+ for row in rows:
+ account_details = row['value']
+ kind = account_details['kind']
+ self.log.debug("Found account using protocol %s", kind)
+ if not self.options.protocols or kind in self.options.protocols:
+ if kind in proto.protocols:
+ account = proto.protocols[kind](self.doc_model, account_details)
+ self.log.info('Starting sync of %s account: %s',
+ kind, account_details.get('name', '(un-named)'))
+ self.active_accounts.append(account)
+ yield account.startSync(self
+ ).addBoth(self._cb_sync_finished, account)
+ else:
+ self.log.error("Don't know what to do with account kind: %s", kind)
+ else:
+ self.log.info("Skipping account - protocol '%s' is disabled", kind)
+
+ def sync(self, whateva=None):
+ return self._getAllAccounts()
+
+ def _cb_sync_finished(self, result, account):
+ if isinstance(result, Failure):
+ self.log.error("Account %s failed with an error: %s", account, result)
+ else:
+ self.log.debug("Account %s finished successfully", account)
+ assert account in self.active_accounts, (account, self.active_accounts)
+ self.active_accounts.remove(account)
+ if not self.active_accounts:
+ self.log.info("all accounts have finished synchronizing")
+
+
+if __name__ == '__main__':
+ # normal entry-point is the app itself; this is purely for debugging...
+ logging.basicConfig()
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ conductor = get_conductor()
+ conductor.reactor.callWhenRunning(conductor.sync)
+
+ logger.debug('starting reactor')
+ conductor.reactor.run()
+ logger.debug('reactor done')
new file mode 100644
--- /dev/null
+++ b/server/python/run-raindrop.py
@@ -0,0 +1,325 @@
+"""The raindrop server
+"""
+import sys
+import optparse
+import logging
+
+from twisted.internet import reactor, defer, task
+
+from raindrop import model
+from raindrop import bootstrap
+from raindrop import pipeline
+from raindrop.sync import get_conductor
+
+logger = logging.getLogger('raindrop')
+
+class HelpFormatter(optparse.IndentedHelpFormatter):
+ def format_description(self, description):
+ return description
+
+# decorators for our global functions:
+# so they can insist they complete before the next command executes
+def asynch_command(f):
+ f.asynch = True
+ return f
+
+# so they can consume the rest of the args
+def allargs_command(f):
+ f.allargs = True
+ return f
+
+
+# XXX - install_accounts should die too, but how to make a safe 'fingerprint'
+# so we can do it implicitly? We could create a hash which doesn't include
+# the password, but then the password changing wouldn't be enough to trigger
+# an update. Including even the *hash* of the password might risk leaking
+# info. So for now you must install manually.
+def install_accounts(result, parser, options):
+ """Install accounts in the database from the config file"""
+ return bootstrap.install_accounts(None)
+
+
+@asynch_command
+def sync_messages(result, parser, options):
+ """Synchronize all messages from all accounts"""
+ conductor = get_conductor()
+ return conductor.sync(None)
+
+@asynch_command
+def process(result, parser, options):
+ """Process all messages to see if any extensions need running"""
+ def done(result):
+ print "Message pipeline has finished..."
+ p = pipeline.Pipeline(model.get_doc_model(), options)
+ return p.start().addCallback(done)
+
+@asynch_command
+def retry_errors(result, parser, options):
+ """Reprocess all conversions which previously resulted in an error."""
+ def done_errors(result):
+ print "Error retry pipeline has finished - processing work queue..."
+ return result
+ p = pipeline.Pipeline(model.get_doc_model(), options)
+ return p.start_retry_errors(
+ ).addCallback(done_errors
+ ).addCallback(process, parser, options)
+
+@allargs_command
+def show_view(result, parser, options, args):
+ """Pretty-print the result of executing a view.
+
+ All arguments after this command are URLs to be executed as the view. If
+ the view name starts with '/', the URL will be used as-is. This is
+ suitable for builtin views - eg:
+
+ show-view /_all_docs?limit=5
+
+ will fetch the first 5 results from the URL:
+
+ http://[dbhost]/[dbname]/_all_docs?limit=5
+
+ whereas
+
+ show-view my_design_doc/my_view?limit=5
+
+ will fetch the first 5 results from the URL
+
+ http://[dbhost]/[dbname]/_view/my_design_doc/my_view?limit=5
+
+ (XXX - todo - couch 0.9 changes the above URL - adjust this docstring
+ accordingly...)
+ """
+ from pprint import pprint
+ def print_view(result, view_name):
+ print "** view %r **" % view_name
+ pprint(result)
+
+ def gen_views():
+ for arg in args:
+ # don't use open_view as then we'd need to parse the query portion!
+ # grrr - just to get the dbname :()
+ from raindrop.config import get_config
+ dbinfo = get_config().couches['local']
+ dbname = dbinfo['name']
+ if arg.startswith("/"):
+ uri = "/%s/%s" % (dbname, arg)
+ else:
+ try:
+ doc, rest = arg.split("/")
+ except ValueError:
+ parser.error("View name must be in the form design_doc_name/view")
+ uri = "/%s/_view/%s/%s" % (dbname, doc, rest)
+ db = model.get_db()
+ yield db.get(uri
+ ).addCallback(db.parseResult
+ ).addCallback(print_view, arg)
+
+ coop = task.Cooperator()
+ return coop.coiterate(gen_views())
+
+
+def unprocess(result, parser, options):
+ """Delete all documents which can be re-generated by the 'process' command
+ """
+ def done(result):
+ print "unprocess has finished..."
+ # XXX - pipeline should probably be a singleton?
+ p = pipeline.Pipeline(model.get_doc_model(), options)
+ return p.unprocess().addCallback(done)
+
+def delete_docs(result, parser, options):
+ """Delete all documents of a particular type. Use with caution or see
+ the 'unprocess' command for an alternative.
+ """
+ # NOTE: This is for development only, until we get a way to say
+ # 'reprocess stuff you've already done' - in the meantime deleting those
+ # intermediate docs has the same result...
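+ # A hypothetical invocation: 'run-raindrop.py --doctype=proto/test delete-docs'
+ # would delete every raw 'proto/test' document.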
+ def _del_docs(to_del):
+ docs = []
+ for id, rev in to_del:
+ docs.append({'_id': id, '_rev': rev, '_deleted': True})
+ return model.get_db().updateDocuments(docs)
+
+ def _got_docs(result, dt):
+ to_del = [(row['id'], row['value']) for row in result['rows']]
+ logger.info("Deleting %d documents of type %r", len(to_del), dt)
+ return to_del
+
+ if not options.doctypes:
+ parser.error("You must specify one or more --doctype")
+ deferreds = []
+ for dt in options.doctypes:
+ d = model.get_doc_model().open_view('raindrop!messages!by',
+ 'by_doc_type', key=dt
+ ).addCallback(_got_docs, dt
+ ).addCallback(_del_docs
+ )
+ deferreds.append(d)
+ return defer.DeferredList(deferreds)
+
+
+def _setup_logging(options):
+ init_errors = []
+ logging.basicConfig()
+ for val in options.log_level: # a list of all --log-level options...
+ try:
+ name, level = val.split("=", 1)
+ except ValueError:
+ name = None
+ level = val
+ # convert a level name to a value.
+ try:
+ level = int(getattr(logging, level.upper()))
+ except (ValueError, AttributeError):
+ # not a level name from the logging module - maybe a literal.
+ try:
+ level = int(level)
+ except ValueError:
+ init_errors.append(
+ "Invalid log-level '%s' ignored" % (val,))
+ continue
+ l = logging.getLogger(name)
+ l.setLevel(level)
+
+ # write the errors after the logging is completely setup.
+ for e in init_errors:
+ logging.getLogger().error(e)
+
+
+def main():
+ # build the args we support.
+ all_args = {}
+ for n, v in globals().iteritems():
+ if callable(v) and getattr(v, '__doc__', None):
+ all_args[n.replace('_', '-')] = v
+
+ all_arg_names = sorted(all_args.keys())
+ description= __doc__ + "\nCommands\n help\n " + \
+ "\n ".join(all_arg_names) + \
+ "\nUse '%prog help command-name' for help on a specific command.\n"
+
+ parser = optparse.OptionParser("%prog [options]",
+ description=description,
+ formatter=HelpFormatter())
+
+ parser.add_option("-l", "--log-level", action="append",
+ help="Specifies either an integer or a logging module "
+ "constant (such as INFO) to set all logs to the "
+ "specified level. Optionally can be in the format "
+ "of 'log.name=level' to only set the level of the "
+ "specified log.",
+ default=["INFO"])
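+ # For example (hypothetical invocation): '-l raindrop.sync=DEBUG -l 20' sets
+ # the 'raindrop.sync' logger to DEBUG and the root logger to 20 (INFO).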
+
+ parser.add_option("-p", "--protocol", action="append", dest='protocols',
+ help="Specifies the protocols to enable. If not "
+ "specified, all protocols are enabled")
+
+ parser.add_option("", "--doctype", action="append", dest='doctypes',
+ help="Specifies the document types to use for some "
+ "operations.")
+
+ parser.add_option("", "--force", action="store_true",
+ help="Forces some operations which would otherwise not "
+ "be done.")
+
+ parser.add_option("-s", "--stop-on-error", action="store_true",
+ help="Causes (some) operations which would normally "
+ "handle an error and continue to stop when an "
+ "error occurs.")
+
+ options, args = parser.parse_args()
+
+ _setup_logging(options)
+
+ # do this very early just to set the options
+ get_conductor(options)
+
+ if args and args[0]=='help':
+ if args[1:]:
+ which = args[1:]
+ else:
+ which = all_args.keys()
+ for this in which:
+ if this=='help': # ie, 'help help'
+ doc = "show help for the commands."
+ else:
+ try:
+ doc = all_args[this].__doc__
+ except KeyError:
+ print "No such command '%s':" % this
+ continue
+ print "Help for command '%s':" % this
+ print doc
+ print
+
+ sys.exit(0)
+
+ # create an initial deferred to perform tasks which must occur before we
+ # can start. The final callback added will fire up the real servers.
+ asynch_tasks = []
+ d = defer.Deferred()
+ def mutter(whateva):
+ print "Raindrops keep falling on my head..."
+ d.addCallback(mutter)
+
+ # Check DB exists and if not, install accounts.
+ def maybe_install_accounts(db_created):
+ if db_created:
+ return bootstrap.install_accounts(None)
+ d.addCallback(model.fab_db
+ ).addCallback(maybe_install_accounts
+ )
+ # Check if the files on the filesystem need updating.
+ d.addCallback(bootstrap.install_client_files, options)
+ d.addCallback(bootstrap.install_views, options)
+
+ # Now process the args specified.
+ for i, arg in enumerate(args):
+ try:
+ func = all_args[arg]
+ except KeyError:
+ parser.error("Invalid command: " + arg)
+
+ asynch = getattr(func, 'asynch', False)
+ if asynch:
+ asynch_tasks.append(func)
+ else:
+ consumes_args = getattr(func, 'allargs', False)
+ if consumes_args:
+ d.addCallback(func, parser, options, args[i+1:])
+ break
+ else:
+ d.addCallback(func, parser, options)
+
+ # and some final deferreds to control the process itself.
+ def done(whateva):
+ print "Apparently everything is finished - terminating."
+ reactor.stop()
+
+ def start(whateva):
+ if not asynch_tasks:
+ print "Nothing left to do - terminating."
+ reactor.stop()
+ return
+ print "Startup complete - running tasks..."
+ dl = defer.DeferredList([f(None, parser, options) for f in asynch_tasks])
+ dl.addCallback(done)
+ return dl
+
+ def error(*args, **kw):
+ from twisted.python import log
+ log.err(*args, **kw)
+ print "A startup task failed - not executing servers."
+ reactor.stop()
+
+ d.addCallbacks(start, error)
+
+ reactor.callWhenRunning(d.callback, None)
+
+ logger.debug('starting reactor')
+ reactor.run()
+ logger.debug('reactor done')
+
+
+if __name__=='__main__':
+ main()
--- a/server/python/setup.py
+++ b/server/python/setup.py
@@ -1,19 +1,19 @@
from setuptools import setup, find_packages
setup(
- name = "junius",
+ name = "raindrop",
version = "0.1a1",
packages = find_packages(),
install_requires = [
],
# PyPI meta
author = 'Andrew Sutherland',
author_email = 'asutherland@asutherland.org',
- description = 'junius',
+ description = 'raindrop',
license = 'MPL/GPL/LGPL tri-license',
keywords = 'mail',
url = 'www.mozillamessaging.com',
zip_safe = False,
)
new file mode 100644
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,1 @@
+# doot doot, just creating a branch