merge twisty to the trunk
author: Mark Hammond <mhammond@skippinet.com.au>
date: Tue, 07 Apr 2009 15:27:16 +1000
changeset 166: 194c2a8c0d184bb7590d12112cb74cbcad7cba07
parent 64: 1b32b4c00eeb6094549cb9a601dbdf52eaf8df65 (current diff)
parent 165: d1fa17b9c0f07cece52748edddac414115551a0c (diff)
child 167: 26248b871429b7477d98811bb9c1dfe4767d8fe9
push id: 1
push user: root
push date: Wed, 08 Apr 2009 01:46:05 +0000
client/index.xhtml
server/python/junius/__init__.py
server/python/junius/bootstrap.py
server/python/junius/getmail.py
server/python/junius/getskype.py
server/python/junius/gettweets.py
server/python/junius/model.py
server/python/junius/replicate.py
server/python/raindrop/bootstrap.py
server/python/raindrop/model.py
--- a/client/cloda-completers.js
+++ b/client/cloda-completers.js
@@ -1,13 +1,13 @@
 var ContactCompleter = {
   type: "contact",
   complete: function(aAutocomplete, aText) {
     console.log("Contact completer firing on", aText);
-    Gloda.dbContacts.view("contact_ids/by_suffix", {
+    Gloda.dbContacts.view("raindrop!contacts!all/by_suffix", {
       startkey: aText,
       endkey: aText + "\u9999",
       include_docs: true,
       limit: 10,
       success: function(result) {
         var seen = {};
         var nodes = [];
         result.rows.forEach(function (row) {
@@ -26,27 +26,30 @@ var ContactCompleter = {
     });
   }
 };
 
 var TagCompleter = {
   type: "tag",
   complete: function(aAutocomplete, aText) {
     console.log("Tag completer firing on", aText);
-    Gloda.dbMessages.view("tags/all_tags", {
+    Gloda.dbMessages.view("raindrop!tags!all/all", {
+      startkey: aText,
+      endkey: aText + "\u9999",
       success: function(result) {
+        var nodes = [];
+        if (result.rows.length==0) {
+          console.log("Tag completer can't see any tags starting with", aText);
+          return;
+        }
         var tagNames = result.rows[0].value;
         console.log("Tag completer got tag names:", tagNames);
-        var nodes = [];
-        var textLength = aText.length;
         tagNames.forEach(function (tagName) {
-          if (tagName.substring(0, textLength) == aText) {
-            var node = $("<div/>")[0];
-            ElementXBL.prototype.addBinding.call(node, "autocomplete.xml#tag-completion");
-            node.setTagName(tagName);
-            nodes.push(node);
-          }
+          var node = $("<div/>")[0];
+          ElementXBL.prototype.addBinding.call(node, "autocomplete.xml#tag-completion");
+          node.setTagName(tagName);
+          nodes.push(node);
         });
         aAutocomplete.haveSomeResults(aText, nodes, TagCompleter, 100);
       }
     });
   }
 };
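
The new "raindrop!tags!all/all" view pairs the map in schema/tags/all/all-map.js
with the reduce in all-reduce.js (both added later in this change), which
returns a single row whose value is a sorted, de-duplicated array of every tag
in the queried key range - hence result.rows[0].value now carries all matching
tags and the old client-side prefix filter is gone.  The startkey/endkey pair
is the standard CouchDB prefix-match idiom; a minimal sketch (the helper name
here is illustrative, not part of the patch):

  // "\u9999" is a high-sorting sentinel character, so the key range
  // [text, text + "\u9999"] covers every string key beginning with text.
  function prefixRange(text) {
    return { startkey: text, endkey: text + "\u9999" };
  }
  // e.g.: Gloda.dbMessages.view("raindrop!tags!all/all", prefixRange("mo"));
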
--- a/client/cloda.js
+++ b/client/cloda.js
@@ -33,40 +33,39 @@ var GlodaConversationProto = {
       if (!message._parent) {
         topnodes.push(message);
       }
     }
     topnodes.sort(function (a,b) { return a.timestamp - b.timestamp; } );
     return topnodes;
   },
   get subject() {
-    return this.messages[0].subject();
+    return this.messages[0]['subject'];
   },
 };
 
 var GlodaMessageProto = {
-  subject: function() {
-    return this.headers["Subject"];
-  },
   _bodyTextHelper: function(aPart) {
     var result = "";
     if (aPart.parts) {
       return aPart.parts.map(this._bodyTextHelper, this).join("");
     }
     else if (aPart.contentType == "text/plain")
       return aPart.data;
     else
       return "";
   },
   // something scary happens when these are getters in terms of putting things back
   bodyText: function() {
-    return this._bodyTextHelper(this.bodyPart);
+    return this['body']
+    // return this._bodyTextHelper(this.bodyPart);
   },
   bodySnippet: function() {
-    return this.bodyText().substring(0, 128);
+    return this['body_preview'].substring(0, 128);
+    // return this.bodyText().substring(0, 128);
   },
   _rawSetDefault: function(aKey, aDefaultValue) {
     var raw = this.__proto__;
     console.log("this", this, "raw", raw);
     if (aKey in raw) {
       var val = raw[aKey];
       if (val != null)
         return val;
@@ -117,16 +116,17 @@ GlodaConvQuery.prototype = {
 
     this.constraintsPending = aConstraints.length;
     this.constraints.forEach(this.dispatchConstraint, this);
   },
   dispatchConstraint: function(aConstraint) {
     var viewName = aConstraint.view;
     delete aConstraint.view;
     aConstraint["success"] = this.wrappedProcessResults;
+    console.log("executing view", viewName);
     Gloda.dbMessages.view(viewName, aConstraint);
   },
   /**
    * Result handling function for constraints issued by queryForConversations.
    *  Each result set has rows whose values are conversation ids.  Once all
    *  constraints have return their results, we load the conversations
    *  by a call to getConversations.
    */
@@ -159,18 +159,33 @@ GlodaConvQuery.prototype = {
    * Retrieve conversations by id, also loading any involved contacts and
    *  reflecting them onto the messages themselves.
    */
   getConversations: function(aConversationIds, aCallback, aCallbackThis) {
     this.callback = aCallback;
     this.callbackThis = aCallbackThis;
 
     var dis = this;
-    Gloda.dbMessages.view("by_conversation/by_conversation", {
-      keys: aConversationIds, include_docs: true,
+    Gloda.dbMessages.view("raindrop!messages!by/by_conversation", {
+      keys: aConversationIds,
+      success: function(result) {
+        dis.processConversationIDsFetch(result);
+      }
+    });
+  },
+  processConversationIDsFetch: function(result) {
+    // we receive the list of conversations with the value being the ID of
+    // the document holding the message itself.  Get all listed messages.
+    var msg_ids = [];
+    var rows = result.rows, iRow, row, contact_id;
+    for (iRow = 0; iRow < rows.length; iRow++) {
+      msg_ids.push(rows[iRow].value);
+    }
+    var dis = this;
+    Gloda.dbMessages.allDocs({ keys: msg_ids, include_docs: true,
       success: function(result) {
         dis.processConversationFetch(result);
       }
     });
   },
   processConversationFetch: function(result) {
     // we receive the list of fetched messages.  we need to group them by
     //  conversation (this should be trivially easy because they should come
@@ -191,22 +206,24 @@ GlodaConvQuery.prototype = {
           oldest: message.timestamp, newest: message.timestamp,
           involves_contact_ids: {}, raw_messages: []
         };
       conversation.raw_messages.push(message);
       if (conversation.oldest > message.timestamp)
         conversation.oldest = message.timestamp;
       if (conversation.newest < message.timestamp)
         conversation.newest = message.timestamp;
+      /**********
       for (var iContactId = 0; iContactId < message.involves_contact_ids.length;
            iContactId++) {
         contact_id = message.involves_contact_ids[iContactId];
         conversation.involves_contact_ids[contact_id] = true;
         seenContactIds[contact_id] = true;
       }
+      ******/
     }
 
     console.log("seenContactIds", seenContactIds);
     var contact_ids = [];
     for (contact_id in seenContactIds)
       contact_ids.push(contact_id);
 
     console.log("contact lookup list:", contact_ids);
@@ -250,37 +267,37 @@ GlodaConvQuery.prototype = {
       convList.push(conversation);
 
       wrapped_messages = [];
       for (var iMsg = 0; iMsg < conversation.raw_messages.length; iMsg++) {
         var raw_message = conversation.raw_messages[iMsg];
         raw_message.__proto__ = GlodaMessageProto;
         var message = {__proto__: raw_message};
         message.from = contacts[message.from_contact_id];
-        message.to = mapContactList(message.to_contact_ids);
-        message.cc = mapContactList(message.cc_contact_ids);
-        message.involves = mapContactList(message.involves_contact_ids);
+        message.to = "fix me!!" // mapContactList(message.to_contact_ids);
+        message.cc = "fix me too!!" // mapContactList(message.cc_contact_ids);
+        message.involves = "fix me 3!!!" // mapContactList(message.involves_contact_ids);
         
         wrapped_messages.push(message);
       }
       conversation.messages = wrapped_messages;
       conversation.messages.sort(function (a,b) {return a.timestamp - b.timestamp;});
     }
 
     convList.sort(function (a, b) { return b.newest - a.newest; });
     console.log("callback with conv list", convList);
     this.callback.call(this.callbackThis, convList);
   }
 };
 
 var MAX_TIMESTAMP = 4000000000;
 
 var Gloda = {
-  dbContacts: $.couch.db("contacts"),
-  dbMessages: $.couch.db("messages"),
+  dbContacts: $.couch.db("raindrop"),
+  dbMessages: $.couch.db("raindrop"),
 
   _init: function () {
 
   },
 
   queryByStuff: function(aInvolvedContactIds, aTagNames, aDiscussionIds, aCallback, aCallbackThis) {
     // -- for each involved person, get the set of conversations they're in
     var constraints = [];
@@ -290,18 +307,18 @@ var Gloda = {
           view: "by_involves/by_involves",
           startkey: [contact._id, 0], endkey: [contact._id, MAX_TIMESTAMP]
         };
       }, this));
     }
     if (aTagNames && aTagNames.length) {
       constraints = constraints.concat(aTagNames.map(function (tagName) {
         return {
-          view: "by_tags/by_tags",
-          startkey: [tagName, 0], endkey: [tagName, MAX_TIMESTAMP]
+          view: "raindrop!conversations!by/by_tags",
+          key: tagName,
         };
       }, this));
     }
     if (aDiscussionIds && aDiscussionIds.length) {
       constraints = constraints.concat(aDiscussionIds.map(function (discussion) {
         return {
           view: "by_mailing_list/by_list_id",
           startkey: [discussion.id, 0], endkey: [discussion.id, MAX_TIMESTAMP]
--- a/client/index.xhtml
+++ b/client/index.xhtml
@@ -61,18 +61,18 @@
       ru.addStaticMailbox("Trash",":trash");
       ru.addStaticMailbox("Junk",":junk");
       
       ru.addDynamicMailbox("People","people-by-frecency");
       ru.addDynamicMailbox("Search","searches");
       ru.addDynamicMailbox("Discussion","discussion");
       
       
-      var dbMessages = $.couch.db("messages");
-      dbMessages.view("by_list_id/by_mailing_list", {
+      var dbMessages = $.couch.db("raindrop");
+      dbMessages.view("raindrop!mailing_lists!all/by_list_id", {
         include_docs: false,
         group : true,
         success: function(json) {
           var parent = $("#discussion");
           json.rows.forEach(function(row) {
             var li = $(document.createElement("li")).addClass("discussion-list");
             parent.append(li
                           .append($(document.createElement("a"))
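
For reference, with group:true the by_list_id map/reduce pair (added under
schema/mailing_lists/all/ later in this change) collapses to one row per
mailing list.  A sketch of the row shape the forEach above consumes (the
example key is illustrative):

  var dbMessages = $.couch.db("raindrop");
  dbMessages.view("raindrop!mailing_lists!all/by_list_id", {
    group: true,
    success: function(json) {
      json.rows.forEach(function(row) {
        // row.key:   the list id parsed from List-Id, e.g. "dev.example.org"
        // row.value: the merged List-* headers plus 'id', 'name', and a
        //            'count' of messages seen for the list
      });
    }
  });
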
--- a/client/jquery.couch.js
+++ b/client/jquery.couch.js
@@ -7,17 +7,35 @@
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 // License for the specific language governing permissions and limitations under
 // the License.
 
 (function($) {
   $.couch = $.couch || {};
-  $.fn.extend($.couch, {
+  $.extend($.couch, {
+
+    activeTasks: function(options) {
+      options = options || {};
+      $.ajax({
+        type: "GET", url: "/_active_tasks", dataType: "json",
+        complete: function(req) {
+          var resp = $.httpData(req, "json");
+          if (req.status == 200) {
+            if (options.success) options.success(resp);
+          } else  if (options.error) {
+            options.error(req.status, resp.error, resp.reason);
+          } else {
+            alert("Active task status could not be retrieved: " +
+              resp.reason);
+          }
+        }
+      });
+    },
 
     allDbs: function(options) {
       options = options || {};
       $.ajax({
         type: "GET", url: "/_all_dbs",
         complete: function(req) {
           var resp = $.httpData(req, "json");
           if (req.status == 200) {
@@ -28,27 +46,42 @@
             alert("An error occurred retrieving the list of all databases: " +
               resp.reason);
           }
         }
       });
     },
 
     config: function(options, section, option, value) {
+      options = options || {};
+      var url = "/_config/";
+      if (section) {
+        url += encodeURIComponent(section) + "/";
+        if (option) {
+          url += encodeURIComponent(option);
+        }
+      }
+      if (value === undefined) {
+        var method = "GET";
+      } else {
+        var method = "PUT";
+        var data = toJSON(value);
+      }
       $.ajax({
-        type: "GET", url: "/_config/",
+        type: method, url: url, contentType: "application/json",
+        dataType: "json", data: toJSON(value), processData: false,
         complete: function(req) {
           var resp = $.httpData(req, "json");
           if (req.status == 200) {
             if (options.success) options.success(resp);
           } else if (options.error) {
             options.error(req.status, resp.error, resp.reason);
           } else {
-            alert("An error occurred retrieving the server configuration: " +
-              resp.reason);
+            alert("An error occurred retrieving/updating the server " +
+              "configuration: " + resp.reason);
           }
         }
       });
     },
 
     db: function(name) {
       return {
         name: name,
@@ -71,17 +104,17 @@
               }
             }
           });
         },
         create: function(options) {
           options = options || {};
           $.ajax({
             type: "PUT", url: this.uri, contentType: "application/json",
-            dataType: "json", data: "", processData: false, 
+            dataType: "json", data: "", processData: false,
             complete: function(req) {
               var resp = $.httpData(req, "json");
               if (req.status == 201) {
                 if (options.success) options.success(resp);
               } else if (options.error) {
                 options.error(req.status, resp.error, resp.reason);
               } else {
                 alert("The database could not be created: " + resp.reason);
@@ -154,33 +187,66 @@
                 options.error(req.status, resp.error, resp.reason);
               } else {
                 alert("An error occurred retrieving a list of all documents: " +
                   resp.reason);
               }
             }
           });
         },
-        openDoc: function(docId, options) {
+        allDesignDocs: function(options) {
+          options = options || {};
+          this.allDocs($.extend({startkey:"_design", endkey:"_design0"}, options));
+        },
+        allApps: function(options) {
           options = options || {};
-          $.ajax({
+          var self = this;
+          if (options.eachApp) {
+            this.allDesignDocs({
+              success: function(resp) {
+                $.each(resp.rows, function() {
+                  self.openDoc(this.id, {
+                    success: function(ddoc) {
+                      var index, appPath, appName = ddoc._id.split('/');
+                      appName.shift();
+                      appName = appName.join('/');
+                      index = ddoc.couchapp && ddoc.couchapp.index;
+                      if (index) {
+                        appPath = ['', name, ddoc._id, index].join('/');
+                      } else if (ddoc._attachments && ddoc._attachments["index.html"]) {
+                        appPath = ['', name, ddoc._id, "index.html"].join('/');
+                      }
+                      if (appPath) options.eachApp(appName, appPath, ddoc);
+                    }
+                  });
+                });
+              }
+            });            
+          } else {
+            alert("please provide an eachApp function for allApps()");
+          }
+        },
+        openDoc: function(docId, options, ajaxOptions) {
+          options = options || {};
+          ajaxOptions = ajaxOptions || {};
+          $.ajax($.extend({
             type: "GET",
             url: this.uri + encodeURIComponent(docId) + encodeOptions(options),
             dataType: "json",
             complete: function(req) {
               var resp = $.httpData(req, "json");
               if (req.status == 200) {
                 if (options.success) options.success(resp);
               } else if (options.error) {
                 options.error(req.status, resp.error, resp.reason);
               } else {
                 alert("The document could not be retrieved: " + resp.reason);
               }
             }
-          });
+          }, ajaxOptions));
         },
         saveDoc: function(doc, options) {
           options = options || {};
           if (doc._id === undefined) {
             var method = "POST";
             var uri = this.uri;
           } else {
             var method = "PUT";
@@ -230,63 +296,63 @@
           }
           var body = {language: language, map: mapFun};
           if (reduceFun != null) {
             if (typeof(reduceFun) != "string")
               reduceFun = reduceFun.toSource ? reduceFun.toSource() : "(" + reduceFun.toString() + ")";
             body.reduce = reduceFun;
           }
           $.ajax({
-            type: "POST", url: this.uri + "_slow_view" + encodeOptions(options),
+            type: "POST", url: this.uri + "_temp_view" + encodeOptions(options),
             contentType: "application/json",
             data: toJSON(body), dataType: "json",
             complete: function(req) {
               var resp = $.httpData(req, "json");
               if (req.status == 200) {
                 if (options.success) options.success(resp);
               } else if (options.error) {
                 options.error(req.status, resp.error, resp.reason);
               } else {
                 alert("An error occurred querying the database: " + resp.reason);
               }
             }
           });
         },
         view: function(name, options) {
           options = options || {};
+          name = name.split('/');
           if (options.keys) {
             $.ajax({
-              type: "POST",
-              url: this.uri + "_view/" + name + encodeOptions(options),
+              type: "POST", url: this.uri + "_design/" + name[0] + "/_view/" + name[1] + encodeOptions(options),
               contentType: "application/json",
               data: toJSON({keys: options.keys}), dataType: "json",
               complete: function(req) {
                 var resp = $.httpData(req, "json");
                 if (req.status == 200) {
                   if (options.success) options.success(resp);
                 } else if (options.error) {
                   options.error(req.status, resp.error, resp.reason);
                 } else {
-                  alert("An error occurred accessing the view: " + resp.reason);
+                  alert("An error occurred accessing the view '" + name + "': " + resp.reason);
                 }
               }
             });
             return;
           }
           $.ajax({
-            type: "GET", url: this.uri + "_view/" + name + encodeOptions(options),
+            type: "GET", url: this.uri + "_design/" + name[0] + "/_view/" + name[1] + encodeOptions(options),
             dataType: "json",
             complete: function(req) {
               var resp = $.httpData(req, "json");
               if (req.status == 200) {
                 if (options.success) options.success(resp);
               } else if (options.error) {
                 options.error(req.status, resp.error, resp.reason);
               } else {
-                alert("An error occurred accessing the view: " + resp.reason);
+                alert("An error occurred accessing the view '" + name + "': " + resp.reason);
               }
             }
           });
         }
       };
     },
 
     info: function(options) {
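
The view() changes above introduce a "designdoc/viewname" naming convention:
the name is split on '/' and expanded into the standard _design URL, which is
what lets callers write names like "raindrop!tags!all/all".  A sketch of the
resolution, assuming a database named "raindrop":

  var name = "raindrop!tags!all/all".split('/');
  // -> GET /raindrop/_design/raindrop!tags!all/_view/all
  var url = "/raindrop/_design/" + name[0] + "/_view/" + name[1];
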
--- a/client/messages.xml
+++ b/client/messages.xml
@@ -124,19 +124,20 @@
     <xbl:implementation><![CDATA[
       ({
         conversation: null,
         setConversation: function(aConversation) {
           this.conversation = aConversation;
           this.shadowTree.getElementById("oldestMessageDate").textContent =
             makeTimestampFriendly(this.conversation.oldest);
           var firstMessage = this.conversation.messages[0];
-          this.shadowTree.getElementById("subject").textContent = firstMessage.subject();
-          this.shadowTree.getElementById("snippet").textContent = firstMessage.bodySnippet();
-          this.shadowTree.getElementById("author").textContent = firstMessage.from.name;
+          console.log("fetching info from first message", firstMessage);
+          this.shadowTree.getElementById("subject").textContent = firstMessage['subject'];
+          this.shadowTree.getElementById("snippet").textContent = firstMessage['body'];
+          //this.shadowTree.getElementById("author").textContent = firstMessage.from.name;
           this.shadowTree.getElementById("date").textContent =
             makeTimestampFriendly(firstMessage.timestamp);
             
           var tagsNode = this.shadowTree.getElementById("tags");
           ElementXBL.prototype.addBinding.call(tagsNode, "tags.xml#tags");
           tagsNode.setMessage(firstMessage);
 
           var replyNodes = this.shadowTree.getElementById("replies");
@@ -179,20 +180,20 @@
 :bound-element > .author > .date:before { content: "\2014 "; }
 :bound-element > .body { padding-left: 1em; color: #666; white-space: nowrap; overflow: hidden;  }
       ]]></xbl:style>
     </xbl:resources>
     <xbl:implementation><![CDATA[
       ({
         setMessage: function(aMessage) {
           this.message = aMessage;
-          this.shadowTree.getElementById("from").textContent = aMessage.from.name;
+          //this.shadowTree.getElementById("from").textContent = aMessage.from.name;
           this.shadowTree.getElementById("date").textContent =
             makeTimestampFriendly(aMessage.timestamp);
-          this.shadowTree.getElementById("snippet").textContent = aMessage.bodySnippet();
+          this.shadowTree.getElementById("snippet").textContent = aMessage['body_preview'];
           
           var tagsNode = this.shadowTree.getElementById("tags");
           ElementXBL.prototype.addBinding.call(tagsNode, "tags.xml#tags");
           tagsNode.setMessage(aMessage);
         },
       })
     ]]></xbl:implementation>
   </xbl:binding>
@@ -203,18 +204,18 @@
         <span id="subject"></span><br/>
         <pre id="body"></pre>
       </div>
     </xbl:template>
     <xbl:implementation><![CDATA[
       ({
         setMessage: function(aMessage) {
           this._message = aMessage;
-          this.shadowTree.getElementById("subject").textContent = this._message.subject();
-          this.shadowTree.getElementById("body").textContent = this._message.bodyPart.data;
+          this.shadowTree.getElementById("subject").textContent = this._message['subject'];
+          this.shadowTree.getElementById("body").textContent = this._message['body_preview'];
         },
       })
     ]]></xbl:implementation>
     <xbl:handlers>
       <xbl:handler event="click"><![CDATA[
       
       ]]></xbl:handler>
     </xbl:handlers>
--- a/docs/INSTALL
+++ b/docs/INSTALL
@@ -1,41 +1,148 @@
-Installation Steps
-
-Have Python 2.5 or later installed.
+Late Breaking News/Known Problems
+=================================
 
- If not 2.6: 
-    install setup_tools
-    easy_install simplejson
+* Error handling is poor.  I'm trying to get a handle on the best way to
+  manage errors in a Twisted environment before adding too much handling
+  that may turn out to be inappropriate.
+
+Installation Steps
+==================
+
+Have Python 2.6 or later installed.
 
  Prereqs:
     easy_install couchdb (0.5 or later)
 
-Install couchdb trunk
+Install CouchDB trunk.  This currently means you need to search out build
+instructions, which work only on Linux - Windows doesn't work.  If you want
+to work on Windows you must install CouchDB on a Linux box and point your
+Windows ~/.raindrop file at the server.
 
-Pull asuth's version of gocept-imapapi:
-  hg clone http://clicky.visophyte.org/cgi-bin/hgwebdir.cgi/asuth/gocept-imapapi/
-  cd gocept-imapapi
-  python setup.py develop
+Install Twisted.
 
-install junius:
+Install 'paisley' from Launchpad.
+
+Either install junius:
   cd junius/server/python/junius
   python setup.py develop
+*or* just add it to your PYTHONPATH
+  set PYTHONPATH=c:\src\path\to\junius\server\python
 
-Configure IMAP account w/ some data in it, and edit bootstrap.py to point to it.
+Install the modules required for the 'protocols'.  Note that these are
+technically optional - if a module isn't installed, a warning will be
+generated and the protocol will simply be disabled.
+
+* imap - PyOpenSSL (link?)
+* skype - Skype4Py (link?)
+* twitter - python-twitter (link?)
+
+Configure raindrop:
+  * edit ~/.raindrop
+
+  * if your 'local' couchdb isn't on localhost, add a section along the lines of:
+        [couch-local]
+        host=hostname
+        port=port
 
-With Couchdb running:
+  * Add imap accounts along the lines of:
+        [account-some-account-name] # the 'account-' prefix is important!
+        kind=imap
+        host=imap.gmail.com
+        port=993
+        username=username@gmail.com
+        password=topsecret
+        ssl=True
+
+  * Add a 'test' account - for the 'test suite' - along the lines of:
+        [account-test]
+        kind=test
+        username=test # not used, but we die without it!
+        num_test_docs=1 # this defaults to 5 - but 1 is useful sometimes!
+
+** NOTE: if you edit this file, you must run 'run-raindrop.py install-accounts'
+to get the new data into the raindrop database (the accounts are automatically
+updated when the database is first created, but we don't detect when the
+accounts change afterwards).
+
+
+With the CouchDB server running...
 
-Setup the tables:
-  python bootstrap.py
+Setup the database and upload views and other content:
+
+  % run-raindrop.py
+
+You should see output similar to:
+
+  INFO:model:created new database
+  INFO:junius.bootstrap:Adding account 'test'
+  INFO:junius.bootstrap:client files are different - updating doc
+  INFO:junius.bootstrap:design doc '_design/raindrop!accounts!all' has changed - updating
+  ...
+  Nothing left to do - terminating.  
+
+
+Go to http://127.0.0.1:5984/_utils/index.html to check that the database is
+set up.
+
+Test everything using the 'test suite' (one day it *will* be a test suite :)
+
+  % run-raindrop.py -p test sync-messages process
 
-Go to 
-to check that the tables are setup
+  THEN repeat the above - sorry about that - the first run didn't quite
+  do everything it was supposed to.
+
+  % run-raindrop.py -p test sync-messages process
+
+  Note that '-p' says to load only the 'test' protocol; without it we
+  will attempt to load all protocols (e.g., imap, skype, twitter).
+  But also note that we don't attempt a protocol - even the 'test'
+  protocol - until an account of that 'type' is set up.
 
-Load the mail:
-  python getmail.py (loads IMAP data)
+  Add '-l debug' to the above command-line for debug log messages.  This
+  can get quite verbose; you can also say -l log.name=debug to set a specific
+  log to debug.
+
+  See --help for more.
+
+
+Get *real* messages:
+
+  % run-raindrop.py sync-messages
+  % run-raindrop.py process
 
 (reload http://127.0.0.1:5984/_utils/index.html to see stuff in the messages view)
 
-Go to http://127.0.0.1:5984/junius/files/index.xhtml
+Go to http://127.0.0.1:5984/junius/files/index.xhtml and use the autocomplete
+to run searches.  If you have many documents, be patient while the views are
+generated for the first time.
+
+You can edit any of the files in the 'schema' or 'client' directories; the
+next time run-raindrop starts, the changes will be detected and sent to the
+database.  Changes are not detected while it is running, only at startup.
+
+Unprocessing:
+
+To delete all the intermediate messages in the DB, execute:
+
+  % run-raindrop.py unprocess
 
-and do autocomplete to do searches.
+The next time you execute a 'process' command, all your messages will be
+processed from the start.
+
+
+Error handling:
 
+If there is an error during a 'process' operation (i.e., an exception in the
+converter), we will write an 'error document' and continue processing the
+work queue.  We don't make a very loud noise at the end when this happens -
+you need to notice it in the log output as it runs.  At any time you
+can execute:
+
+  % run-raindrop.py retry-errors
+
+to re-process those messages.  This will generally result in exactly the
+same error unless the converter has been fixed to deal with the condition
+causing the error.  Alternatively, execute 'unprocess' to force reprocessing
+of all messages, not only those which previously caused an error.
+
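
Pulling the configuration fragments above together, a complete minimal
~/.raindrop might look like the following sketch (all values are placeholders;
the [couch-local] section is only needed when your couch server isn't on
localhost):

  [couch-local]
  host=couch.example.com
  port=5984

  [account-gmail]
  kind=imap
  host=imap.gmail.com
  port=993
  username=username@gmail.com
  password=topsecret
  ssl=True

  [account-test]
  kind=test
  username=test
  num_test_docs=1

Remember to run 'run-raindrop.py install-accounts' after editing it.
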
new file mode 100644
--- /dev/null
+++ b/schema/accounts/all/all-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+  if (doc.type == "account") {
+    emit(null, doc);
+  }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/contacts/all/by_identity-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+  if (doc.type == "contact") {
+    for each (var identity in doc.identities) {
+      emit([identity.kind, identity.value], doc);
+    }
+  }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/contacts/all/by_suffix-map.js
@@ -0,0 +1,17 @@
+function(doc) {
+  if (doc.type == "contact") {
+    var i, suffix;
+    for (i = 0; i < doc.name.length; i++) {
+      suffix = doc.name.substring(i);
+      if (suffix && suffix[0] != " ")
+        emit(suffix, null);
+    }
+    for each (var identity in doc.identities) {
+    for (i = 0; i < identity.value.length; i++) {
+      suffix = identity.value.substring(i);
+      if (suffix && suffix[0] != " ")
+        emit(suffix, null);
+    }
+    }
+  }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_involves-map.js
@@ -0,0 +1,6 @@
+function(doc) {
+  if (doc.type == "message") {
+    for each (var contact_id in doc.involves_contact_ids)
+      emit([contact_id, doc.timestamp], doc.conversation_id);
+  }
+}
\ No newline at end of file
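
This view's [contact_id, timestamp] compound key is what the by_involves
constraint in client/cloda.js ranges over.  A sketch of the equivalent direct
query, assuming the view is grouped into the same "raindrop!conversations!by"
design document as its sibling by_tags:

  var MAX_TIMESTAMP = 4000000000;        // mirrors the constant in cloda.js
  var contactId = "<some contact _id>";  // placeholder
  $.couch.db("raindrop").view("raindrop!conversations!by/by_involves", {
    startkey: [contactId, 0],
    endkey: [contactId, MAX_TIMESTAMP],
    success: function(result) {
      // each row.value is the conversation_id of a conversation
      // involving that contact
    }
  });
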
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_mailing_list-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+  if (doc.type == 'message') {
+    if (doc.headers && doc.headers["List-Id"]) {
+      var parts = doc.headers["List-Id"].match(/[\W\w\s]*<(.+)>.*/);
+      emit([parts[1], doc.timestamp], doc.conversation_id);
+    }
+  }
+}
new file mode 100644
--- /dev/null
+++ b/schema/conversations/by/by_tags-map.js
@@ -0,0 +1,6 @@
+function(doc) {
+  if (doc.type && doc.type=='anno/tags' && doc.tags) {
+    for (var i = 0; i < doc.tags.length; i++)
+      emit(doc.tags[i], doc.conversation_id || '');
+  }
+}
new file mode 100644
--- /dev/null
+++ b/schema/mailing_lists/all/by_list_id-map.js
@@ -0,0 +1,18 @@
+function(doc) {
+  if (doc.type == 'message') {
+    if (doc.headers && doc.headers["List-Id"]) {
+      var parts = doc.headers["List-Id"].match(/([\W\w]*)\s*<(.+)>.*/);
+      var values = {
+        "List-Id" : doc.headers["List-Id"],
+        "id" : parts[2],
+        "name" : parts[1]
+      };
+      for each (var headerId in ["List-Post","List-Archive","List-Help",
+                                 "List-Subscribe","List-Unsubscribe"]) {
+        if (doc.headers[headerId])
+          values[headerId] = doc.headers[headerId];
+      }
+      emit(parts[2], values);
+    }
+  }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/mailing_lists/all/by_list_id-reduce.js
@@ -0,0 +1,13 @@
+function(keys, values, rereduce) {
+  var output = {};
+  // 'count' must accumulate correctly across rereduce passes
+  output.count = 0;
+  for (var idx in values) {
+    for (var elm in values[idx]) {
+      if (elm != "count")
+        output[elm] = values[idx][elm];
+    }
+    output.count += rereduce ? values[idx].count : 1;
+  }
+  return output;
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_conversation-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+  if (doc.type && doc.type=='anno/tags' && doc.conversation_id) {
+    var id = doc._id;
+    var emit_id = id.substr(0, id.indexOf('!')) + '!message';
+    emit(doc.conversation_id, emit_id);
+  }
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_doc_type-map.js
@@ -0,0 +1,5 @@
+function(doc)
+{
+    if (doc.type)
+        emit(doc.type, doc._rev);
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_header_message_id-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+  if (doc.type == "message" && doc.subtype == "rfc822") {
+    emit(doc.header_message_id, null);
+  }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/by_storage-map.js
@@ -0,0 +1,10 @@
+// This is intended to be a generic view, usable by all accounts for all 
+// *raw* message types.
+// The key is always [doc_subtype, account_id, storage_spec_key], so
+// 'startkey' and 'endkey' can be used to select specific message
+// subtypes for a particular account.
+// The storage key is an arbitrary value determined by each message subtype.
+function(doc) {
+  if (doc.type == 'rawMessage' && doc.subtype && doc.account_id && doc.storage_key)
+      emit([doc.subtype, doc.account_id, doc.storage_key], null);
+}
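
A sketch of the range selection the header comment describes - every raw
message of one subtype for one account - assuming this view is grouped into
"raindrop!messages!by" alongside by_conversation (the subtype string and
accountId are illustrative):

  var accountId = "<some account id>";  // placeholder
  $.couch.db("raindrop").view("raindrop!messages!by/by_storage", {
    startkey: ["rfc822", accountId],
    endkey: ["rfc822", accountId, {}],  // {} collates after any storage key
    success: function(result) {
      // result.rows[i].key[2] is each matching message's storage key
    }
  });
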
new file mode 100644
--- /dev/null
+++ b/schema/proto/README.txt
@@ -0,0 +1,9 @@
+These are the views for the protocol implementations.
+
+Note that each protocol doesn't get its own design document - instead, the
+design documents are structured so that views likely to be used together are
+grouped together, so they all get built at once.
+
+For example, the 'seen' design document includes one view per protocol - the
+expectation is that, in general, each protocol will need to process the same
+set of "new" documents to determine what it needs to fetch.
new file mode 100644
--- /dev/null
+++ b/schema/proto/errors/errors-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+  if (doc.type=='core/error/msg')
+    // we use include_docs on this view, so get the ID magically...
+    emit(doc.raindrop_seq, null);
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/imap-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+    if (doc.type=='proto/imap') {
+        // imap uses a storage_key field which is already [foldername, uid]
+        // we include [flags, _rev] as the value so the protocol impl can see
+        // if things have changed.
+        emit(doc.storage_key, [doc.imap_flags, doc._rev]);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/skype-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+    if (doc.type && doc.type=='proto/skype-chat') {
+        emit([doc.skype_chatname, null], null);
+    } else if (doc.type && doc.type=='proto/skype-msg') {
+        emit([doc.skype_chatname, doc.skype_id], null);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+    if (doc.type=='proto/twitter') {
+        // Twitter uses an incrementing ID per user, and allows us to fetch
+        // only tweets since that ID.  This allows us to use map/reduce to
+        // find the exact ID we need per user.
+        emit(doc.twitter_user.toString(), doc.twitter_id);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-reduce.js
@@ -0,0 +1,10 @@
+function(keys, values)
+{
+    // we just need the largest ID we've ever seen.
+    // *sob* - where is max()?
+    var ret = 0;
+    for each (var v in values)
+        if (v > ret)
+            ret = v;
+    return ret;
+}
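
With this map/reduce pair, a grouped query yields the largest tweet id seen
per user - the value a "since this id" style fetch needs.  A sketch, with the
design-document name assumed from the schema layout:

  $.couch.db("raindrop").view("raindrop!proto!seen/twitter", {
    group: true,
    success: function(result) {
      result.rows.forEach(function(row) {
        // row.key: the twitter user (stringified); row.value: max id seen
      });
    }
  });
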
new file mode 100644
--- /dev/null
+++ b/schema/tags/all/all-map.js
@@ -0,0 +1,6 @@
+function(doc) {
+    if (doc.type && doc.type=='anno/tags' && doc.tags) {
+        for (var i = 0; i < doc.tags.length; i++)
+            emit(doc.tags[i], null);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/schema/tags/all/all-reduce.js
@@ -0,0 +1,19 @@
+function(keys, values, rereduce) {
+  var keySet = {}, i, j;
+  if (!rereduce) {
+    for (i = 0; i < keys.length; i++)
+      keySet[keys[i][0]] = true;
+  }
+  else {
+    for (i = 0; i < values.length; i++) {
+      var inSet = values[i];
+      for (j = 0; j < inSet.length; j++)
+        keySet[inSet[j]] = true;
+    }
+  }
+  var out = [];
+  for (var key in keySet)
+    out.push(key);
+  out.sort();
+  return out;
+}
deleted file mode 100644
--- a/server/python/junius/getmail.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-
-import base64, datetime, email.utils, email.header
-import pprint
-
-from gocept.imapapi.account import Account
-
-import junius.model as model
-
-'''
-Fetch new mail.
-'''
-
-class JuniusAccount(object):
-    def __init__(self, dbs, account_def):
-        self.dbs = dbs
-        self.account_def = account_def
-        self.imap_account = Account(account_def.host, account_def.port,
-                                    account_def.username, account_def.password,
-                                    account_def.ssl)
-    
-    def sync(self):
-        self.considerFolders(self.imap_account.folders)
-    
-    def considerFolders(self, folders):
-        for folder in folders.values():
-            self.syncFolder(folder)
-            self.considerFolders(folder.folders)
-    
-    def syncFolder(self, folder):
-        print '***** Sync-ing', folder.path
-        
-        folderStatus = self.account_def.folderStatuses.get(folder.path, {})
-        #curValidity = folder._validity()
-        #if curValidity != folderStatus['validity']:
-        #    pass
-        
-        # -- find out about what message id's we already know about
-        # this is really just the most primitive synchronization logic
-        #  available!  but that's okay.
-        known_uids = set()
-        startkey=[self.account_def.id, folder.path, 0]
-        endkey=[self.account_def.id, folder.path, 4000000000]
-        for row in model.Message.by_storage(self.dbs.messages, startkey=startkey, endkey=endkey).rows:
-            known_uids.add(row.key[2])
-        
-        processed = 0
-        skipped = 0
-        for message in folder.messages.values():
-            uid = int(message.UID)
-            if uid not in known_uids:
-                try:
-                    self.grok_message(message)
-                except: 
-                    print "ERROR groking messages"
-                    # http://trac.mozillamessaging.com/raindrop/ticket/3
-                processed += 1
-            else:
-                skipped += 1
-        print '  processed', processed, 'skipped', skipped
-    
-    def grok_email_addresses(self, *address_strings):
-        seen_contacts = {}
-        result_lists = []
-        involved_list = []
-        for address_string in address_strings:
-            cur_results = []
-            cur_addresses = email.utils.getaddresses((address_string,))
-            for name, address in cur_addresses:
-                # XXX TODO: we can use 'keys' instead of just key.
-                contacts = model.Contact.by_identity(self.dbs.contacts,
-                                                     key=['email', address])
-                if len(contacts):
-                    # the contact exists, use it
-                    contact = list(contacts)[0]
-                    if contact.id in seen_contacts:
-                        contact = seen_contacts[contact.id]
-                    else:
-                        involved_list.append( (address,contact) )
-                        seen_contacts[contact.id] = contact
-                else:
-                    # the contact does't exist, create it
-                    if not name:
-                        name = address
-                    else:
-                        try:
-                            pieces = email.header.decode_header(name)
-                            encoded_pieces = [piece.decode(encoding or "utf-8") for piece, encoding in pieces]
-                            name = u"".join(encoded_pieces)
-                        except (LookupError, UnicodeError):
-                            name = u""+name
-
-                    contact = model.Contact(
-                        name=name,
-                        identities=[{'kind': 'email', 'value': address}]
-                    )
-                    contact.store(self.dbs.contacts)
-                    involved_list.append( (address,contact) )
-                    seen_contacts[contact.id] = contact
-                cur_results.append( (address,contact) )
-            result_lists.append(cur_results)
-        result_lists.append(involved_list)
-        return result_lists
-    
-    def extract_message_id(self, message_id_string, acceptNonDelimitedReferences):
-        # this is a port of my fix for bug 466796, the comments should be ported
-        #  too if we keep this logic...
-        whitespaceEndedAt = None
-        firstMessageIdChar = None
-        foundLessThan = False
-        message_len = len(message_id_string)
-        i = 0
-        while i < message_len:
-            char = message_id_string[i]
-            # do nothing on whitespace
-            if char in r' \r\n\t':
-                pass
-            else:
-                if char == '<':
-                    i += 1 # skip over the '<'
-                    firstMessageIdChar = i
-                    foundLessThan = True
-                    break
-                if whitespaceEndedAt is None:
-                    whitespaceEndedAt = i
-            i += 1
-        
-        # if we hit a '<', keep going until we hit a '>' or the end
-        if foundLessThan:
-            while i < message_len:
-                char = message_id_string[i]
-                if char == '>':
-                    # it's valid, update reference, making sure to stop before the '>'
-                    return [message_id_string[firstMessageIdChar:i],
-                            message_id_string[i+1:]]
-                i += 1
-        
-        # if we are at the end of the string, we found some non-whitespace,
-        #  and the caller requested that we accept non-delimited whitespace,
-        #  give them that as their reference.  (otherwise, leave it empty)
-        if acceptNonDelimitedReferences and whitespaceEndedAt:
-            return [message_id_string[whitespaceEndedAt:], '']
-        return [None, '']
-    
-    def extract_message_ids(self, message_id_string):
-        references = []
-        while message_id_string:
-            ref, message_id_string = self.extract_message_id(message_id_string,
-                                                             not references)
-            if ref:
-                references.append(ref)
-        return references
-    
-    def grok_message_conversation(self, imsg):
-        self_header_message_id = imsg.headers['Message-Id'][1:-1]
-        refs_str = imsg.headers.get('References') or imsg.headers.get('In-Reply-To') or ''
-        conversation_id = None
-        conversations = {}
-        self_message = None
-        header_message_ids = self.extract_message_ids(refs_str)
-        unseen = set(header_message_ids)
-
-        # save off the list of referenced messages
-        references = header_message_ids[:]
-        # see if the self-message already exists...
-        header_message_ids.append(self_header_message_id)
-
-        messages = model.Message.by_header_id(self.dbs.messages,
-                                              keys=header_message_ids)
-        for message in messages:
-            if message.header_message_id == self_header_message_id:
-                self_message = message
-            else:
-                unseen.remove(message.header_message_id)
-            conversation_id = message.conversation_id
-            
-        if conversation_id is None:
-            # we need to allocate a conversation_id...
-            conversation_id = self_header_message_id
-            
-        # create dudes who are missing
-        if unseen:
-            missing_messages = []
-            for header_message_id in unseen:
-                missing_messages.append(model.Message(
-                    conversation_id=conversation_id,
-                    header_message_id=header_message_id,
-                    ))
-            self.dbs.messages.update(missing_messages)
-        
-        return conversation_id, self_message, references
-    
-    def grok_message(self, imsg):
-        attachments = {}
-        bodyPart = self.grok_part(imsg, imsg.body, attachments)
-        
-        # XXX the gocept header logic unfortunately is case-sensitive...
-        # XXX also, doesn't support repeated values...
-        # (but we can live with these limitations for now)
-        
-        from_contacts, to_contacts, cc_contacts, involves_contacts = self.grok_email_addresses(
-            imsg.headers.get('From', ''), imsg.headers.get('To', ''),
-            imsg.headers.get('Cc', ''))
-
-        conversation_id, existing_message, references = self.grok_message_conversation(imsg)
-        
-        timestamp = email.utils.mktime_tz(email.utils.parsedate_tz(imsg.headers['Date']))
-
-        cmsg = model.Message(
-            account_id=self.account_def.id,
-            storage_path=imsg.parent.path,
-            storage_id=int(imsg.UID),
-            #
-            conversation_id=conversation_id,
-            header_message_id=imsg.headers.get('Message-Id')[1:-1],
-            references=references,
-            #
-            from_contact_id=from_contacts[0][1].id,
-            to_contact_ids=[c.id for k,c in to_contacts],
-            cc_contact_ids=[c.id for k,c in cc_contacts],
-            involves_contact_ids=[c.id for k,c in involves_contacts],
-            #
-            from_contact=dict([ (from_contacts[0][1].id, dict([ ("name",from_contacts[0][1].name), 
-                                                                ("email",from_contacts[0][0]) ]) ) ]),
-            to_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in to_contacts]),
-            cc_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in cc_contacts]),
-            involves_contacts=dict([ (c.id , dict([("name" , c.name),("email",k)]) ) for k,c in involves_contacts]),
-            #
-            date=datetime.datetime.utcfromtimestamp(timestamp),
-            timestamp=timestamp,
-            #
-            read=r'\Seen' in imsg.flags,
-            #
-            headers=dict(imsg.headers),
-            bodyPart=bodyPart,
-            _attachments=attachments
-        )
-
-        if existing_message:
-            cmsg.id = existing_message.id
-            # this is ugly, we should really just have the logic above use a
-            #  style that allows it to work with new or existing...
-            cmsg._data['_rev'] = existing_message.rev
-        
-        cmsg.store(self.dbs.messages)
-        
-    
-    def grok_part(self, msg, part, attachments, depth=0):
-        contentType = part['content_type']
-        partNumber = part['partnumber']
-        me = {'contentType': contentType,
-              'partNumber': partNumber}
-        if contentType.startswith('multipart/'):
-            parts = me['parts'] = []
-            for subpart in part.parts:
-                parts.append(self.grok_part(msg, subpart, attachments, depth+1))
-        else:
-            me['parameters'] = part['parameters']
-            data = part.fetch()
-            # XXX perhaps we should recursively process the nested part dude?
-            # (if contentType == 'message/rfc822')
-            if contentType.startswith('text/'):
-                me['data'] = data
-            else:
-                attachments[partNumber] = {'content_type': contentType,
-                                           'data': base64.b64encode(data)}
-        return me
-
-class Grabber(object):
-    def __init__(self, dbs):
-        self.dbs = dbs
-    
-    def syncAccounts(self):
-        for account in model.Account.all(self.dbs.accounts):
-            if account.kind == 'imap':
-                junius_account = JuniusAccount(self.dbs, account)
-                junius_account.sync()
-
-if __name__ == '__main__':
-    import os
-    #acct = JuniusAccount('localhost', 8143, os.environ['USER'], 'pass')
-    dbs = model.fab_db()
-    grabber = Grabber(dbs)
-    grabber.syncAccounts()
deleted file mode 100644
--- a/server/python/junius/getskype.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-'''
-Fetch skype contacts and chats.
-'''
-
-import base64, datetime, email.utils
-import pprint
-import time
-import re
-from urllib2 import urlopen
-
-import Skype4Py
-global skype
-import junius.model as model
-
-
-
-class SkypeAccount(object):
-    def __init__(self, dbs, account_def, skype):
-        self.dbs = dbs
-        self.account_def = account_def
-        self.skype = skype
-        self.re_tags = re.compile(r'#(\w+)')
-        self._knownContactsByHandle= {}
-
-    def create_account_if_necessary(self):
-        self.author = self.create_contact_if_necessary()
-
-    def create_contact_if_necessary(self, handle=None):
-        #print "skype_user = " + skype_user.FullName
-        if handle in self._knownContactsByHandle:
-            return self._knownContactsByHandle[handle]
-        if not handle:
-            skype_user = self.skype.CurrentUser
-            handle = skype_user.Handle
-        else:
-            if isinstance(handle, Skype4Py.user.IUser):
-                skype_user = handle
-            else:
-                print "getting user for handle", handle
-                skype_user = skype.User(handle)
-        contacts = model.Contact.by_identity(self.dbs.contacts,
-                                             key=['skype', skype_user.Handle])
-
-        if len(contacts) == 0:
-            # the contact does't exist, create it
-
-            attachments = {}
-            #fname = '/tmp/foo.png' # os.getcwd() + '/' + 'skypeavatar' + skype_user.Handle + '.jpg'
-            #open(fname,'w').close();
-            #skype_user.SaveAvatarToFile(fname) - fails, don't know why.
-            # XXXX - skype_user.SaveAvatarToFile -> get the image!
-            if False: ######
-                try:
-                  response = urlopen(account.profile_image_url)
-                  attachments['default'] = { 'content_type': response.info()['Content-Type'],
-                                             'data': base64.b64encode(response.read()) }
-                except:
-                    pass
-
-            identities = [{'kind': 'skype', 'value': skype_user.Handle}]
-            if skype_user.Homepage: 
-                identities.append({'kind': 'url' , 'value' : skype_user.Homepage })
-
-            contact = model.Contact(
-                name=skype_user.FullName,
-                identities=identities,
-                #location=account.location, xxxxxx ???????? what is this?
-                _attachments=attachments
-            )
-            #print "XXX adding contact: ", skype_user.FullName
-
-            contact.store(self.dbs.contacts)
-        else:
-            contact = [model.Contact.load(self.dbs.contacts,contact.value['_id']) for contact in contacts.rows][0]
-
-        self._knownContactsByHandle[handle] = contact
-        return contact
-
-    def sync(self):
-        
-        print '***** Finding existing messages:',
-        # -- find out about what message id's we already know about
-        # this is really just the most primitive synchronization logic
-        #  available!  but that's okay.
-        known_uids = set()
-        for row in model.Message.by_header_id(self.dbs.messages).rows:
-            known_uids.add(row.key)
-
-        print len(known_uids)
-        processed = 0
-        skipped = 0
-
-        print '***** Fetching skype messages'
-        for i, chat in enumerate(self.skype.Chats):
-            messages = chat.Messages
-            involves = self.grok_involves(self.author, chat)
-            print "chat %d of %d '%s' (%d messages, %d contacts)" % \
-                  (i, len(self.skype.Chats), chat.Name, len(messages), len(involves))
-
-            for msg in messages:
-                if str(msg.Id) not in known_uids:
-                    self.grok_message(self.author, chat, msg, involves)
-                    processed += 1
-                else:
-                    skipped += 1
-        print '  processed', processed, 'skipped', skipped
-
-        print '***** Fetching contacts'
-        
-        for f in self.skype.Friends:
-            self.create_contact_if_necessary(f)
-
-        print 'synchronization of contacts completed'
-
-
-    def grok_message(self, author, chat, msg, involves):
-        # Its possible we should make the first message in a chat the "parent"
-        # and have all other messages reference that parent??
-
-        # The status of a message includes ones that only make sense for sent
-        # (eg, sending, sent) as well as for received.  An explicit status of
-        # 'read' exists - so we assume if it is 'received' it isn't read yet.
-        is_read = msg.Status != Skype4Py.cmsReceived
-        msgauthor = self.create_contact_if_necessary(msg.FromHandle)
-        cmsg = model.Message(
-            account_id=self.account_def.id,
-            # Is this expected to be a real URL?
-            storage_path='http://skype.com/%s' % (chat.Name,),
-            storage_id=str(msg.Id),
-            #
-            conversation_id=chat.Name.replace('/', '').replace('#', ''),
-            header_message_id=msg.Id,
-            references=[], # should reference the 'parent' (ie, the chat iself?)
-            # XXX - fixup 'from' and 'to' handling.
-            from_contact_id=str(msgauthor.id),
-            from_contact={ str(msgauthor.id) : { "name" : msg.FromDisplayName} },
-            to_contact_ids=[],
-            cc_contact_ids=[],
-            involves_contact_ids=[involved for involved in involves],
-            involves_contacts=involves,
-            date=msg.Datetime,
-            timestamp=time.mktime(msg.Datetime.timetuple()),
-            #
-            read=is_read,
-            tags=self.re_tags.findall(msg.Body),
-            headers={ "Subject" : chat.FriendlyName },
-            bodyPart={"data":msg.Body, "contentType":"text/plain"},
-            _attachments={}
-        )
-
-        cmsg.store(self.dbs.messages)
-
-    def grok_involves(self, author, chat):
-        involves = { author.id : { 'name' : author.name }  }
-        for m in chat.Members:
-            account = self.create_contact_if_necessary(m)
-            user = skype.User(m.Handle)
-            involves[account.id] = { 'username' : m.Handle,
-                                     'name' : user.FullName}
-        return involves
-
-
-class Grabber(object):
-    def __init__(self, dbs):
-        self.dbs = dbs
-    
-    def syncAccounts(self):
-        global skype
-        skype = Skype4Py.Skype()
-        print "attaching to skype..."
-        skype.Attach()
-        print "Synching..."
-        for account in model.Account.all(self.dbs.accounts):
-            if account.kind == 'skype':
-                # XXX - something needs to check we are currently logged
-                # in to this account.
-                junius_account = SkypeAccount(self.dbs, account, skype)
-                junius_account.create_account_if_necessary();
-                junius_account.sync()
-
-if __name__ == '__main__':
-    import os
-    dbs = model.fab_db()
-    grabber = Grabber(dbs)
-    grabber.syncAccounts()
deleted file mode 100644
--- a/server/python/junius/gettweets.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python
-
-import base64, datetime, email.utils
-import pprint
-import re
-import sys
-from urllib2 import urlopen, HTTPError
-
-import twitter
-
-import junius.model as model
-
-'''
-Fetch contacts, and tweets.
-'''
-
-
-class JuniusAccount(object):
-    def __init__(self, dbs, account_def):
-        self.dbs = dbs
-        self.account_def = account_def
-        self.twitter_account = twitter.Api(username=account_def.username, password=account_def.password)
-        self.re_involves =  re.compile(r'@(\w+)')
-        self.re_tags = re.compile(r'#(\w+)')
-        self._contactsByHandle = {}
-        
-        # -- find out about what message id's we already know about
-        # this is really just the most primitive synchronization logic
-        #  available!  but that's okay.
-        self.known_uids = set()
-        #startkey=0
-        #endkey=4000000000
-        #for row in model.Message.by_header_id(self.dbs.messages, startkey=startkey, endkey=endkey).rows:
-        ## XXX optimize this to only look for tweets from this user
-        for row in model.Message.by_header_id(self.dbs.messages).rows:
-            self.known_uids.add(row.key)
-        try:
-            self.twitter_user = self.twitter_account.GetUser(self.account_def.username)
-        except HTTPError, e:
-            print e.url
-            raise e
-        self.author = self.create_contact_if_necessary(self.account_def.username)
-        self.sync(self.twitter_user)
-
-    def create_contact_if_necessary(self, username):
-        if username in self._contactsByHandle:
-            return self._contactsByHandle[username]
-        print "looking for contact with username ", username
-        contacts = model.Contact.by_identity(self.dbs.contacts,
-                                             key=['twitter', username])
-        try:
-            twitter_account = self.twitter_account.GetUser(username)
-        except HTTPError, e:
-            if e.code == 404:
-                return None
-            raise e
-
-        if len(contacts) == 0:
-            # the contact doesn't exist, create it
-
-            attachments = {}
-            if twitter_account.profile_image_url:
-                try:
-                  response = urlopen(twitter_account.profile_image_url)
-                  attachments['default'] = { 'content_type': response.info()['Content-Type'],
-                                             'data': base64.b64encode(response.read()) }
-                except:
-                    pass
-
-            identities = [{'kind': 'twitter', 'value': username }]
-            if twitter_account.url: 
-                identities.append({'kind': 'url' , 'value' : twitter_account.url })
-
-            contact = model.Contact(
-                name=twitter_account.name,
-                identities=identities,
-                location=twitter_account.location,
-                _attachments=attachments
-            )
-
-            contact.store(self.dbs.contacts)
-        else:
-            contact = [model.Contact.load(self.dbs.contacts,contact.value['_id']) for contact in contacts.rows][0]
-        self._contactsByHandle[username] = contact
-        return contact
-    
-    def find_contacts(self):
-        """for all of my contacts w/ email addresses, look for twitter accounts,
-        and create them if they are found"""
-        twitter_contacts = model.Contact.by_identity(self.dbs.contacts, startkey=['twitter', ''],
-                                             endkey=['twitter', 'ZZZZ'])
-        twitter_ids = {}
-        for contact in twitter_contacts.rows:
-            for identity in contact.value['identities']:
-                twitter_ids[identity['value']] = contact
-
-        contacts = model.Contact.by_identity(self.dbs.contacts, startkey=['email', ''],
-                                             endkey=['email', 'ZZZZ'])
-        print "contacts =", contacts
-        for c in contacts.rows:
-            for identity in c.value['identities']:
-                email = identity['value']
-            print "looking for twitter account for " + email, 
-            try:
-                twitteruser = self.twitter_account.GetUserByEmail(email)
-            except HTTPError, e:
-                if e.code == 404: 
-                    print "nope"
-                    continue
-                raise e
-            print '=>', twitteruser.id
-            if twitteruser.screen_name not in twitter_ids:
-                contact = self.create_contact_if_necessary(twitteruser.screen_name)
-#                if contact:  # this will easily get past the twitter API hourly limits.
-#                    self.sync(twitteruser)
-
-        sys.exit(0)
-
-    def sync(self, twitter_account):
-        print '***** Fetching tweets for', twitter_account.screen_name
-        processed = 0
-        skipped = 0
-
-        for message in self.twitter_account.GetUserTimeline(twitter_account.screen_name):
-            if str(message.id) not in self.known_uids:
-                self.grok_message(self.author, message)
-                processed += 1
-            else:
-                skipped += 1
-        print '  processed', processed, 'skipped', skipped
-
-
-
-    def grok_message(self, author, imsg):
-        involves = self.grok_involves(author, imsg)
-        print "grokking message from " + author.name
-        cmsg = model.Message(
-            account_id=self.account_def.id,
-            storage_path='http://twitter.com/' + imsg.user.screen_name + '/status/' + str(imsg.id),
-            storage_id=str(imsg.id),
-            #
-            conversation_id=imsg.GetInReplyToStatusId() if imsg.GetInReplyToStatusId() else str(imsg.id),
-            header_message_id=str(imsg.id),
-            references=[],
-            #
-            from_contact_id=str(author.id),
-            from_contact={ str(author.id) : { "name" : author.name } },
-            to_contact_ids=[],
-            cc_contact_ids=[],
-            involves_contact_ids=[involved for involved in involves],
-            involves_contacts=involves,
-            #
-            date=datetime.datetime.utcfromtimestamp(imsg.created_at_in_seconds),
-            timestamp=int(imsg.created_at_in_seconds),
-            #
-            read=False,
-            tags=self.re_tags.findall(imsg.text),
-            headers={ "Subject" : "" },
-            bodyPart={"data":imsg.text, "contentType":"text/plain"},
-            _attachments={}
-        )
-
-        cmsg.store(self.dbs.messages)
-
-    def grok_involves(self, author, imsg):
-        involves = { author.id : { 'name' : author.name }  }
-        usernames = self.re_involves.findall(imsg.text)
-        for username in usernames:
-            account = self.create_contact_if_necessary(username)
-            if account:
-                involves[account.id] = { 'username' : username,
-                                         'name' : account.name }
-        return involves
-
-class Grabber(object):
-    def __init__(self, dbs):
-        self.dbs = dbs
-    
-    def syncAccounts(self):
-        for account in model.Account.all(self.dbs.accounts):
-            if account.kind == 'twitter':
-                junius_account = JuniusAccount(self.dbs, account)
-                
-                for friend in junius_account.twitter_account.GetFriends():
-                    junius_account.create_contact_if_necessary(friend.screen_name)
-                print 'got friend accounts'
-
-
-                junius_account.find_contacts()
-
-
-if __name__ == '__main__':
-    import os
-    dbs = model.fab_db()
-    grabber = Grabber(dbs)
-    grabber.syncAccounts()
rename from server/python/junius/__init__.py
rename to server/python/raindrop/__init__.py
rename from server/python/junius/bootstrap.py
rename to server/python/raindrop/bootstrap.py
--- a/server/python/junius/bootstrap.py
+++ b/server/python/raindrop/bootstrap.py
@@ -1,159 +1,268 @@
 #!/usr/bin/env python
 
 '''
 Setup the CouchDB server so that it is fully usable and what not.
 '''
-import model
-
 import sys
+import twisted.web.error
+from twisted.internet import defer
 import os, os.path, mimetypes, base64, pprint
-
-import junius.model as model
-
-def setup_account(dbs):
-    if len(model.Account.all(dbs.accounts)):
-        print 'Account presumably already exists, not adding it!'
-        return
-    
-    # we want a file of the form:
-    #  hostname,port,username,password,ssl?
-    # example:
-    #  mail.example.com,993,bob@example.com,sekret,True
-    import os, os.path
-    configPath = os.path.join(os.environ['HOME'], ".junius")
-    f = open(configPath, 'r')
-    data = f.read()
-    f.close()
-    host, portstr, username, password, sslstr = data.split(',')
-    ssl = not (sslstr.strip().lower() in ['false', 'f', 'no', '0'])
-    
-    account = model.Account(
-        kind='imap', host=host, port=int(portstr), ssl=ssl,
-        username=username, password=password,
-    )
-    account.store(dbs.accounts)
+import model
+import hashlib
 
-def setup_twitter_account(dbs):
-    # we want a file of the form:
-    #  username,password
-    # example:
-    #  davidascher,sekret
-    import os, os.path
-    configPath = os.path.join(os.environ['HOME'], ".junius_twitter")
-    if not os.path.isfile(configPath):
-        print "Skipping twitter - no config file '%s'" % (configPath,)
-        return
-    f = open(configPath, 'r')
-    data = f.read()
-    f.close()
-    username, password = data.strip().split(',')
-
-    accts = [acct for acct in model.Account.all(dbs.accounts) if acct.kind=='twitter' and acct.username==username]
-    if accts and len(accts) > 0:
-        print 'Twitter account for %s already exists, not automatically adding a new one!' % username
-        return
-
-    account = model.Account(
-        kind='twitter', username=username, password=password,
-    )
-    account.store(dbs.accounts)
+from .config import get_config
+from .model import get_db
 
-def setup_skype_account(dbs):
-    # For now we just use whoever is currently logged into skype (but we
-    # probably do want a config file and should skip things if that user
-    # isn't current.)
-
-    # XXX - paramaterized views?
-    accts = [acct for acct in model.Account.all(dbs.accounts) if acct.kind=='skype']
-    if accts:
-        print 'Skype accounts for %r already exist, not automatically adding a new one!' % \
-              [acct.username for a in accts]
-        return
-
-    try:
-        import Skype4Py
-    except ImportError:
-        print "skipping skype as the Skype4Py module isn't installed"
-    else:
-        skype = Skype4Py.Skype()
-        skype.Timeout = 10000 # default of 30 seconds is painful...
-        print 'Connecting to Skype (please check skype; you may need to authorize us)'
-        try:
-            skype.Attach()
-        except Skype4Py.SkypeAPIError, exc:
-            print 'Failed to attach to Skype (is it installed and running? Is authorization pending?)'
-            print '  error was:', exc
-            return
-        print 'Creating skype account for current user', skype.CurrentUser.Handle
-        account = model.Account(
-            kind='skype', username=skype.CurrentUser.Handle,
-        )
-        account.store(dbs.accounts)
+import logging
+logger = logging.getLogger(__name__)
 
 def path_part_nuke(path, count):
     for i in range(count):
         path = os.path.dirname(path)
     return path
     
 
 FILES_DOC = 'files' #'_design/files'
 
-def install_client_files(dbs):
-    '''
-    cram everyone in 'client' into the 'junius' app database
+# Updating design documents when not necessary can be expensive as all views
+# in that doc are reset. So we've created a helper - a simple 'fingerprinter'
+# which creates a dict, each entry holding the fingerprint of a single item
+# (usually a file), used when files on the file-system are the 'source' of a
+# couchdb document. The 'fingerprint' is stored with the document, so later we
+# can build the fingerprint of the file-system, and compare them to see if we
+# need to update the document. Simple impl - doesn't use the 'stat' of the
+# file at all, but always calculates the md5 checksum of the content.
+class Fingerprinter:
+    def __init__(self):
+        self.fs_hashes = {}
+
+    def get_finger(self, filename):
+        # it doesn't make sense to calc a file's fingerprint twice
+        assert filename not in self.fs_hashes
+        ret = self.fs_hashes[filename] = hashlib.md5()
+        return ret
+
+    def get_prints(self):
+        return dict((n,h.hexdigest()) for (n, h) in self.fs_hashes.iteritems())
+
+
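
A minimal sketch of how this fingerprinter is meant to be used (the file
names and the design_doc dict are hypothetical; design_doc stands in for a
document previously loaded from couch):

    fp = Fingerprinter()
    fp.get_finger('index.html').update('<html>...</html>')
    fp.get_finger('app.js').update('var x = 1;')
    new_prints = fp.get_prints()   # {'index.html': <md5 hex>, 'app.js': <md5 hex>}
    if design_doc.get('fingerprints') != new_prints:
        design_doc['fingerprints'] = new_prints
        # ... only now save design_doc back to couch ...
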
+def install_client_files(whateva, options):
     '''
-    if FILES_DOC in dbs.junius:
-        print 'Design doc already exists, will be updating/overwriting files'
-        design_doc = dbs.junius[FILES_DOC]
-        attachments = design_doc['_attachments'] = {}
-    else:
-        design_doc = {}
+    cram everything in 'client' into the app database
+    '''
+    d = get_db()
+
+    def _opened_ok(doc):
+        logger.debug("document '%(_id)s' already exists at revision %(_rev)s",
+                    doc)
+        return doc
+
+    def _open_not_exists(failure, *args, **kw):
+        failure.trap(twisted.web.error.Error)
+        if failure.value.status != '404': # not found.
+            failure.raiseException()
+        return {} # return an empty doc.
+
+    def _maybe_update_doc(design_doc):
+        fp = Fingerprinter()
         attachments = design_doc['_attachments'] = {}
-    
-    # we cannot go in a zipped egg...
-    junius_root_dir = path_part_nuke(model.__file__, 4)
-    client_dir = os.path.join(junius_root_dir, 'client')
-    print 'listing contents of', client_dir
-    
-    for filename in os.listdir(client_dir):
-        path = os.path.join(client_dir, filename)
-        if filename.endswith("~") or filename.startswith("."): continue;
-        if os.path.isfile(path):
-            f = open(path, 'rb')
-            ct = mimetypes.guess_type(filename)[0]
-            if ct is None and sys.platform=="win32":
-                # A very simplistic check in the windows registry.
-                import _winreg
-                try:
-                    k = _winreg.OpenKey(_winreg.HKEY_CLASSES_ROOT,
-                                        os.path.splitext(filename)[1])
-                    ct = _winreg.QueryValueEx(k, "Content Type")[0]
-                except EnvironmentError:
-                    pass
-            assert ct, "can't guess the content type for '%s'" % filename
-            attachments[filename] = {
-                'content_type': ct,
-                'data': base64.b64encode(f.read())
-            }
-            f.close()
-        print 'filename "%s" (%s)' % (filename, ct)
-    
-    dbs.junius[FILES_DOC] = design_doc
+        # we cannot go in a zipped egg...
+        root_dir = path_part_nuke(model.__file__, 4)
+        client_dir = os.path.join(root_dir, 'client')
+        logger.debug("listing contents of '%s' to look for client files", client_dir)
+
+        for filename in os.listdir(client_dir):
+            path = os.path.join(client_dir, filename)
+            if os.path.isfile(path):
+                f = open(path, 'rb')
+                ct = mimetypes.guess_type(filename)[0]
+                if ct is None and sys.platform=="win32":
+                    # A very simplistic check in the windows registry.
+                    import _winreg
+                    try:
+                        k = _winreg.OpenKey(_winreg.HKEY_CLASSES_ROOT,
+                                            os.path.splitext(filename)[1])
+                        ct = _winreg.QueryValueEx(k, "Content Type")[0]
+                    except EnvironmentError:
+                        pass
+                assert ct, "can't guess the content type for '%s'" % filename
+                data = f.read()
+                fp.get_finger(filename).update(data)
+                attachments[filename] = {
+                    'content_type': ct,
+                    'data': base64.b64encode(data)
+                }
+                f.close()
+            logger.debug("filename '%s' (%s)", filename, ct)
+        new_prints = fp.get_prints()
+        if options.force or design_doc.get('fingerprints') != new_prints:
+            logger.info("client files are different - updating doc")
+            design_doc['fingerprints'] = new_prints
+            return d.saveDoc(design_doc, FILES_DOC)
+        logger.debug("client files are identical - not updating doc")
+        return None
+
+    defrd = d.openDoc(FILES_DOC)
+    defrd.addCallbacks(_opened_ok, _open_not_exists)
+    defrd.addCallback(_maybe_update_doc)
+    return defrd
 
 
-def main():
-    import sys
-    if 'nuke' in sys.argv:
-      print 'NUKING DATABASE!!!'
-      model.nuke_db()
+def install_accounts(whateva):
+    db = get_db()
+    config = get_config()
+
+    def _opened_ok(doc):
+        logger.info("account '%(_id)s' already exists, will be updating existing account",
+                    doc)
+        return doc
+
+    def _open_not_exists(failure, doc_id, *args, **kw):
+        failure.trap(twisted.web.error.Error)
+        if failure.value.status != '404': # not found.
+            failure.raiseException()
+        return {'_id': doc_id} # return an empty doc for the account.
+
+    def _update_acct(doc, info):
+        logger.debug("updating %s with %s", doc, info)
+        doc.update(info)
+        doc['type'] = 'account'
+        return db.saveDoc(doc, doc['_id'])
+
+    def _open_doc(whateva, key):
+        return db.openDoc(key)
+
+    d = defer.Deferred()
+
+    for acct_name, acct_info in config.accounts.iteritems():
+        acct_id = acct_info['_id']
+        logger.info("Adding account '%s'", acct_id)
+        d.addCallback(_open_doc, acct_id)
+        d.addCallbacks(_opened_ok, _open_not_exists, errbackArgs=(acct_id,))
+        d.addCallback(_update_acct, acct_info)
+
+    # Start the chain and return.
+    d.callback(None)
+    return d
+
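
The pattern above is worth calling out: callbacks are chained onto a
not-yet-fired Deferred, then d.callback(None) starts the whole chain, so
each account is opened and updated strictly in sequence. A standalone
sketch of the same pattern (labels are made up):

    from twisted.internet import defer

    def _step(result, label):
        print 'step %s saw %r' % (label, result)
        return label

    d = defer.Deferred()
    for label in ('one', 'two', 'three'):
        d.addCallback(_step, label)
    d.callback(None)   # fires the chain; steps run in order, each seeing
                       # the previous step's return value
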
 
-    dbs = model.fab_db(update_views='updateviews' in sys.argv)
+# Functions working with design documents holding views.
+def install_views(whateva, options):
+    def _doc_not_found(failure):
+        return None
+
+    def _got_existing_docs(results, docs):
+        put_docs = []
+        for (whateva, existing), doc in zip(results, docs):
+            if existing:
+                assert existing['_id']==doc['_id']
+                assert '_rev' not in doc
+                if not options.force and \
+                   doc['fingerprints'] == existing.get('fingerprints'):
+                    logger.debug("design doc %r hasn't changed - skipping",
+                                 doc['_id'])
+                    continue
+                existing.update(doc)
+                doc = existing
+            logger.info("design doc %r has changed - updating", doc['_id'])
+            put_docs.append(doc)
+        return get_db().updateDocuments(put_docs)
+    
+    schema_src = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                              "../../../schema"))
+
+    docs = [d for d in generate_view_docs_from_filesystem(schema_src)]
+    logger.debug("Found %d documents in '%s'", len(docs), schema_src)
+    assert docs, 'surely I have *some* docs!'
+    # ack - I need to open existing docs first to get the '_rev' property.
+    dl = []
+    for doc in docs:
+        deferred = get_db().openDoc(doc['_id']).addErrback(_doc_not_found)
+        dl.append(deferred)
+
+    return defer.DeferredList(dl
+                ).addCallback(_got_existing_docs, docs)
+
 
-    setup_account(dbs)
-    setup_twitter_account(dbs)
-    setup_skype_account(dbs)
-    install_client_files(dbs)
-    
+def _build_views_doc_from_directory(ddir):
+    # all we look for is the views.
+    ret = {}
+    fprinter = Fingerprinter()
+    ret_views = ret['views'] = {}
+    # The '-map.js' file is the 'trigger' for creating a view...
+    tail = "-map.js"
+    rtail = "-reduce.js"
+    files = os.listdir(ddir)
+    for f in files:
+        fqf = os.path.join(ddir, f)
+        if f.endswith(tail):
+            view_name = f[:-len(tail)]
+            try:
+                with open(fqf) as f:
+                    data = f.read()
+                    ret_views[view_name] = {'map': data}
+                    fprinter.get_finger(view_name+tail).update(data)
+            except (OSError, IOError):
+                logger.warning("can't open map file %r - skipping this view", fqf)
+                continue
+            fqr = os.path.join(ddir, view_name + rtail)
+            try:
+                with open(fqr) as f:
+                    data = f.read()
+                    ret_views[view_name]['reduce'] = data
+                    fprinter.get_finger(view_name+rtail).update(data)
+            except (OSError, IOError):
+                # no reduce - no problem...
+                logger.debug("no reduce file %r - skipping reduce for view '%s'",
+                             fqr, view_name)
+        else:
+            # avoid noise...
+            if not f.endswith(rtail) and not f.startswith("."):
+                logger.info("skipping non-map/reduce file %r", fqf)
 
-if __name__ == '__main__':
-    main()
+    ret['fingerprints'] = fprinter.get_prints()
+    logger.debug("Document in directory %r has views %s", ddir, ret_views.keys())
+    if not ret_views:
+        logger.warning("Document in directory %r appears to have no views", ddir)
+    return ret
+
+
+def generate_view_docs_from_filesystem(root):
+    # We use the same file-system layout as 'CouchRest' does:
+    # http://jchrisa.net/drl/_design/sofa/_show/post/release__couchrest_0_9_0
+    # note however that we don't create a design documents in exactly the same
+    # way - the view is always named as specified, and currently no 'map only'
+    # view is created (and if/when it is, only it will have a "special" name)
+    # See http://groups.google.com/group/raindrop-core/web/maintaining-design-docs
+
+    # This is pretty dumb (but therefore simple).
+    # root/* -> directories used purely for a 'namespace'
+    # root/*/* -> directories which hold the contents of a document.
+    # root/*/*-map.js and maybe *-reduce.js -> view content with name b4 '-'
+    logger.debug("Starting to build design documents from %r", root)
+    for top_name in os.listdir(root):
+        fq_child = os.path.join(root, top_name)
+        if not os.path.isdir(fq_child):
+            logger.debug("skipping non-directory: %s", fq_child)
+            continue
+        # so we have a 'namespace' directory.
+        num_docs = 0
+        for doc_name in os.listdir(fq_child):
+            fq_doc = os.path.join(fq_child, doc_name)
+            if not os.path.isdir(fq_doc):
+                logger.info("skipping document non-directory: %s", fq_doc)
+                continue
+            # have doc - build a dict from its dir.
+            doc = _build_views_doc_from_directory(fq_doc)
+            # XXX - note the artificial 'raindrop' prefix - the intent here
+            # is that we need some way to determine which design documents we
+            # own, and which are owned by extensions...
+            # XXX - *sob* - and that we shouldn't use '/' in the doc ID at the
+            # moment (well - we probably could if we ensured we quoted all the
+            # '/' chars, but that seems too much burden for no gain...)
+            doc['_id'] = '_design/' + ('!'.join(['raindrop', top_name, doc_name]))
+            yield doc
+            num_docs += 1
+
+        if not num_docs:
+            logger.info("skipping sub-directory without child directories: %s", fq_child)
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/config.py
@@ -0,0 +1,74 @@
+import ConfigParser, logging, os, os.path
+
+__all__ = ['get_config']
+
+class Config(object):
+  COUCH_DEFAULTS = {'host': 'localhost', 'port': 5984, 'name': 'raindrop'}
+  def __init__(self):
+    self.parser = ConfigParser.SafeConfigParser()
+
+    self.couches = {'local': self.COUCH_DEFAULTS.copy()}
+    self.accounts = {}
+
+    # XXX - this seems wrong: most of the time the 'config' - particularly the
+    # list of accounts etc - will come from the DB.  The config file should only
+    # be used during bootstrapping.
+    self.load()
+
+
+  def dictifySection(self, section_name, defaults=None, name=None):
+    '''
+    Given a config section name, suck up its contents into a dictionary.  Poor
+    man's type detection turns lowercase true/false into the boolean of that
+    type, things that can be int()ed into ints, and otherwise things get to
+    stay strings.  Defaults are applied before dictification, and the name is
+    an optional default for 'name' if specified (which overrides the defaults
+    dict.)
+    '''
+    results = {}
+    if defaults:
+      results.update(defaults)
+    if name:
+      results['name'] = name
+    for name, value in self.parser.items(section_name):
+      if value.lower() in ('true', 'false'):
+        value = (value.lower() == 'true')
+      else:
+        try:
+          value = int(value)
+        except ValueError:
+          pass
+
+      results[name] = value
+    return results
+
+  def load(self):
+    self.parser.read([os.path.expanduser('~/.raindrop')])
+
+    COUCH_PREFIX = 'couch-'
+    ACCOUNT_PREFIX = 'account-'
+    for section_name in self.parser.sections():
+      if section_name.startswith(COUCH_PREFIX):
+        couch_name = section_name[len(COUCH_PREFIX):]
+        self.couches[couch_name] = self.dictifySection(section_name,
+                                                       self.COUCH_DEFAULTS)
+
+      if section_name.startswith(ACCOUNT_PREFIX):
+        account_name = section_name[len(ACCOUNT_PREFIX):]
+        acct = self.accounts[account_name] = \
+                    self.dictifySection(section_name, None, account_name)
+        if 'id' in acct:
+          acct['_id'] = acct['id']
+          del acct['id']
+        else:
+          acct['_id'] = acct['username']
+
+    self.local_couch = self.couches['local']
+    self.remote_couch = self.couches.get('remote') # may be None
+
+CONFIG = None
+def get_config():
+  global CONFIG
+  if CONFIG is None:
+    CONFIG = Config()
+  return CONFIG
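
For illustration, a hypothetical ~/.raindrop and the structures it would
produce (all values are examples only):

    # [couch-local]
    # port = 5985
    #
    # [account-gmail]
    # kind = imap
    # username = bob@example.com
    # ssl = true

    config = get_config()
    config.couches['local']
    # -> {'host': 'localhost', 'port': 5985, 'name': 'raindrop'}
    config.accounts['gmail']
    # -> {'kind': 'imap', 'username': 'bob@example.com', 'ssl': True,
    #     'name': 'gmail', '_id': 'bob@example.com'}
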
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core extensions
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core message processor extensions.
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/email.py
@@ -0,0 +1,16 @@
+# This is an extension which converts a message/raw/email to a 'message'
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class EmailConverter(base.ConverterBase):
+    def convert(self, doc):
+        # for now, the email repr has all we need.
+        ret = doc.copy()
+        for n in ret.keys():
+            if n.startswith('_') or n.startswith('raindrop'):
+                del ret[n]
+        del ret['type']
+        return ret
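
A before/after sketch of the conversion (field names and values are
hypothetical): everything couch- or raindrop-maintained is stripped and the
rest passes through untouched.

    doc = {'_id': 'msg!abc!proto/email', '_rev': '1-xyz',
           'type': 'proto/email', 'raindrop_account': 'bob',
           'subject': 'hi', 'body': 'hello'}
    # EmailConverter.convert(doc) returns:
    #   {'subject': 'hi', 'body': 'hello'}
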
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/message.py
@@ -0,0 +1,30 @@
+# This is an extension which hacks some annotations to a 'message', creating
+# an annotated message.
+import re
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+# *sob* this re still fails to get unicode.
+# | >>> re.compile(r"[^\w]+", re.UNICODE).split(u"\xa9opyright me")
+# | [u'', u'opyright', u'me']
+re_tags = re.compile(r"[^\w]+", re.UNICODE)
+
+
+class MessageAnnotator(base.ConverterBase):
+    def convert(self, doc):
+        # for now, if a 'proto' couldn't detect tags by itself, all words in
+        # the body become tags.
+        try:
+            tags = doc['tags']
+        except KeyError:
+            bits = re_tags.split(doc['body'])
+            tags = list(set(b.lower() for b in bits if len(b)>3))
+        conversation_id = doc.get('conversation_id')
+        if conversation_id is None:
+            conversation_id = doc.get('subject')
+        return {'tags': tags,
+                'conversation_id' : conversation_id,
+                }
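
The fallback tagging is easiest to see on a concrete body (the example text
is made up); note the result order is unspecified since it comes from a set:

    bits = re_tags.split("Meet at the cafe tomorrow, same time")
    tags = list(set(b.lower() for b in bits if len(b) > 3))
    # -> ['cafe', 'meet', 'tomorrow', 'same', 'time'] (in some order)
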
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/ext/message/rfc822.py
@@ -0,0 +1,101 @@
+# This is an extension which converts a message/raw/rfc822 to a 'message'
+from __future__ import absolute_import # stop 'email' import finding our ext
+
+import logging
+from email import message_from_string
+from email.utils import mktime_tz, parsedate_tz
+from twisted.internet import defer
+
+from ...proc import base
+
+
+logger = logging.getLogger(__name__)
+
+
+def _safe_convert_bytes(val, charset):
+    # Convert a byte string to *some* unicode object, ignoring (but logging)
+    # unicode errors we may see in the wild...
+    # TODO: This sucks; we need to mimic what firefox does in such cases...
+    charset = charset or 'ascii'
+    try:
+        ret = val.decode(charset)
+    except UnicodeError, exc:
+        logger.error("Failed to decode mail from %r: %s", charset, exc)
+        # decoding with the declared charset failed - try utf-8
+        try:
+            ret = val.decode('utf-8')
+        except UnicodeError, exc:
+            logger.error("Failed to fallback decode mail from utf8: %s", exc)
+            ret = val.decode('utf-8', 'ignore')
+    return ret
+
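
The fallback ladder, illustrated (the inputs are contrived; the failing
cases also log an error along the way):

    _safe_convert_bytes('caf\xc3\xa9', 'utf-8')  # -> u'caf\xe9'
    _safe_convert_bytes('caf\xc3\xa9', 'ascii')  # declared charset fails,
                                                 # utf-8 fallback -> u'caf\xe9'
    _safe_convert_bytes('\xff\xfe', 'ascii')     # utf-8 fails too; decoded
                                                 # with 'ignore' -> u''
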
+
+# an 'rfc822' message stores the unpacked version of the rfc822 stream, a-la
+# the interface provided by the 'email' package.  IOW, we never bother storing
+# the raw stream, just a 'raw' unpacked version of it.
+# This helper function takes a raw rfc822 string and returns a 'document'
+# suitable for storing as an rfc822 message.
+def doc_from_bytes(b):
+    msg = message_from_string(b)
+    doc = {}
+    mp = doc['multipart'] = msg.is_multipart()
+    headers = doc['headers'] = {}
+    # Given we have no opportunity to introduce an object which can ignore
+    # the case of headers, we lowercase the keys
+    # We can see all kinds of stuff in the wild.  It's not uncommon to see
+    # ascii body but a utf8 name encoded in the header.  It's also possible
+    # to see a header encoded the same as the declared content.
+    # Who wins?  We try the content encoding first then fallback to utf.
+    charset = msg.get_charset() or msg.get_content_charset()
+    for hn, hv in msg.items():
+        headers[hn.lower()] = _safe_convert_bytes(hv, charset)
+
+    # XXX - technically msg objects are recursive; handling that requires
+    # more thought.  For now, assume they are flat.
+    # Unlikely, but if the message *isn't* text based, we also store the payload as attachments.
+    if mp or msg.get_content_maintype() != 'text':
+        # a multi-part message - flatten it here by walking the list, but
+        # only looking at the 'leaf' nodes.
+        attachments = doc['_attachments'] = {}
+        i = 1
+        for attach in msg.walk():
+            if not attach.is_multipart():
+                name = attach.get_filename()
+                if not name:
+                    name = "subpart %d" % i
+                attachments[name] = {'content_type': attach.get_content_type(),
+                                     'data': attach.get_payload(decode=True),
+                                     }
+                i += 1
+    else:
+        body_bytes = msg.get_payload(decode=True)
+        body = _safe_convert_bytes(body_bytes, msg.get_content_charset())
+        doc['body'] = body
+    return doc
+
+
+class RFC822Converter(base.ConverterBase):
+    def convert(self, doc):
+        # a 'rfc822' stores 'headers' as a dict
+        headers = doc['headers']
+        # for now, 'from' etc are all tuples of [identity_type, identity_id]
+        # XXX - todo - find one of the multi-part bits to use as the body.
+        try:
+            body = doc['body']
+        except KeyError:
+            assert doc['multipart']
+            body = 'This is a multipart message - todo - find the body!'
+        ret = {'from': ['email', headers['from']],
+               'subject': headers['subject'],
+               'body': body,
+               'body_preview': body[:128], # good enuf for now...
+        }
+        try:
+            dval = headers['date'] # keys were lowercased by doc_from_bytes
+        except KeyError:
+            pass
+        else:
+            if dval:
+                ret['timestamp'] = mktime_tz(parsedate_tz(dval))
+        return ret
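
End to end, a tiny (hypothetical) message run through doc_from_bytes and
then the converter:

    raw = ("From: alice@example.com\r\n"
           "Subject: hello\r\n"
           "Content-Type: text/plain\r\n"
           "\r\n"
           "hi there\r\n")
    doc = doc_from_bytes(raw)
    # doc['multipart'] == False
    # doc['headers']   == {'from': u'alice@example.com', 'subject': u'hello',
    #                      'content-type': u'text/plain'}
    # doc['body']      == u'hi there\r\n'
    # RFC822Converter.convert(doc) then returns:
    #   {'from': ['email', u'alice@example.com'], 'subject': u'hello',
    #    'body': u'hi there\r\n', 'body_preview': u'hi there\r\n'}
    # (no 'timestamp', since this example carries no date header)
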
rename from server/python/junius/model.py
rename to server/python/raindrop/model.py
--- a/server/python/junius/model.py
+++ b/server/python/raindrop/model.py
@@ -1,323 +1,463 @@
-import os, os.path
-from couchdb import schema, design
+import sys
+import logging
+import time
+from urllib import urlencode, quote
+import base64
+
+import twisted.web.error
+from twisted.internet import defer
+from twisted.python.failure import Failure
 
-class WildField(schema.Field):
-    '''
-    Allows us to have dictionaries without schemas.
-    '''
-    def _to_python(self, value):
-        return value
-    
-    def _to_json(self, value):
-        return value
+try:
+    import simplejson as json
+except ImportError:
+    import json # Python 2.6
+
+import paisley
+from .config import get_config
+
+
+config = get_config()
+
+class _NotSpecified:
+    pass
 
-class Account(schema.Document):
-    kind = schema.TextField()
-    host = schema.TextField(default='')
-    port = schema.IntegerField(default=0)
-    username = schema.TextField()
-    password = schema.TextField()
-    ssl = schema.BooleanField(default=False)
-    
-    folderStatuses = WildField(default={})
-    
-    # could we just do _all_docs?  I don't want the damn design docs though...
-    # (ironically, this is the first one :)
-    all = schema.View('all', '''\
-        function(doc) {
-            emit(null, doc);
-        }''')
+logger = logging.getLogger('model')
+
+DBs = {}
+
+# XXXX - this relies on couch giving us some kind of 'sequence id'.  For
+# now we use a timestamp, but that obviously sucks in many scenarios.
+if sys.platform == 'win32':
+    # woeful resolution on windows and equal timestamps are bad.
+    clock_start = time.time()
+    time.clock() # time.clock starts counting from zero the first time it's called.
+    def get_seq():
+        return clock_start + time.clock()
+else:
+    get_seq = time.time
+
+
+def encode_proto_id(proto_id):
+    # a 'protocol' gives us a 'blob' used to identify the document; we create
+    # a real docid from that protocol_id; we base64-encode what was given to
+    # us to avoid the possibility of a '!' char, and also to better accommodate
+    # truly binary strings (eg, a pickle or something bizarre)
+    return base64.encodestring(proto_id).replace('\n', '')
+
 
-class Contact(schema.Document):
-    name = schema.TextField()
-    identities = schema.ListField(schema.DictField(schema.Schema.build(
-        kind = schema.TextField(),
-        value = schema.TextField()
-    )))
-    location = schema.TextField()
-    _attachments = WildField(default={})
-    all = schema.View('contacts', '''\
-        function(doc) {
-            emit(doc._id, null);
-        }''')
-    #: expose contacts by their identities
-    by_identity = schema.View('contacts', '''\
-        function(doc) {
-            for each (var identity in doc.identities) {
-                emit([identity.kind, identity.value], doc);
-            }
-        }''')
-    #: expose all suffixes of the contact name and identity values
-    by_suffix = schema.View('contact_ids', '''\
-        function(doc) {
-            var i;
-            for (i = 0; i < doc.name.length; i++)
-                emit(doc.name.substring(i), null);
-            for each (var identity in doc.identities) {
-                for (i = 0; i < identity.value.length; i++)
-                    emit(identity.value.substring(i), null);
-            }
-        }''', include_docs=True)
-    #: expose contacts with pictures
-    with_pictures = schema.View('contacts', '''\
-        function(doc) {
-            if (doc._attachments['default'])
-                emit(doc._id, null);
-        }''')
+# from the couchdb package; not sure what makes these names special...
+def _encode_options(options):
+    retval = {}
+    for name, value in options.items():
+        if name in ('key', 'startkey', 'endkey', 'include_docs') \
+                or not isinstance(value, basestring):
+            value = json.dumps(value, allow_nan=False, ensure_ascii=False)
+        retval[name] = value
+    return retval
+
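
So, for example, keys that couch treats as JSON get quoted while everything
else is dumped as-is:

    _encode_options({'startkey': 'a', 'limit': 10, 'include_docs': True})
    # -> {'startkey': '"a"', 'limit': '10', 'include_docs': 'true'}
    # later urlencode'd as: startkey=%22a%22&limit=10&include_docs=true
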
+
+class CouchDB(paisley.CouchDB):
+    def postob(self, uri, ob):
+        # This seems to not use keep-alives etc, whereas using twisted.web
+        # directly does?
+        body = json.dumps(ob, allow_nan=False,
+                          ensure_ascii=False).encode('utf-8')
+        return self.post(uri, body)
+
+    # paisley doesn't handle encoding of the options, and couch 0.9 changed
+    # the view URI syntax, so we can't simply call
+    # super(CouchDB, self).openView(*args, **_encode_options(kwargs)) -
+    # we reimplement it here instead.
+    def openView(self, dbName, docId, viewId, **kwargs):
+        #uri = "/%s/_view/%s/%s" % (dbName, docId, viewId)
+        uri = "/%s/_design/%s/_view/%s" % (dbName, docId, viewId)
+
+        if kwargs:
+            uri += "?%s" % (urlencode(_encode_options(kwargs)),)
+
+        return self.get(uri
+            ).addCallback(self.parseResult)
+        
+
+    def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
+        # paisley appears to use an old api for attachments?
+        if attachment:
+            uri = "/%s/%s/%s" % (dbName, docId, attachment)
+            return self.get(uri)
+        return super(CouchDB, self).openDoc(dbName, docId, revision, full)
+
+    # This is a potential addition to the paisley API;  It is hard to avoid
+    # a hacky workaround due to the use of 'partial' in paisley...
+    def saveAttachment(self, dbName, docId, name, data,
+                       content_type="application/octet-stream",
+                       revision=None):
+        """
+        Save/create an attachment to a document in a given database.
+
+        @param dbName: identifier of the database.
+        @type dbName: C{str}
+
+        @param docId: the identifier of the document.
+        @type docId: C{str}
 
-class Message(schema.Document):
-    account_id = schema.TextField()
-    storage_path = schema.TextField()
-    storage_id = schema.IntegerField()
-    
-    conversation_id = schema.TextField()
-    header_message_id = schema.TextField()
-    references = WildField()
-    
-    # canonical contacts
-    from_contact_id = schema.TextField()
-    to_contact_ids = schema.ListField(schema.TextField())
-    cc_contact_ids = schema.ListField(schema.TextField())
-    # convenience contacts with enough semantics to not just map it (for now)
-    involves_contact_ids = schema.ListField(schema.TextField())
+        @param name: name of the attachment.
+        @type name: C{str}
+
+        @param data: content of the attachment.
+        @type data: C{sequence}
+
+        @param content_type: content type of the attachment.
+        @type content_type: C{str}
 
-    # actual contact objects, a little duplication; but for good, not evil
-    from_contact = WildField()
-    to_contacts = WildField(default={})
-    cc_contacts = WildField(default={})
-    involves_contacts = WildField(default={})
-
-    date = schema.DateTimeField()
-    timestamp = schema.IntegerField()
-
-    # general attribute info...
-    read = schema.BooleanField()
-    
-    # user-added meta-information
-    tags = WildField()
-
-    headers = WildField()
-    bodyPart = WildField()
-    _attachments = WildField(default={})
+        @param revision: if specified, the current revision of the document
+                         this attachment is being saved to
+        @type revision: C{str}
+        """
+        # Responses: ???
+        # 409 Conflict, 500 Internal Server Error
+        url = "/%s/%s/%s" % (dbName, docId, name)
+        if revision:
+            url = url + '?rev=' + revision
+        # *sob* - and I can't use put as it doesn't allow custom headers :(
+        # and neither does _getPage!!
+        # ** start of self._getPage clone setup...** (plus an import or 2...)
+        from twisted.web.client import HTTPClientFactory
+        kwargs = {'method': 'PUT',
+                  'postdata': data}
+        kwargs["headers"] = {"Accept": "application/json",
+                             "Content-Type": content_type,
+                             }
+        factory = HTTPClientFactory(url, **kwargs)
+        from twisted.internet import reactor
+        reactor.connectTCP(self.host, self.port, factory)
+        d = factory.deferred
+        # ** end of self._getPage clone **
+        d.addCallback(self.parseResult)
+        return d
 
-    # -- conversation views
-    # no ghosts!
-    conversation_info = schema.View('conversations', '''\
-        function(doc) {
-            if (doc.timestamp)
-                emit(doc.conversation_id,
-                     {oldest: doc.timestamp, newest: doc.timestamp, count: 1,
-                      involves: doc.involves_contact_ids});
-        }''', '''\
-        function(keys, values, rereduce) {
-            out = values[0];
-            out_involves = {};
-            function involve_fuse(l) {
-                for (var il = 0; il < l.length; il++)
-                    out_involves[l[il]] = true;
-            }
-            involve_fuse(out.involves);
-            for (var i = 1; i < values.length; i++) {
-                var cur = values[i];
-                if (cur.oldest < out.oldest)
-                    out.oldest = cur.oldest;
-                if (cur.newest > out.newest)
-                    out.newest = cur.newest;
-                out.count += cur.count;
-                involve_fuse(cur.involves);
-            }
-            out.involves = [];
-            for (var contact_id in out_involves)
-              out.involves.push(contact_id);
-            return out;
-        }''', group=True, group_level=1)
-    # no ghosts!
-    by_conversation = schema.View('by_conversation', '''\
-        function(doc) {
-            if (doc.timestamp)
-                emit(doc.conversation_id, null);
-        }''', include_docs=True)
+    def updateDocuments(self, dbName, user_docs):
+        # update/insert/delete multiple docs in a single request using
+        # _bulk_docs
+        # from couchdb-python.
+        docs = []
+        for doc in user_docs:
+            if isinstance(doc, dict):
+                docs.append(doc)
+            elif hasattr(doc, 'items'):
+                docs.append(dict(doc.items()))
+            else:
+                raise TypeError('expected dict, got %s' % type(doc))
+        url = "/%s/_bulk_docs" % dbName
+        body = json.dumps({'docs': docs})
+        return self.post(url, body
+                    ).addCallback(self.parseResult
+                    )
+
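
Sketch of a bulk update (IDs and revs invented; `db` assumed to be an
instance bound via bindToDB, so the dbName arg is implied): new docs need
only an _id, updates must carry their current _rev.

    docs = [{'_id': 'doc-a', 'value': 1},                    # create
            {'_id': 'doc-b', '_rev': '1-abc', 'value': 2}]   # update
    d = db.updateDocuments(docs)
    # a single POST of {'docs': [...]} to /<dbName>/_bulk_docs; couch
    # replies with a parallel list like:
    #   [{'id': 'doc-a', 'rev': '1-...'}, {'id': 'doc-b', 'rev': '2-...'}]
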
+    def listDocsBySeq(self, dbName, reverse=False, startKey=0, limit=-1, **kw):
+        """
+        List all documents in a given database by the document's sequence number
+        """
+        # Response:
+        # {"total_rows":1597,"offset":0,"rows":[
+        # {"id":"test","key":1,"value":{"rev":"4104487645"}},
+        # {"id":"skippyhammond","key":2,"value":{"rev":"121469801"}},
+        # ...
+        uri = "/%s/_all_docs_by_seq" % (dbName,)
+        args = {}
+        if reverse:
+            args["reverse"] = "true"
+        if startKey > 0:
+            args["startkey"] = int(startKey)
+        if limit >= 0:
+            args["limit"] = int(limit)
+        # suck the rest of the kwargs in
+        args.update(_encode_options(kw))
+        if args:
+            uri += "?%s" % (urlencode(args),)
+        return self.get(uri
+            ).addCallback(self.parseResult)
+
+    # Hack so our new bound methods work.
+    def bindToDB(self, dbName):
+        super(CouchDB, self).bindToDB(dbName)
+        partial = paisley.partial # it works hard to get this!
+        for methname in ["saveAttachment", "updateDocuments",
+                         "listDocsBySeq"]:
+            method = getattr(self, methname)
+            newMethod = partial(method, dbName)
+            setattr(self, methname, newMethod)
+
+
+# XXX - get_db should die as a global/singleton - only our DocumentModel
+# instance should care about that.  Sadly, bootstrap.py is a (small, for
+# later) problem here...
+def get_db(couchname="local", dbname=_NotSpecified):
+    dbinfo = config.couches[couchname]
+    if dbname is _NotSpecified:
+        dbname = dbinfo['name']
+    key = couchname, dbname
+    try:
+        return DBs[key]
+    except KeyError:
+        pass
+    logger.info("Connecting to couchdb at %s", dbinfo)
+    db = CouchDB(dbinfo['host'], dbinfo['port'], dbname)
+    DBs[key] = db
+    return db
 
-    # -- message (id) views
-    # ghosts are okay!
-    by_header_id = schema.View('by_header_id', '''\
-        function(doc) {
-            emit(doc.header_message_id, null);
-        }''', include_docs=True)    
+def quote_id(doc_id):
+    # A '/' should be impossible now we base64 encode the string given
+    # by an extension - but it doesn't hurt.
+    # Note the '!' character seems to work fine with couch (ie, we use it
+    # unquoted when constructing views), so we allow that for no better
+    # reason than the logs etc are clearer...
+    return quote(doc_id, safe="!")
+
+class DocumentModel(object):
+    """The layer between 'documents' and the 'database'.  Responsible for
+       creating the unique ID for each document (other than the raw document),
+       for fetching documents based on an ID, etc
+    """
+    def __init__(self, db):
+        self.db = db
+
+    @classmethod
+    def build_docid(cls, category, doc_type, provider_id):
+        """Builds a docid from (category, doc_type, provider_id)
+
+        The exact order of the fields may depend on who can best take
+        advantage of a specific order, so is subject to change in the
+        prototype.  This method gives the code a consistent ordering
+        regardless of the actual impl."""
+        return "!".join([category, provider_id, doc_type])
+
+    @classmethod
+    def split_docid(cls, docid):
+        """Splits a docid into (category, doc_type, provider_id)
+
+        Is likely to raise ValueError on an 'invalid' docid"""
+        cat, prov_id, doc_type = docid.split("!")
+        return cat, doc_type, prov_id
+
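
Putting the ID scheme together (the protocol ID here is invented):

    pid = encode_proto_id('bob@example.com/INBOX/1234')
    # -> 'Ym9iQGV4YW1wbGUuY29tL0lOQk9YLzEyMzQ=' (base64, newline stripped)
    docid = DocumentModel.build_docid('msg', 'proto/imap', pid)
    # -> 'msg!Ym9iQGV4YW1wbGUuY29tL0lOQk9YLzEyMzQ=!proto/imap'
    DocumentModel.split_docid(docid)
    # -> ('msg', 'proto/imap', 'Ym9iQGV4YW1wbGUuY29tL0lOQk9YLzEyMzQ=')
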
+    def open_view(self, *args, **kwargs):
+        # A convenience method for completeness so consumers don't hit the
+        # DB directly (and to give a consistent 'style').  Is it worth it?
+        return self.db.openView(*args, **kwargs)
 
-    # no ghosts!
-    by_timestamp = schema.View('by_timestamp', '''\
-        function(doc) {
-            if (doc.timestamp)
-                emit(doc.timestamp, null);
-        }''', include_docs=True)    
+    def open_attachment(self, doc_id, attachment, **kw):
+        """Open an attachment for the given docID.  As this is generally done
+        when processing the document itself, so the raw ID of the document
+        itself is known.  For this reason, a docid rather than the parts is
+        used.
+
+        Unlike open_document, this never returns None, but raises an
+        exception if the attachment doesn't exist.
+        """
+        logger.debug("attempting to open attachment %s/%s", doc_id, attachment)
+        return self.db.openDoc(quote_id(doc_id), attachment=attachment, **kw)
+
+    def open_document(self, category, proto_id, ext_type, **kw):
+        """Open the specific document, returning None if it doesn't exist"""
+        docid = self.build_docid(category, ext_type, encode_proto_id(proto_id))
+        return self.open_document_by_id(docid, **kw)
+
+    def open_document_by_id(self, doc_id, **kw):
+        """Open a document by the already constructed docid"""
+        logger.debug("attempting to open document %r", doc_id)
+        return self.db.openDoc(quote_id(doc_id), **kw
+                    ).addBoth(self._cb_doc_opened)
+
+    def _cb_doc_opened(self, result):
+        if isinstance(result, Failure):
+            result.trap(twisted.web.error.Error)
+            if result.value.status != '404': # not found
+                result.raiseException()
+            result = None # indicate no doc exists.
+            logger.debug("no document of that ID exists")
+        else:
+            logger.debug("opened document %(_id)r at revision %(_rev)s",
+                         result)
+        return result
+
+    def _prepare_raw_doc(self, account, docid, doc, doc_type):
+        assert docid
+        assert doc_type
+        assert '_id' not in doc, doc # that isn't how you specify the ID.
+        doc['_id'] = docid
+        assert 'raindrop_account' not in doc, doc # we look after that!
+        doc['raindrop_account'] = account.details['_id']
+
+        assert 'type' not in doc, doc # we look after that!
+        doc['type'] = doc_type
+
+        assert 'raindrop_seq' not in doc, doc # we look after that!
+        doc['raindrop_seq'] = get_seq()
 
-    # the key includes the timestamp so we can use it to limit our queries plus
-    #  pick up where we left off if we need to page/chunk.
-    # we expose the conversation id as the value because set intersection
-    #  on a conversation-basis demands it, and it would theoretically be too
-    #  expensive to just return the whole document via include_docs.
-    # (no ghosts!)
-    by_involves = schema.View('by_involves', '''\
-        function(doc) {
-            for each (var contact_id in doc.involves_contact_ids)
-                emit([contact_id, doc.timestamp], doc.conversation_id);
-        }''')
-    
-    # -- user provided meta-info junk
-    tagmap_func = '''\
-        function(doc) {
-            if (doc.tags) {
-                for (var i = 0; i < doc.tags.length; i++)
-                    emit([doc.tags[i], doc.timestamp], doc.conversation_id);
-            }
-        }'''
-    by_tags = schema.View('by_tags', tagmap_func)
-    
-    # by reusing tagmap_func, we are able to consume its output from the
-    #  previous view without introducing additional storage needs
-    all_tags = schema.View('tags', tagmap_func, '''\
-        function(keys, values, rereduce) {
-            var keySet = {}, i, j;
-            if (!rereduce) {
-                for (i = 0; i < keys.length; i++)
-                    keySet[keys[i][0][0]] = true;
-            }
-            else {
-                for (i = 0; i < values.length; i++) {
-                    var inSet = values[i];
-                    for (j = 0; j < inSet.length; j++)
-                        keySet[inSet[j]] = true;
-                }
-            }
-            var out = [];
-            for (var key in keySet)
-                out.push(key);
-            out.sort();
-            return out;
-        }''', group=False, group_level=0)
-    
-    # -- storage info views
-    # so, this key is theoretically just wildly expensive
-    # no ghosts!
-    by_storage = schema.View('by_storage', '''\
-        function(doc) {
-            if (doc.timestamp)
-                emit([doc.account_id, doc.storage_path, doc.storage_id], null);
-        }''', include_docs=False)
-        
-    by_mailing_list = schema.View('by_list_id', '''\
-        function(doc) {
-          if (doc.headers && doc.headers["List-Id"]) {
-            var parts = doc.headers["List-Id"].match(/([\\W\\w]*)\\s*<(.+)>.*/);
-            var values = {"List-Id" : doc.headers["List-Id"],
-                          "id" : parts[2],
-                          "name" : parts[1] };
-            for each (var headerId in ["List-Post","List-Archive","List-Help",
-                                       "List-Subscribe","List-Unsubscribe"]) {
-              if (doc.headers[headerId])
-                values[headerId] = doc.headers[headerId];
-            }
-            emit(parts[2], values);
-          }
-        }''', '''\
-        function(keys, values, rereduce) {
-          var output = {};
-          output.count = values.length;
-          for (var idx in values) {
-            for (var elm in values[idx]) {
-              output[elm] = values[idx][elm];
-            }
-          }
-          return output;
-        }''', include_docs=False, group=True, group_level=1)
+    def create_raw_documents(self, account, doc_infos):
+        """Entry-point to create raw documents.  The API reflects that
+        creating multiple documents in a batch is faster; if you want to
+        create a single doc, just put it in a list
+        """
+        docs = []
+        dids = [] # purely for the log :(
+        logger.debug('create_raw_documents preparing %d docs', len(doc_infos))
+        for (proto_id, doc, doc_type) in doc_infos:
+            docid = self.build_docid('msg', doc_type, encode_proto_id(proto_id))
+            self._prepare_raw_doc(account, docid, doc, doc_type)
+            # XXX - this hardcoding is obviously going to be a problem when
+            # we need to add 'contacts' etc :(
+            docs.append(doc)
+            dids.append(docid)
+        attachments = self._prepare_attachments(docs)
+        logger.debug('create_raw_documents saving docs %s', dids)
+        return self.db.updateDocuments(docs
+                    ).addCallback(self._cb_saved_docs, attachments
+                    )
+
+    def _prepare_attachments(self, docs):
+        # called internally when creating a batch of documents. Returns a list
+        # of attachments which should be saved separately.
+
+        # The intent is that later we can optimize this - if the attachment
+        # is small, we can keep it in the document base64 encoded and save
+        # a http connection.  For now though we just do all attachments
+        # separately.
+
+        # attachment processing still needs more thought - ultimately we need
+        # to be able to 'push' them via a generator or similar to avoid
+        # reading them entirely in memory. Further, we need some way of the
+        # document knowing if the attachment failed (or vice-versa) given we
+        # have no transactional semantics.
+        all_attachments = []
+        for doc in docs:
+            assert '_id' in doc # should have called prepare_ext_document!
+            try:
+                all_attachments.append(doc['_attachments'])
+                # nuke attachments specified
+                del doc['_attachments']
+            except KeyError:
+                # It is important we put 'None' here so the list of
+                # attachments is exactly parallel with the list of docs.
+                all_attachments.append(None)
+        assert len(all_attachments)==len(docs)
+        return all_attachments
+
+    def prepare_ext_document(self, prov_id, doc_type, doc):
+        """Called by extensions to setup the raindrop maintained attributes
+           of the documents, including the document ID
+        """
+        assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
+        assert 'type' not in doc, doc # we manage this too!
+        assert 'raindrop_seq' not in doc, doc # we look after that!
+        doc['raindrop_seq'] = get_seq()
+        doc['type'] = doc_type
+        doc['_id'] = self.build_docid("msg", doc['type'], prov_id)
 
-    by_list_id = schema.View('by_mailing_list', '''\
-        function(doc) {
-          if (doc.headers && doc.headers["List-Id"]) {
-            var parts = doc.headers["List-Id"].match(/[\\W\\w\\s]*<(.+)>.*/);
-            emit([parts[1], doc.timestamp], doc.conversation_id);
-          }
-        }''', include_docs=True)    
-        
-DATABASES = {
-    # the app database proper, no real data
-    'junius': None,
-    #
-    'accounts': Account,
-    'contacts': Contact,
-    'messages': Message,
-}
+    def create_ext_documents(self, docs):
+        for doc in docs:
+            assert '_id' in doc # should have called prepare_ext_document!
+        attachments = self._prepare_attachments(docs)
+        # save the document.
+        logger.debug('saving %d extension documents', len(docs))
+        return self.db.updateDocuments(docs,
+                    ).addCallback(self._cb_saved_docs, attachments
+                    )
 
-AVOID_REPLICATING = {
-    'accounts': 'Private info perhaps',
-}
+    def _cb_saved_docs(self, result, attachments):
+        # result: [{'rev': 'xxx', 'id': '...'}, ...]
+        logger.debug("saved %d documents", len(result))
+        ds = []
+        for dinfo, dattach in zip(result, attachments):
+            if dattach:
+                ds.append(self._cb_save_attachments(dinfo, dattach))
+        def return_orig(ignored_result):
+            return result
+        # XXX - the result set will *not* have the correct _rev if there are
+        # attachments :(
+        return defer.DeferredList(ds
+                    ).addCallback(return_orig)
 
-class DBS(object):
-    def __init__(self, server):
-        self.server = server
-
-DEFAULT_COUCH_SERVER = 'http://localhost:5984/'
+    def _cb_save_attachments(self, saved_doc, attachments):
+        if not attachments:
+            return saved_doc
+        # Each time we save an attachment the doc gets a new revision number.
+        # So we need to do them in a chain, passing the result from each to
+        # the next.
+        remaining = attachments.copy()
+        # This is recursive, but that should be OK.
+        return self._cb_save_next_attachment(saved_doc, remaining)
 
-def get_remote_host_info():
-    remoteinfo_path = os.path.join(os.environ['HOME'], '.junius.remoteinfo')
-    
-    if os.path.exists(remoteinfo_path):
-        f = open(remoteinfo_path, 'r')
-        data = f.read()
-        f.close()
-        info = data.strip()
-        if info[-1] != '/':
-            info += '/'
-        return info
-    else:
-        raise Exception("You need a ~/.junius.remoteinfo file")
+    def _cb_save_next_attachment(self, result, remaining):
+        if not remaining:
+            return result
+        revision = result['rev']
+        docid = result['id']
+        name, info = remaining.popitem()
+        logger.debug('saving attachment %r to doc %r', name, docid)
+        return self.db.saveAttachment(quote_id(docid),
+                                   quote_id(name), info['data'],
+                                   content_type=info['content_type'],
+                                   revision=revision,
+                ).addCallback(self._cb_saved_attachment, (docid, name)
+                ).addErrback(self._cb_save_failed, (docid, name)
+                ).addCallback(self._cb_save_next_attachment, remaining
+                )
 
-def get_local_host_info():
-    localinfo_path = os.path.join(os.environ['HOME'], '.junius.localinfo')
-    if os.path.exists(localinfo_path):
-        f = open(localinfo_path, 'r')
-        data = f.read()
-        f.close()
-        info = data.strip()
-        if info[-1] != '/':
-            info += '/'
-        return info
-    else:
-        return DEFAULT_COUCH_SERVER
-    
+    def _cb_saved_attachment(self, result, ids):
+        logger.debug("Saved attachment %s", result)
+        # XXX - now what?
+        return result
+
+    def _cb_save_failed(self, failure, ids):
+        logger.error("Failed to save attachment (%r): %s", ids, failure)
+        failure.raiseException()
+
+
+_doc_model = None
+
+def get_doc_model():
+    global _doc_model
+    if _doc_model is None:
+        _doc_model = DocumentModel(get_db())
+    return _doc_model
 
 def nuke_db():
-    import couchdb
-    server = couchdb.Server(get_local_host_info())
+    couch_name = 'local'
+    db = get_db(couch_name, None)
+    dbinfo = config.couches[couch_name]
 
-    for dbname in DATABASES.keys():
-      if dbname in server:
-        print "!!! Deleting database", dbname
-        del server[dbname]
+    def _nuke_failed(failure, *args, **kwargs):
+        if failure.value.status != '404':
+            failure.raiseException()
+        logger.info("DB doesn't exist!")
+
+    def _nuked_ok(d):
+        logger.info("NUKED DATABASE!")
+
+    deferred = db.deleteDB(dbinfo['name'])
+    deferred.addCallbacks(_nuked_ok, _nuke_failed)
+    return deferred
+
 
 
-def fab_db(update_views=False):
-    import couchdb
-    server = couchdb.Server(get_local_host_info())
-    
-    dbs = DBS(server)
-    
-    for db_name, doc_class in DATABASES.items():
-        if not db_name in server:
-            print 'Creating database', db_name
-            db = server.create(db_name)
-            update_views = True
-        else:
-            db = server[db_name]
-        
-        if update_views and doc_class:
-            print 'Updating views'
-            views = [getattr(doc_class, k) for k, v in doc_class.__dict__.items() if isinstance(v, schema.View)]
-            print 'Views:', views
-            if views:
-                design.ViewDefinition.sync_many(db, views)
+def fab_db(whateva):
+    couch_name = 'local'
+    db = get_db(couch_name, None)
+    dbinfo = config.couches[couch_name]
 
-        setattr(dbs, db_name, db)
+    def _create_failed(failure, *args, **kw):
+        failure.trap(twisted.web.error.Error)
+        if failure.value.status != '412': # precondition failed.
+            failure.raiseException()
+        logger.info("couch database %(name)r already exists", dbinfo)
+        return False
 
-    return dbs    
+    def _created_ok(d):
+        logger.info("created new database")
+        return True
+
+    return db.createDB(dbinfo['name']
+                ).addCallbacks(_created_ok, _create_failed
+                )
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/pipeline.py
@@ -0,0 +1,379 @@
+""" This is the raindrop pipeline; it moves messages from their most raw
+form to their most useful form.
+"""
+from twisted.internet import defer, task
+from twisted.python.failure import Failure
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Simple forward chaining.
+chain = [
+    # from_type, to_type, transformer)
+    ('proto/test',         'raw/message/rfc822',
+                           'raindrop.proto.test.TestConverter'),
+    # skype goes directly to 'message' for now...
+    ('proto/skype-msg',    'message',
+                           'raindrop.proto.skype.SkypeConverter'),
+    # skype-chat is 'terminal' for now.
+    ('proto/skype-chat', None, None),
+    ('proto/imap',         'raw/message/rfc822',
+                           'raindrop.proto.imap.IMAPConverter'),
+    ('proto/twitter',      'message',
+                           'raindrop.proto.twitter.TwitterConverter'),
+    ('raw/message/rfc822', 'raw/message/email',
+                           'raindrop.ext.message.rfc822.RFC822Converter'),
+    ('raw/message/email',  'message',
+                           'raindrop.ext.message.email.EmailConverter'),
+    ('message',            'anno/tags',
+                           'raindrop.ext.message.message.MessageAnnotator'),
+    # anno/tags is 'terminal'
+    ('anno/tags', None, None),
+]
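+
+# A hypothetical helper (not used elsewhere) sketching how the table above
+# chains forward: it yields the doc types a document will pass through,
+# stopping at a 'terminal' (None) entry.  e.g. 'proto/imap' yields
+# proto/imap, raw/message/rfc822, raw/message/email, message, anno/tags.
+def _walk_chain(doc_type):
+    lookup = dict((f, t) for f, t, _ in chain)
+    while doc_type is not None:
+        yield doc_type
+        doc_type = lookup.get(doc_type)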
+
+
+class Pipeline(object):
+    def __init__(self, doc_model, options):
+        self.doc_model = doc_model
+        self.options = options
+        # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parent's?
+        self.coop = task.Cooperator()
+        self.forward_chain = {}
+        for from_type, to_type, xformname in chain:
+            if xformname:
+                root, tail = xformname.rsplit('.', 1)
+                try:
+                    mod = __import__(root, fromlist=[tail])
+                    klass = getattr(mod, tail)
+                    inst = klass(doc_model)
+                except:
+                    logger.exception("Failed to load extension %r", xformname)
+                else:
+                    self.forward_chain[from_type] = (to_type, inst)
+            else:
+                assert not to_type, 'no xformname must mean no to_type'
+                self.forward_chain[from_type] = None
+
+    def unprocess(self):
+        # A bit of a hack that will suffice until we get better dependency
+        # management.  Determines which doc-types are 'derived', then deletes
+        # them all.
+        def delete_a_doc(doc, rid):
+            if doc is None:
+                logger.debug("can't delete document %r - it doesn't exist", rid)
+                return None
+            else:
+                logger.info("Deleting document %(_id)r (rev %(_rev)s)", doc)
+                return self.doc_model.db.deleteDoc(doc['_id'], doc['_rev'])
+
+        def delete_docs(result, doc_type):
+            docs = []
+            to_del = [(row['id'], row['value']) for row in result['rows']]
+            for id, rev in to_del:
+                docs.append({'_id': id, '_rev': rev, '_deleted': True})
+            logger.info('deleting %d messages of type %r', len(docs), doc_type)
+            return self.doc_model.db.updateDocuments(docs)
+
+        def gen_deleting_docs(doc_types):
+            for dt in doc_types:
+                yield self.doc_model.open_view('raindrop!messages!by',
+                                               'by_doc_type',
+                                               key=dt,
+                            ).addCallback(delete_docs, dt)
+            # and our work-queue docs - they aren't seen by the view, so
+            # just delete them by ID.
+            docs = []
+            for rid in ('workqueue!msg',):
+                yield self.doc_model.open_document_by_id(rid
+                        ).addCallback(delete_a_doc, rid)
+
+        derived = set()
+        for _, to_info in self.forward_chain.iteritems():
+            if to_info is not None:
+                to_type, inst = to_info
+                derived.add(to_type)
+        # error records are always 'derived'
+        derived.add('core/error/msg')
+        logger.info("deleting documents with types %r", derived)
+        return self.coop.coiterate(gen_deleting_docs(derived))
+
+    def start(self):
+        return self.coop.coiterate(self.gen_wq_tasks())
+
+    def start_retry_errors(self):
+        """Attempt to re-process all messages for which our previous
+        attempt generated an error.
+        """
+        # Later we may need a 'state doc' just for errors??
+        # For now use this dict to save the state during processing; we
+        # don't persist it yet.
+        def gen_work():
+            state_doc = {'raindrop_seq': 0}
+            while True:
+                self.num_errors_found = 0
+                # error docs are quite small - fetch 50 at a time...
+                logger.debug("starting error processing at %(raindrop_seq)r",
+                             state_doc)
+                yield self.doc_model.open_view(
+                                'raindrop!proto!errors', 'errors',
+                                startkey=state_doc['raindrop_seq'],
+                                include_docs=True,
+                        ).addCallback(self._cb_errorq_opened, state_doc)
+                if not self.num_errors_found:
+                    break
+        return self.coop.coiterate(gen_work())
+
+    def _cb_errorq_opened(self, result, state_doc):
+        def gen_work():
+            for row in result['rows']:
+                self.num_errors_found += 1
+                err_doc = row['doc']
+                logger.debug("processing error document %(_id)r", err_doc)
+                # Open original source doc
+                source_infos = err_doc['raindrop_sources']
+                assert len(source_infos)==1, "only simple fwd chaining!"
+                source_id = source_infos[0][0]
+                yield self.doc_model.open_document_by_id(source_id
+                        ).addCallback(self._cb_got_error_source, err_doc)
+                state_doc['raindrop_seq'] = row['key']
+
+        # XXX - see below - should we reuse self.coop, or is that unsafe?
+        coop = task.Cooperator()
+        return coop.coiterate(gen_work())
+
+    def _cb_got_error_source(self, result, err_doc):
+        # build the infos dict used by the sub-generator.
+        try:
+            _, doc_type, proto_id = self.doc_model.split_docid(err_doc['_id'])
+        except ValueError:
+            logger.warning("skipping malformed ID %(_id)r", err_doc)
+            return
+
+        infos = {proto_id: [result]}
+        # Although we only have 1 item to process, the queue is set up to
+        # handle many, so we need to use a generator
+        def gen_my_doc():
+            new_docs = []
+            for whateva in self.gen_work_tasks(infos, new_docs):
+                yield whateva
+            # should only be 1 new doc, and even on error there should be a
+            # new error doc.
+            assert len(new_docs)==1, new_docs
+            # either way, we are writing the new record replacing the one
+            # we have.
+            new_docs[0]['_rev'] = err_doc['_rev']
+            yield self.doc_model.create_ext_documents(new_docs)
+
+        # XXX - see below - should we reuse self.coop, or is that unsafe?
+        coop = task.Cooperator()
+        return coop.coiterate(gen_my_doc())
+
+    def gen_wq_tasks(self):
+        """generate deferreds which will determine where our processing is up
+        to and walk us through the _all_docs_by_seq view from that point,
+        creating and saving new docs as it goes. When the generator finishes
+        the queue is empty. """
+        # first open our 'state' document
+        def _cb_update_state_doc(result, d):
+            if result is not None:
+                assert d['_id'] == result['_id'], result
+                d.update(result)
+            # else no doc exists - the '_id' remains in the default though.
+
+        state_doc = {'_id': 'workqueue!msg',
+                     'doc_type': u"core/workqueue",
+                     'seq': 0}
+        yield self.doc_model.open_document_by_id(state_doc['_id'],
+                    ).addCallback(_cb_update_state_doc, state_doc)
+
+        logger.info("Work queue starting with sequence ID %d",
+                    state_doc['seq'])
+
+        logger.debug('opening by_seq view for work queue...')
+        num_per_batch = 15 # configurable????
+        # We process num_per_batch records at a time, and fetch all those
+        # documents in the same request; this seems a reasonable compromise
+        # between efficiency and handling large docs (no attachments come
+        # back...) Another alternative worth benchmarking: use a much larger
+        # limit without returning the documents, in the hope we can deduce
+        # enough from the view to avoid fetching many docs at all...
+        self.queue_finished = False
+        self.state_doc_dirty = False
+        while not self.queue_finished:
+            yield self.doc_model.db.listDocsBySeq(limit=num_per_batch,
+                                                  startkey=state_doc['seq'],
+                                                  include_docs=True,
+                        ).addCallback(self._cb_by_seq_opened, state_doc)
+        if self.state_doc_dirty:
+            logger.debug("flushing state doc at end of run...")
+            yield self.doc_model.create_ext_documents([state_doc]
+                    ).addCallback(self._cb_created_docs, state_doc
+                    )
+        else:
+            logger.debug("no need to flush state doc")
+
+    def gen_work_tasks(self, doc_infos, new_docs):
+        # A generator which takes each doc in the list to its "next" type,
+        # noting that as we have a number of docs ahead of us, one of them
+        # may already satisfy us...
+        # ultimately we may need to generate lots of new docs from this one -
+        # but for now we have a single, simple chain moving forwards...
+
+        # Results aren't written - that is the caller's job - new docs are
+        # appended to the passed list.
+
+        # doc_infos is a dict keyed by proto_id.  Each value is a list, in
+        # sequence order, of the document itself.
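+        # e.g. (sketch): {'some-proto-id': [doc_as_of_seq1, doc_as_of_seq2]}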
+        for proto_id, infos in doc_infos.iteritems():
+            for sq, doc in enumerate(infos):
+                doc_type = doc['type']
+                try:
+                    xform_info = self.forward_chain[doc_type]
+                except KeyError:
+                    logger.warning("Can't find transformer for message type %r - skipping %r",
+                                   doc_type, proto_id)
+                    continue
+                if xform_info is None:
+                    logger.debug("Document %r is at its terminal type of %r",
+                                 proto_id, doc_type)
+                    continue
+
+                next_type, xformer = xform_info
+                # See if the next_type is in the rows ahead of us in by_seq;
+                # a 'break' means the for/else creation below is skipped.
+                for check_doc in infos[sq+1:]:
+                    if next_type == check_doc['type']:
+                        logger.debug("cool - _by_seq lookahead tells us the doc is already %r",
+                                     next_type)
+                        break
+                else:
+                    # OK - need to create this new type.
+                    logger.debug("calling %r to create a %s from %s", xformer,
+                                 next_type, doc['type'])
+                    yield defer.maybeDeferred(xformer.convert, doc
+                                ).addBoth(self._cb_converted_or_not,
+                                          next_type, proto_id, doc, new_docs)
+
+    def _cb_by_seq_opened(self, result, state_doc):
+        rows = result['rows']
+        logger.debug('work queue has %d items to check.', len(rows))
+        if not rows:
+            # no rows left.  There is no guarantee our state doc will be
+            # the last one...
+            logger.info("work queue ran out of rows...")
+            # either way, we are done!
+            self.queue_finished = True
+            return
+        if len(rows)==1 and rows[0]['id'] == state_doc['_id']:
+            logger.info("Work queue got to the end (at sequence %(seq)s)",
+                        state_doc)
+            self.queue_finished = True
+            return
+
+        # Build a map of what we can deduce from the entire result set (eg,
+        # a single msg may have a number of doc-types in the rows)
+        known = {}
+        for row in rows:
+            rid = row['id']
+            last_seq = row['key']
+            if not rid.startswith("msg!"):
+                continue
+            value = row['value']
+            if value.get('deleted'):
+                logger.debug("skipping deleted message %r", rid)
+                continue
+            try:
+                _, doc_type, proto_id = self.doc_model.split_docid(rid)
+            except ValueError:
+                logger.warning("skipping malformed message ID %r", rid)
+                continue
+            doc = row['doc']
+            real_type = doc.get('type')
+            if real_type != doc_type: # probably a core/error/msg
+                logger.info("message isn't of expected type (expected %r "
+                            "but got %r) - skipping", doc_type, real_type)
+                continue
+            known.setdefault(proto_id, []).append(doc)
+
+        state_doc['seq'] = last_seq # only takes effect once saved...
+        logger.debug("Our %d rows gave info about %d messages",
+                     len(rows), len(known))
+
+        def gen_my_work_tasks():
+            new_docs = []
+            for whateva in self.gen_work_tasks(known, new_docs):
+                yield whateva
+
+            # It isn't unusual to see zero docs to process, and writing our
+            # new state doc each time is going to hurt perf a little.  The
+            # worst case is we die before flushing our state doc, but that's
+            # OK - we should be able to re-skip these messages quickly next
+            # time...
+            if new_docs:
+                # Now write the docs we created, plus our queue 'state' doc.
+                logger.info("work queue finished at sequence %d - %d new documents"
+                            , state_doc['seq'], len(new_docs))
+                new_docs.append(state_doc)
+                self.state_doc_dirty = False # it is about to be written.
+                yield self.doc_model.create_ext_documents(new_docs
+                        ).addCallback(self._cb_created_docs, state_doc
+                        )
+            else:
+                # don't bother writing the state doc now, but record it is
+                # dirty so if we get to the end we *do* write it.
+                self.state_doc_dirty = True
+
+        # I *think* that if we share the same cooperator as the task itself,
+        # we risk having the next chunk of sequence IDs processed before we
+        # are done here.
+        # OTOH, I'm not sure about that.....
+        # As we are really only using a coiterator as a handy way of managing
+        # twisted loops, using our own should be ok though.
+        coop = task.Cooperator()
+        return coop.coiterate(gen_my_work_tasks())
+
+    def _cb_created_docs(self, new_revs, state_doc):
+        # XXX - note that the _ids in this result can't be trusted if there
+        # were attachments :(  fortunately we don't care here...
+        # XXXXXXX - *sob* - we do care once we start hitting conflicts in
+        # messages ahead of us...
+        # XXX - this might not be necessary...
+        last_rev = new_revs[-1]
+        assert last_rev['id'] == state_doc['_id'], last_rev
+        state_doc['_rev'] = last_rev['rev']
+
+    def _cb_converted_or_not(self, result, dest_type, rootdocid, existing_doc,
+                             new_docs):
+        # This is both a callBack and an errBack.  If a converter fails to
+        # create a document, we can't just fail, or no later messages in the
+        # DB will ever get processed!
+        # So we write a "dummy" record - it has the same docid that the real
+        # document would have - but the 'type' attribute in the document
+        # reflects it is actually an error marker.
+        if isinstance(result, Failure):
+            logger.warn("Failed to convert a document: %s", result)
+            if self.options.stop_on_error:
+                logger.info("--stop-on-error specified - re-throwing error")
+                result.raiseException()
+            # and make a dummy doc.
+            new_doc = {'error_details': unicode(result)}
+            self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+            # and patch the 'type' attribute to reflect it's really an error.
+            new_doc['type'] = 'core/error/msg'
+            # In theory, the source ID of each doc that contributed is
+            # redundant as it could be deduced once we get backward chaining.
+            # However, we probably will always need to track the _rev of those
+            # docs so we can detect 'stale' errors (XXX - probably not - the
+            # 'chain' doesn't go any further on error)
+            # Also in theory, we don't need this in the non-error case, as
+            # our _by_seq processing will ensure the right thing happens.
+            # This is a list of sources because one day we will support more than one!
+            new_doc['raindrop_sources'] = [[existing_doc['_id'],
+                                            existing_doc['_rev']]
+                                          ]
+        else:
+            new_doc = result
+            self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+            logger.debug("converter returned new document type %r for %r: %r",
+                         dest_type, rootdocid, new_doc['_id'])
+        new_docs.append(new_doc)
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/base.py
@@ -0,0 +1,104 @@
+import logging
+
+__all__ = ['Rat', 'AccountBase']
+
+logger = logging.getLogger("accounts")
+
+class Rat(object):
+  '''
+  Account reasons rationale... this is here to make typing easier...
+  '''
+  #: all whats for this account
+  EVERYTHING = 'everything'
+  #: the problem is with the server (or the network)
+  SERVER = 'server'
+  #: the problem is with the account
+  ACCOUNT = 'account'
+
+  UNREACHABLE = 'unreachable'
+  PASSWORD = 'password'
+  MAINTENANCE = 'maintenance'
+  BUSY = 'busy'
+  #: something is up with the crypto; this needs to be exploded
+  CRYPTO = 'crypto'
+
+  #: good indicates that all-is-well
+  GOOD = 'good'
+  '''
+  Neutral indicates an expected transient lack of success (maintenance,
+   overloaded servers, etc.)  It is tracked (rather than silently not updating
+   good) because it potentially allows for higher-level logic to escalate
+   continued inability to connect to something user-visible.
+
+  For example, twitter being down for short periods of time (at least in the
+   past) was business as usual; there would be no reason to notify the user.
+   However, if twitter is down for an extended period of time, we want to let
+   the user know (in an ambient sort of way) that there's a problem with
+   twitter, and that's why they're not getting any messages.
+
+  The primary difference between a TEMPORARY BAD thing and a TEMPORARY NEUTRAL
+   thing is that we will let the user know about a TEMPORARY BAD thing
+   when it happens.
+  '''
+  NEUTRAL = 'neutral'
+  '''
+  Bad indicates an unexpected problem which may be TEMPORARY or PERMANENT.
+   Temporary problems are expressed to the user in an ambient fashion when
+   they happen, but may not require any action.  If a temporary problem stays
+   a problem for an extended period of time, it will be escalated to a
+   more explicit notification.  A permanent problem requires user action and
+   the user will be immediately notified.
+
+  For example, bad passwords and suspended accounts are permanent problems.  The
+   former is actionable within the UI, whereas the latter is not.  However, it
+   is important that the user be notified at the earliest opportunity so they
+   can take real-world action promptly.  A server being inaccessible is a
+   TEMPORARY BAD problem rather than a TEMPORARY NEUTRAL thing because a user
+   may benefit from knowing their connection or server is flakey.  (Note:
+   temporarily lacking an internet connection is different from a flakey one;
+   we don't want to bother the user if we know they don't have a connection.)
+  '''
+  BAD = 'bad'
+
+  #: temporary implies it may fix itself without user intervention
+  TEMPORARY = 'temporary'
+  #: permanent implies the user must take some action to correct the problem
+  PERMANENT = 'permanent'
+  #: unknown means it either doesn't matter or it could be temporary but the
+  #:  user should potentially still be informed
+  UNKNOWN = 'unknown'
+
+
+class AccountBase(Rat):
+  def __init__(self, doc_model, details):
+    self.doc_model = doc_model
+    self.details = details
+
+  def reportStatus(self, what, state, why=Rat.UNKNOWN,
+                   expectedDuration=Rat.UNKNOWN):
+    '''
+    Report status relating to this account.
+
+    Everything is peachy: EVERYTHING GOOD
+    Wrong password: ACCOUNT BAD PASSWORD PERMANENT
+    (may be temporary if a bad password can mean many things)
+    Can't contact server: SERVER BAD UNREACHABLE TEMPORARY
+    Server maintenance: SERVER NEUTRAL MAINTENANCE TEMPORARY
+    (indicates a temporary lapse in service but there's not much we can do)
+    Server busy: SERVER NEUTRAL BUSY TEMPORARY
+    (for example, last.fm will sometimes refuse submission requests)
+    '''
+    logger.debug("ReportStatus: %s %s (why=%s, duration=%s)",
+                 what, state, why, expectedDuration)
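+    # A concrete call from the IMAP code below, for illustration:
+    #   account.reportStatus(Rat.SERVER, Rat.BAD, Rat.UNREACHABLE,
+    #                        Rat.TEMPORARY)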
+
+  def sync(self):
+    pass
+
+  def verify(self):
+    '''
+    '''
+    pass
+
+class ConverterBase(object):
+    def __init__(self, doc_model):
+       self.doc_model = doc_model
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/contact.py
@@ -0,0 +1,6 @@
+class ContactProcessor(object):
+  def resolveOrCreateContacts(self, kind, values):
+    pass
+
+  def processContactIdentity(self, kind, value):
+    pass
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proc/message.py
@@ -0,0 +1,6 @@
+
+class MessageProcessor(object):
+  '''
+  Common message processing superclass for e-mail messages, twitter messages, etc.
+  '''
+  
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/__init__.py
@@ -0,0 +1,28 @@
+# this needs to become a 'plugin' mechanism...
+
+_protocol_infos = [
+    ('imap', 'raindrop.proto.imap', 'IMAPAccount'),
+    ('skype', 'raindrop.proto.skype', 'SkypeAccount'),
+    ('twitter', 'raindrop.proto.twitter', 'TwitterAccount'),
+]
+if __debug__:
+    _protocol_infos.append(('test', 'raindrop.proto.test', 'TestAccount'))
+
+protocols = {}
+def init_protocols():
+    import sys, logging
+    logger = logging.getLogger('raindrop.proto')
+    for name, mod, factname in _protocol_infos:
+        try:
+            logger.debug("attempting import of '%s' for '%s'", mod, factname)
+            __import__(mod)
+            mod = sys.modules[mod]
+            fact = getattr(mod, factname)
+        except ImportError, why:
+            logger.error("Failed to import '%s' factory: %s", name, why)
+        except:
+            logger.exception("Error creating '%s' factory", name)
+        else:
+            protocols[name] = fact
+
+__all__ = ['protocols', 'init_protocols']
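+
+# Typical startup usage (a sketch of what sync.py below does):
+#   init_protocols()
+#   account_factory = protocols['imap']
+#   account = account_factory(doc_model, account_details)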
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/imap.py
@@ -0,0 +1,224 @@
+from twisted.internet import protocol, ssl, defer, error
+from twisted.mail import imap4
+import logging
+
+from ..proc import base
+from ..ext.message.rfc822 import doc_from_bytes
+
+brat = base.Rat
+
+logger = logging.getLogger(__name__)
+
+
+class ImapClient(imap4.IMAP4Client):
+  '''
+  Much of our logic here should perhaps be in another class that holds a
+  reference to the IMAP4Client instance subclass.  Consider refactoring if we
+  don't turn out to benefit from the subclassing relationship.
+  '''
+  def serverGreeting(self, caps):
+    logger.debug("IMAP server greeting: capabilities are %s", caps)
+    return self._doAuthenticate(
+            ).addCallback(self._reqList
+            ).addCallback(self.deferred.callback)
+
+  def _doAuthenticate(self):
+    if self.account.details.get('crypto') == 'TLS':
+      d = self.startTLS(self.factory.ctx)
+      d.addErrback(self.accountStatus,
+                   brat.SERVER, brat.BAD, brat.CRYPTO, brat.PERMANENT)
+      d.addCallback(self._doLogin)
+    else:
+      d = self._doLogin()
+    d.addErrback(self.accountStatus,
+                 brat.SERVER, brat.BAD, brat.PASSWORD, brat.PERMANENT)
+    return d
+
+
+  def _doLogin(self, *args, **kwargs):
+    return self.login(self.account.details['username'],
+                      self.account.details['password'])
+
+  def _reqList(self, *args, **kwargs):
+    self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
+    return self.list('', '*').addCallback(self._procList)
+
+  def _procList(self, result, *args, **kwargs):
+    return self.conductor.coop.coiterate(self.gen_folder_list(result))
+
+  def gen_folder_list(self, result):
+    for flags, delim, name in result:
+      logger.debug('Processing folder %s (flags=%s)', name, flags)
+      if r"\Noselect" in flags:
+        logger.debug("'%s' is unselectable - skipping", name)
+        continue
+
+      # XXX - sob - markh sees:
+      # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
+      # although we obviously have seen them already in the other folders.
+      if name and name.startswith('['):
+        logger.info("'%s' appears special -skipping", name)
+        continue
+
+      yield self.examine(name
+                 ).addCallback(self._examineFolder, name
+                 ).addErrback(self._cantExamineFolder, name)
+    logger.debug('imap processing finished.')
+
+  def _examineFolder(self, result, folder_path):
+    logger.debug('Looking for messages already fetched for folder %s', folder_path)
+    return self.doc_model.open_view('raindrop!proto!seen', 'imap',
+                             startkey=[folder_path], endkey=[folder_path, {}]
+              ).addCallback(self._fetchAndProcess, folder_path)
+
+  def _fetchAndProcess(self, result, folder_path):
+    # XXX - we should look at the flags and update the message if it's not
+    # the same - later.
+    rows = result['rows']
+    logger.debug('_FetchAndProcess says we have seen %d items for %r',
+                 len(rows), folder_path)
+    seen_uids = set(row['key'][1] for row in rows)
+    # now build a list of all message currently in the folder.
+    allMessages = imap4.MessageSet(1, None)
+    return self.fetchUID(allMessages, True).addCallback(
+            self._gotUIDs, folder_path, seen_uids)
+
+  def _gotUIDs(self, uidResults, name, seen_uids):
+    all_uids = set(int(result['UID']) for result in uidResults.values())
+    remaining = all_uids - seen_uids
+    logger.info("Folder %s has %d messages, %d new", name,
+                len(all_uids), len(remaining))
+    def gen_remaining(folder_name, remaining):
+      for uid in remaining:
+        # XXX - we need something to make this truly unique.
+        did = "%s#%d" % (folder_name, uid)
+        logger.debug("new imap message %r - fetching content", did)
+        # grr - we have to get the rfc822 body and the flags in separate requests.
+        to_fetch = imap4.MessageSet(uid)
+        yield self.fetchMessage(to_fetch, uid=True
+                    ).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
+                    )
+    return self.conductor.coop.coiterate(gen_remaining(name, remaining))
+
+  def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
+    _, result = result.popitem()
+    content = result['RFC822']
+    # grr - get the flags
+    logger.debug("message %r has %d bytes; fetching flags", did, len(content))
+    return self.fetchFlags(to_fetch, uid=True
+                ).addCallback(self._cb_got_flags, uid, did, folder_name, content
+                ).addErrback(self._cantGetMessage
+                )
+
+  def _cb_got_flags(self, result, uid, did, folder_name, content):
+    logger.debug("flags are %s", result)
+    assert len(result)==1, result
+    _, result = result.popitem()
+    flags = result['FLAGS']
+    # put the 'raw' document object together and save it.
+    doc = dict(
+      storage_key=[folder_name, uid],
+      imap_flags=flags,
+      )
+    attachments = {'rfc822' : {'content_type': 'message',
+                               'data': content,
+                               }
+                  }
+    doc['_attachments'] = attachments
+    return self.doc_model.create_raw_documents(self.account,
+                                              [(did, doc, 'proto/imap')],
+                ).addCallback(self._cb_saved_message)
+
+  def _cantGetMessage(self, failure):
+    logger.error("Failed to fetch message: %s", failure)
+
+  def _cb_saved_message(self, result):
+    logger.debug("Saved message %s", result)
+
+  def _cantSaveDocument(self, failure):
+    logger.error("Failed to save message: %s", failure)
+
+  def _cantExamineFolder(self, failure, name, *args, **kw):
+    logger.warning("Failed to examine folder '%s': %s", name, failure)
+
+  def accountStatus(self, result, *args):
+    return self.account.reportStatus(*args)
+
+
+class ImapClientFactory(protocol.ClientFactory):
+  protocol = ImapClient
+
+  def __init__(self, account, conductor):
+    # base-class has no __init__
+    self.account = account
+    self.conductor = conductor
+    self.doc_model = account.doc_model # this is a little confused...
+
+    self.ctx = ssl.ClientContextFactory()
+    self.backoff = 8 # magic number
+
+  def buildProtocol(self, addr):
+    p = self.protocol(self.ctx)
+    p.factory = self
+    p.account = self.account
+    p.conductor = self.conductor
+    p.doc_model = self.account.doc_model
+    p.deferred = self.deferred # this isn't going to work in reconnect scenarios
+    return p
+
+  def connect(self):
+    details = self.account.details
+    logger.debug('attempting to connect to %s:%d (ssl: %s)',
+                 details['host'], details['port'], details['ssl'])
+    reactor = self.conductor.reactor
+    self.deferred = defer.Deferred()
+    if details.get('ssl'):
+      reactor.connectSSL(details['host'], details['port'], self, self.ctx)
+    else:
+      reactor.connectTCP(details['host'], details['port'], self)
+    return self.deferred
+
+  def clientConnectionLost(self, connector, reason):
+    # the flaw in this is that we start from scratch every time; which is why
+    #  most of the logic in the client class should really be pulled out into
+    #  the account logic, probably.  this class itself may have issues too...
+    # XXX - also note that a simple exception in other callbacks can trigger
+    # this - meaning we retry just to hit the same exception.
+    if reason.check(error.ConnectionDone):
+        # only an error if premature
+        if not self.deferred.called:
+            self.deferred.errback(reason)
+    else:
+        #self.deferred.errback(reason)
+        logger.debug('lost connection to server, going to reconnect in a bit')
+        self.conductor.reactor.callLater(2, self.connect)
+
+  def clientConnectionFailed(self, connector, reason):
+    self.account.reportStatus(brat.SERVER, brat.BAD, brat.UNREACHABLE,
+                              brat.TEMPORARY)
+    logger.warning('Failed to connect, will retry after %d secs',
+                   self.backoff)
+    # It occurs that some "account manager" should be reported of the error,
+    # and *it* asks us to retry later?  eg, how do I ask 'ignore backoff -
+    # try again *now*"?
+    self.conductor.reactor.callLater(self.backoff, self.connect)
+    self.backoff = min(self.backoff * 2, 600) # magic number
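+    # i.e. retry delays run 8, 16, 32, ... seconds, capped at 600 (10 mins).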
+
+
+# A 'converter' - takes a proto/imap as input and creates a
+# 'raw/message/rfc822' as output
+class IMAPConverter(base.ConverterBase):
+  def convert(self, doc):
+    # I need the binary attachment.
+    return self.doc_model.open_attachment(doc['_id'], "rfc822",
+              ).addCallback(self._cb_got_attachment, doc)
+
+  def _cb_got_attachment(self, content, doc):
+    # the 'rfc822' module knows what to do...
+    return doc_from_bytes(content)
+
+
+class IMAPAccount(base.AccountBase):
+  def startSync(self, conductor):
+    self.factory = ImapClientFactory(self, conductor)
+    return self.factory.connect()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/skype.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+'''
+Fetch skype contacts and chats.
+'''
+import logging
+
+import twisted.python.log
+from twisted.internet import defer, threads
+
+from ..proc import base
+
+import Skype4Py
+
+logger = logging.getLogger(__name__)
+
+# These are the raw properties we fetch from skype.
+CHAT_PROPS = [
+    ('ACTIVEMEMBERS',   list),
+    ('ACTIVITY_TIMESTAMP', float),
+    ('ADDER', unicode),
+    ('APPLICANTS', list),
+    #'BLOB'??
+    ('BOOKMARKED', bool),
+    ('DESCRIPTION', unicode),
+    #'DIALOG_PARTNER',
+    ('FRIENDLYNAME', unicode),
+    ('GUIDELINES', unicode),
+    #'MEMBEROBJECTS',
+    ('MEMBERS', list),
+    ('MYROLE', unicode),
+    ('MYSTATUS', unicode),
+    ('OPTIONS', int),
+    ('PASSWORDHINT', unicode),
+    ('POSTERS', list),
+    ('STATUS', unicode),
+    ('TIMESTAMP', float),
+    ('TOPICXML', unicode),
+    ('TOPIC', unicode),
+    ('TYPE', unicode),
+]
+
+MSG_PROPS = [
+    ('BODY', unicode),
+    ('CHATNAME', unicode),
+    ('EDITED_BY', unicode),
+    ('EDITED_TIMESTAMP', float),
+    ('FROM_DISPNAME', unicode),
+    ('FROM_HANDLE', unicode),
+    ('IS_EDITABLE', bool),
+    ('LEAVEREASON', unicode),
+    ('STATUS', unicode),
+    ('TIMESTAMP', float),
+    ('TYPE', unicode),
+    ('USERS', list),
+]
+
+
+def simple_convert(str_val, typ):
+    if typ is list:
+        return str_val.split()
+    if typ is bool:
+        return str_val == "TRUE"
+    # all the rest are callables which 'do the right thing'
+    return typ(str_val)
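+
+# e.g. simple_convert("TRUE", bool) -> True
+#      simple_convert("a b c", list) -> ['a', 'b', 'c']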
+
+
+class TwistySkype(object):
+    def __init__(self, account, conductor):
+        self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
+        self.conductor = conductor
+        self.skype = Skype4Py.Skype()
+
+    def get_docid_for_chat(self, chat):
+        return chat.Name.encode('utf8') # hrmph!
+
+    def get_docid_for_msg(self, msg):
+        return "%s-%d" % (self.account.details['username'], msg._Id)
+
+    def attach(self):
+        logger.info("attaching to skype...")
+        d = threads.deferToThread(self.skype.Attach)
+        d.addCallback(self.attached)
+        return d
+
+    def attached(self, status):
+        logger.info("attached to skype - getting chats")
+        return threads.deferToThread(self.skype._GetChats
+                    ).addCallback(self._cb_got_chats
+                    )
+
+    def _cb_got_chats(self, chats):
+        logger.debug("skype has %d chat(s) total", len(chats))
+        # fetch all the messages (future optimization; all we need are the
+        # IDs, but we suffer from creating an instance wrapper for each item.
+        # Sadly the skype lib doesn't offer a clean way of doing this.)
+        def gen_chats(chats):
+            for chat in chats:
+                yield threads.deferToThread(chat._GetMessages
+                        ).addCallback(self._cb_got_messages, chat,
+                        )
+            logger.info("skype has finished processing all chats")
+
+        return self.conductor.coop.coiterate(gen_chats(chats))
+
+    def _cb_got_messages(self, messages, chat):
+        logger.debug("chat '%s' has %d message(s) total; looking for new ones",
+                     chat.Name, len(messages))
+
+        # Finally got all the messages for this chat.  Execute a view to
+        # determine which we have seen (note that we obviously could just
+        # fetch the *entire* chats+msgs view once - but we do it this way on
+        # purpose to ensure we remain scalable...)
+        return self.doc_model.open_view('raindrop!proto!seen', 'skype',
+                                        startkey=[chat.Name],
+                                        endkey=[chat.Name, {}]
+                    ).addCallback(self._cb_got_seen, chat, messages
+                    )
+
+    def _cb_got_seen(self, result, chat, messages):
+        msgs_by_id = dict((m._Id, m) for m in messages)
+        chatname = chat.Name
+        # The view gives us a list of [chat_name, msg_id], where msg_id is
+        # None if we've seen the chat itself.  Create a set of messages we
+        # *haven't* seen - including the [chat_name, None] entry if applicable.
+        all_keys = [(chatname, None)]
+        all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
+        seen_chats = set([tuple(row['key']) for row in result['rows']])
+        add_bulk = [] # we bulk-update these at the end!
+        remaining = set(all_keys)-set(seen_chats)
+        # we could just process the empty list as normal, but the logging of
+        # an info when we do have items is worthwhile...
+        if not remaining:
+            logger.debug("Chat %r has no new items to process", chatname)
+            return None
+        # we have something to do...
+        logger.info("Chat %r has %d items to process", chatname,
+                    len(remaining))
+        logger.debug("we've already seen %d items from this chat",
+                     len(seen_chats))
+        return self.conductor.coop.coiterate(
+                    self.gen_items(chat, remaining, msgs_by_id))
+
+    def gen_items(self, chat, todo, msgs_by_id):
+        tow = [] # documents to write.
+        for _, msgid in todo:
+            if msgid is None:
+                # we haven't seen the chat itself - do that.
+                logger.debug("Creating new skype chat %r", chat.Name)
+                # make a 'deferred list' to fetch each property one at a time.
+                ds = [threads.deferToThread(chat._Property, p)
+                      for p, _ in CHAT_PROPS]
+                yield defer.DeferredList(ds
+                            ).addCallback(self._cb_got_chat_props, chat, tow)
+            else:
+                msg = msgs_by_id[msgid]
+                # A new msg in this chat.
+                logger.debug("New skype message %d", msg._Id)
+                # make a 'deferred list' to fetch each property one at a time.
+                ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+                yield defer.DeferredList(ds
+                            ).addCallback(self._cb_got_msg_props, chat, msg, tow)
+
+        if tow:
+            yield self.doc_model.create_raw_documents(self.account, tow)
+        logger.debug("finished processing chat %r", chat.Name)
+
+    def _cb_got_chat_props(self, results, chat, pending):
+        logger.debug("got chat %r properties: %s", chat.Name, results)
+        docid = self.get_docid_for_chat(chat)
+        doc = {}
+        for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
+            if ok:
+                doc['skype_'+name.lower()] = simple_convert(val, typ)
+
+        # 'Name' is a special case that doesn't come via a prop.  We use
+        # 'chatname' as that is the equiv attr on the messages themselves.
+        doc['skype_chatname'] = chat.Name
+        pending.append((docid, doc, 'proto/skype-chat'))
+
+    def _cb_got_msg_props(self, results, chat, msg, pending):
+        logger.debug("got message properties for %s", msg._Id)
+        doc = {}
+        for (name, typ), (ok, val) in zip(MSG_PROPS, results):
+            if ok:
+                doc['skype_'+name.lower()] = simple_convert(val, typ)
+        doc['skype_id'] = msg._Id
+        # we include the skype username with the ID as they are unique per user.
+        docid = self.get_docid_for_msg(msg)
+        pending.append((docid, doc, 'proto/skype-msg'))
+
+
+# A 'converter' - takes a proto/skype-msg as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appropriate)
+class SkypeConverter(base.ConverterBase):
+    def convert(self, doc):
+        # We need to open the 'chat' for this Message.  Clearly this is
+        # going to be inefficient...
+        proto_id = doc['skype_chatname'].encode('utf8') # hrmph!
+        return self.doc_model.open_document('msg', 'proto/skype-chat',
+                                            proto_id
+                        ).addCallback(self.finish_convert, doc)
+
+    def finish_convert(self, chat_doc, doc):
+        if chat_doc is None:
+            subject = "<failed to fetch skype chat!>"
+        else:
+            subject = chat_doc['skype_friendlyname']
+        return {'from': ['skype', doc['skype_from_handle']],
+                'subject': subject,
+                'body': doc['skype_body'],
+                'body_preview': doc['skype_body'][:128],
+                'conversation_id': doc['skype_chatname'],
+                'timestamp': doc['skype_timestamp'], # skype's works ok here?
+                }
+
+
+class SkypeAccount(base.AccountBase):
+  def startSync(self, conductor):
+    return TwistySkype(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/test/__init__.py
@@ -0,0 +1,106 @@
+# This is an implementation of a 'test' protocol.
+import logging
+from twisted.internet import defer, error
+from email.message import Message
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class TestMessageProvider(object):
+    def __init__(self, account, conductor):
+        self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
+        self.conductor = conductor
+
+    def sync_generator(self):
+        num_docs = int(self.account.details.get('num_test_docs', 5))
+        logger.info("Creating %d test documents", num_docs)
+        for i in xrange(num_docs):
+            yield self.check_test_message(i)
+        if self.bulk_docs:
+            yield self.doc_model.create_raw_documents(self.account,
+                                                      self.bulk_docs
+                    ).addCallback(self.saved_bulk_messages, len(self.bulk_docs),
+                    )
+
+    def attach(self):
+        logger.info("preparing to synch test messages...")
+        self.bulk_docs = [] # anything added here will be done in bulk
+        # use the cooperator for testing purposes.
+        return self.conductor.coop.coiterate(self.sync_generator())
+
+    def check_test_message(self, i):
+        logger.debug("seeing if message with ID %d exists", i)
+        return self.doc_model.open_document("msg", str(i), "proto/test"
+                        ).addCallback(self.process_test_message, i)
+
+    def process_test_message(self, existing_doc, doc_num):
+        if existing_doc is None:
+            # make an attachment for testing purposes.
+            attachments = {"raw-attach" : {"content_type" : 'application/octet-stream',
+                                        "data" : 'test\0blob'
+                                        }
+            }
+            doc = dict(
+              storage_key=doc_num,
+              _attachments=attachments,
+              )
+            self.bulk_docs.append((str(doc_num), doc, 'proto/test'))
+        else:
+            logger.info("Skipping test message with ID %d - already exists",
+                        doc_num)
+            # we are done.
+
+    def saved_bulk_messages(self, result, n):
+        logger.debug("Finished saving %d test messages in bulk", n)
+        # done
+
+# A 'converter' - takes a proto/test as input and creates a
+# 'raw/message/rfc822' as output.
+class TestConverter(base.ConverterBase):
+    num_converted = 0
+    def convert(self, doc):
+        # for the sake of testing the error queue, we cause an error on
+        # every 3rd message we process.
+        self.num_converted += 1
+        if self.num_converted % 3 == 0:
+            raise RuntimeError("This is a test failure")
+
+        # for the sake of testing, we fetch the raw attachment just to compare
+        # its value.
+        return self.doc_model.open_attachment(doc['_id'], "raw-attach",
+                  ).addCallback(self._cb_got_attachment, doc)
+
+    def _cb_got_attachment(self, attach_content, doc):
+        if attach_content != 'test\0blob':
+            raise RuntimeError(attach_content)
+
+        me = doc['storage_key']
+        # Use the email package to construct a synthetic rfc822 stream.
+        msg = Message()
+        headers = {'from': 'from%(storage_key)d@test.com',
+                   'subject' : 'This is test document %(storage_key)d',
+        }
+        for h in headers:
+            msg[h] = headers[h] % doc
+
+        body = u"Hello, this is test message %(storage_key)d (with extended \xa9haracter!)" % doc
+        msg.set_payload(body.encode('utf-8'), 'utf-8')
+        #attachments = {"rfc822" : {"content_type" : 'message',
+        #                           "data" : msg.get_payload(),
+        #                         }
+        #              }
+        new_doc = {}
+        new_doc['body'] = body
+        #new_doc['_attachments'] = attachments
+        new_doc['multipart'] = False
+        new_doc['headers'] = {}
+        for hn, hv in msg.items():
+            new_doc['headers'][hn.lower()] = hv
+        return new_doc
+
+
+class TestAccount(base.AccountBase):
+  def startSync(self, conductor):
+    return TestMessageProvider(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/proto/twitter.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+'''
+Fetch twitter raw* objects
+'''
+
+# prevent 'import twitter' finding this module!
+from __future__ import absolute_import
+
+import logging
+import re
+import twisted.python.log
+from twisted.internet import defer, threads
+
+from ..proc import base
+
+# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
+# about getting twisted support into the twitter package.
+# Sadly, the twisty-twitter package has issues of its own (like calling
+# delegates outside the deferred mechanism meaning you can't rely on callbacks
+# being completed when they say they are.)
+
+# So for now we are going with the blocking twitter package used via
+# deferToThread for all blocking operations...
+import twitter
+
+logger = logging.getLogger(__name__)
+
+
+class TwitterProcessor(object):
+    def __init__(self, account, conductor):
+        self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
+        self.conductor = conductor
+        self.twit = None
+        self.seen_tweets = None
+
+    def attach(self):
+        logger.info("attaching to twitter...")
+        username = self.account.details['username']
+        pw = self.account.details['password']
+        return threads.deferToThread(twitter.Api,
+                                  username=username, password=pw
+                    ).addCallback(self.attached
+                    )
+
+    def attached(self, twit):
+        logger.info("attached to twitter - fetching friends")
+        self.twit = twit
+
+        # build the list of all users we will fetch tweets from.
+        return threads.deferToThread(self.twit.GetFriends
+                  ).addCallback(self._cb_got_friends)
+
+    def _cb_got_friends(self, friends):
+        # Our 'seen' view is a reduce view, so we only get one result per
+        # friend we pass in.  It is probably safe to assume we don't have
+        # too many friends that we can't get them all in one hit...
+        return self.doc_model.open_view('raindrop!proto!seen', 'twitter',
+                                        group=True
+                  ).addCallback(self._cb_got_seen, friends)
+
+    def _cb_got_seen(self, result, friends):
+        # result -> [{'key': '12449', 'value': 1371372980}, ...]
+        # turn it into a map.
+        rows = result['rows']
+        last_seen_ids = dict((int(r['key']), int(r['value'])) for r in rows)
+        return self.conductor.coop.coiterate(
+                    self.gen_friends_info(friends, last_seen_ids))
+
+    def gen_friends_info(self, friends, last_seen_ids):
+        logger.info("apparently I've %d friends", len(friends))
+        def do_friend(friend):
+            last_this = last_seen_ids.get(friend.id)
+            logger.debug("friend %r (%s) has latest tweet id of %s",
+                         friend.screen_name, friend.id, last_this)
+            return threads.deferToThread(
+                    self.twit.GetUserTimeline, friend.id, since_id=last_this
+                            ).addCallback(self._cb_got_friend_timeline, friend
+                            ).addErrback(self.err_friend_timeline, friend
+                            )
+
+        # None means 'me' - but we don't really want to use 'None'.  This is
+        # the only way I can work out how to get ourselves!
+        me = self.twit.GetUser(self.twit._username)
+        yield do_friend(me)
+        for f in friends:
+            yield do_friend(f)
+
+        logger.debug("Finished friends")
+
+    def err_friend_timeline(self, failure, friend):
+        logger.error("Failed to fetch timeline for '%s': %s",
+                     friend.screen_name, failure)
+
+    def _cb_got_friend_timeline(self, timeline, friend):
+        return self.conductor.coop.coiterate(
+                    self.gen_friend_timeline(timeline, friend))
+
+    def gen_friend_timeline(self, timeline, friend):
+        for tweet in timeline:
+            tid = tweet.id
+            # put the 'raw' document object together and save it.
+            logger.info("New tweet '%s...' (%s)", tweet.text[:25], tid)
+            # create the couch document for the tweet itself.
+            doc = {}
+            for name, val in tweet.AsDict().iteritems():
+                val = getattr(tweet, name)
+                # simple hacks - users just become the user ID.
+                if isinstance(val, twitter.User):
+                    val = val.id
+                doc['twitter_'+name.lower()] = val
+            # not clear if the user ID is actually needed.
+            proto_id = "%s#%s" % (tweet.user.id, tid)
+            yield self.doc_model.create_raw_documents(
+                            self.account, [(proto_id, doc, 'proto/twitter')])
+
+
+# A 'converter' - takes a proto/twitter as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appropriate)
+class TwitterConverter(base.ConverterBase):
+    re_tags = re.compile(r'#(\w+)')    
+    def convert(self, doc):
+        # for now, if a 'proto' can detect tags, it writes them directly
+        # to a 'tags' attribute.
+        body = doc['twitter_text']
+        tags = self.re_tags.findall(body)
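+        # e.g. "nice #sunset at the #beach" -> ['sunset', 'beach']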
+        return {'from': ['twitter', doc['twitter_user']],
+                'body': body,
+                'body_preview': body[:128],
+                'tags': tags,
+                # We don't have 'created_at_in_seconds' seeing we came from
+                # AsDict() - but would 'seconds' be usable for a timestamp?
+                #'timestamp': int(doc['twitter_created_at_in_seconds'])
+                }
+
+
+class TwitterAccount(base.AccountBase):
+  def startSync(self, conductor):
+    return TwitterProcessor(self, conductor).attach()
rename from server/python/junius/replicate.py
rename to server/python/raindrop/replicate.py
--- a/server/python/junius/replicate.py
+++ b/server/python/raindrop/replicate.py
@@ -1,20 +1,14 @@
 #!/usr/bin/env python
 
-import os, os.path
-
-try:
-    import simplejson as json
-except ImportError:
-    import json # Python 2.6
-
-
-import os, os.path, httplib2
-from junius import model
+import os
+import json
+import httplib2
+from . import model
 
 class Replicator(object):
     def __init__(self):
         self.local_host = model.get_local_host_info()
         self.remote_host = model.get_remote_host_info()
         self.http = httplib2.Http()
         
     def create_remote_dbs(self):
new file mode 100644
--- /dev/null
+++ b/server/python/raindrop/sync.py
@@ -0,0 +1,104 @@
+import logging
+
+from twisted.internet import reactor, defer, task
+from twisted.python.failure import Failure
+import twisted.web.error
+import paisley
+
+from . import proto as proto
+
+logger = logging.getLogger(__name__)
+
+_conductor = None
+from .model import get_doc_model
+
+def get_conductor(options=None):
+  global _conductor
+  if _conductor is None:
+    proto.init_protocols()
+    _conductor = SyncConductor(options)
+  else:
+    assert options is None, 'can only set this at startup'
+  return _conductor
+  
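+# Typical usage, as a sketch ('options' being the optparse values object
+# built by run-raindrop.py):
+#
+#   conductor = get_conductor(options)  # first call creates the singleton
+#   conductor = get_conductor()         # later calls return the same object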
+
+# XXX - rename this to plain 'Conductor' and move to a different file.
+# This 'conducts' synchronization, the work queues and the interactions with
+# the extensions and database.
+class SyncConductor(object):
+  def __init__(self, options):
+    self.log = logger
+    self.options = options
+    # Apparently it is now considered 'good form' to pass reactors around,
+    # which leaves open a future with multiple reactors.  We capture the
+    # global one here, and everything we 'conduct' uses it (though later it
+    # should be passed to our ctor too).
+    self.reactor = reactor
+    self.coop = task.Cooperator()
+    reactor.addSystemEventTrigger("before", "shutdown", self._kill_coop)
+    self.doc_model = get_doc_model()
+
+    self.active_accounts = []
+
+  def _kill_coop(self):
+    logger.debug('stopping the coordinator')
+    self.coop.stop()
+    logger.debug('coordinator stopped')
+
+  def _ohNoes(self, failure, *args, **kwargs):
+    self.log.error('OH NOES! failure! %s', failure)
+
+  def _getAllAccounts(self):
+    return self.doc_model.open_view('raindrop!accounts!all', 'all'
+      ).addCallback(self._gotAllAccounts
+      ).addErrback(self._ohNoes)
+
+  def _gotAllAccounts(self, result):
+    # we don't use the cooperator here as we want them all to run in parallel.
+    return defer.DeferredList(list(self._genAccountSynchs(result['rows'])))
+
+  def _genAccountSynchs(self, rows):
+    self.log.info("Have %d accounts to synch", len(rows))
+    to_synch = []
+    for row in rows:
+      account_details = row['value']
+      kind = account_details['kind']
+      self.log.debug("Found account using protocol %s", kind)
+      if not self.options.protocols or kind in self.options.protocols:
+        if kind in proto.protocols:
+          account = proto.protocols[kind](self.doc_model, account_details)
+          self.log.info('Starting sync of %s account: %s',
+                        kind, account_details.get('name', '(un-named)'))
+          self.active_accounts.append(account)
+          yield account.startSync(self
+                    ).addBoth(self._cb_sync_finished, account)
+        else:
+          self.log.error("Don't know what to do with account kind: %s", kind)
+      else:
+        self.log.info("Skipping account - protocol '%s' is disabled", kind)
+
+  def sync(self, whateva=None):
+    return self._getAllAccounts()
+
+  def _cb_sync_finished(self, result, account):
+    if isinstance(result, Failure):
+      self.log.error("Account %s failed with an error: %s", account, result)
+    else:
+      self.log.debug("Account %s finished successfully", account)
+    assert account in self.active_accounts, (account, self.active_accounts)
+    self.active_accounts.remove(account)
+    if not self.active_accounts:
+      self.log.info("all accounts have finished synchronizing")
+
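+# For reference, _genAccountSynchs expects each row of the
+# 'raindrop!accounts!all' view to carry the account document as its value -
+# something like (illustrative only):
+#   {'value': {'kind': 'twitter', 'name': 'my-twitter', ...}}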
+
+if __name__ == '__main__':
+  # normal entry-point is the app itself; this is purely for debugging...
+  logging.basicConfig()
+  logging.getLogger().setLevel(logging.DEBUG)
+
+  conductor = get_conductor()
+  conductor.reactor.callWhenRunning(conductor.sync)
+
+  logger.debug('starting reactor')
+  conductor.reactor.run()
+  logger.debug('reactor done')
new file mode 100644
--- /dev/null
+++ b/server/python/run-raindrop.py
@@ -0,0 +1,325 @@
+"""The raindrop server
+"""
+import sys
+import optparse
+import logging
+
+from twisted.internet import reactor, defer, task
+
+from raindrop import model
+from raindrop import bootstrap
+from raindrop import pipeline
+from raindrop.sync import get_conductor
+
+logger = logging.getLogger('raindrop')
+
+class HelpFormatter(optparse.IndentedHelpFormatter):
+    def format_description(self, description):
+        return description
+
+# decorators for our global functions:
+#  so they can insist they complete before the next command executes
+def asynch_command(f):
+    f.asynch = True
+    return f
+
+#  so they can consume the rest of the args
+def allargs_command(f):
+    f.allargs = True
+    return f
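+
+# For example, a hypothetical command (not defined in this file):
+#
+#   @asynch_command
+#   def fetch_everything(result, parser, options):
+#       """Fetch all the things"""
+#       return some_deferred_returning_task()
+#
+# main() would expose it as 'fetch-everything', use its docstring as help
+# text, and run it alongside the other asynch commands once startup
+# completes.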
+
+
+# XXX - install_accounts should die too, but how to make a safe 'fingerprint'
+# so we can do it implicitly? We could create a hash which doesn't include
+# the password, but then the password changing wouldn't be enough to trigger
+# an update.  Including even the *hash* of the password might risk leaking
+# info.  So for now you must install manually.
+def install_accounts(result, parser, options):
+    """Install accounts in the database from the config file"""
+    return bootstrap.install_accounts(None)
+
+
+@asynch_command
+def sync_messages(result, parser, options):
+    """Synchronize all messages from all accounts"""
+    conductor = get_conductor()
+    return conductor.sync(None)
+
+@asynch_command
+def process(result, parser, options):
+    """Process all messages to see if any extensions need running"""
+    def done(result):
+        print "Message pipeline has finished..."
+    p = pipeline.Pipeline(model.get_doc_model(), options)
+    return p.start().addCallback(done)
+
+@asynch_command
+def retry_errors(result, parser, options):
+    """Reprocess all conversions which previously resulted in an error."""
+    def done_errors(result):
+        print "Error retry pipeline has finished - processing work queue..."
+        return result
+    p = pipeline.Pipeline(model.get_doc_model(), options)
+    return p.start_retry_errors(
+                ).addCallback(done_errors
+                ).addCallback(process, parser, options)
+
+@allargs_command
+def show_view(result, parser, options, args):
+    """Pretty-print the result of executing a view.
+
+    All arguments after this command are views to execute.  If a view
+    name starts with '/', it is used as the URL path as-is, which is
+    suitable for builtin views - eg:
+
+        show-view /_all_docs?limit=5
+    
+    will fetch the first 5 results from the URL:
+
+        http://[dbhost]/[dbname]/_all_docs?limit=5
+
+    whereas
+
+        show-view my_design_doc/my_view?limit=5
+
+    will fetch the first 5 results from the URL
+
+        http://[dbhost]/[dbname]/_view/my_design_doc/my_view?limit=5
+
+    (XXX - todo - couch 0.9 changes the above URL - adjust this docstring
+    accordingly...)
+    """
+    from pprint import pprint
+    def print_view(result, view_name):
+        print "** view %r **" % view_name
+        pprint(result)
+
+    def gen_views():
+        for arg in args:
+            # don't use open_view, as we'd then need to parse the query
+            # portion ourselves - grrr, all that just to get the dbname :(
+            from raindrop.config import get_config
+            dbinfo = get_config().couches['local']
+            dbname = dbinfo['name']
+            if arg.startswith("/"):
+                # arg already carries a leading slash - don't add another.
+                uri = "/%s%s" % (dbname, arg)
+            else:
+                try:
+                    doc, rest = arg.split("/")
+                except ValueError:
+                    parser.error("View name must be in the form design_doc_name/view")
+                uri = "/%s/_view/%s/%s" % (dbname, doc, rest)
+            db = model.get_db()
+            yield db.get(uri
+                ).addCallback(db.parseResult
+                ).addCallback(print_view, arg)
+
+    coop = task.Cooperator()
+    return coop.coiterate(gen_views())
+
+
+def unprocess(result, parser, options):
+    """Delete all documents which can be re-generated by the 'process' command
+    """
+    def done(result):
+        print "unprocess has finished..."
+    # XXX - pipeline should probably be a singleton?
+    p = pipeline.Pipeline(model.get_doc_model(), options)
+    return p.unprocess().addCallback(done)
+
+def delete_docs(result, parser, options):
+    """Delete all documents of a particular type.  Use with caution or see
+       the 'unprocess' command for an alternative.
+    """
+    # NOTE: This is for development only, until we get a way to say
+    # 'reprocess stuff you've already done' - in the meantime deleting those
+    # intermediate docs has the same result...
+    def _del_docs(to_del):
+        docs = []
+        for id, rev in to_del:
+            docs.append({'_id': id, '_rev': rev, '_deleted': True})
+        return model.get_db().updateDocuments(docs)
+
+    def _got_docs(result, dt):
+        to_del = [(row['id'], row['value']) for row in result['rows']]
+        logger.info("Deleting %d documents of type %r", len(to_del), dt)
+        return to_del
+
+    if not options.doctypes:
+        parser.error("You must specify one or more --doctype")
+    deferreds = []
+    for dt in options.doctypes:
+        d = model.get_doc_model().open_view('raindrop!messages!by',
+                                            'by_doc_type', key=dt
+                ).addCallback(_got_docs, dt
+                ).addCallback(_del_docs
+                )
+        deferreds.append(d)
+    return defer.DeferredList(deferreds)
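+
+# CouchDB deletes in bulk via stub documents with '_deleted' set, so
+# _del_docs ends up posting a list like (values invented):
+#   [{'_id': 'msg!123', '_rev': '1-abc', '_deleted': True}, ...]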
+
+
+def _setup_logging(options):
+    init_errors = []
+    logging.basicConfig()
+    for val in options.log_level: # a list of all --log-level options...
+        try:
+            name, level = val.split("=", 1)
+        except ValueError:
+            name = None
+            level = val
+        # convert a level name to a value.
+        try:
+            level = int(getattr(logging, level.upper()))
+        except (ValueError, AttributeError):
+            # not a level name from the logging module - maybe a literal.
+            try:
+                level = int(level)
+            except ValueError:
+                init_errors.append(
+                    "Invalid log-level '%s' ignored" % (val,))
+                continue
+        l = logging.getLogger(name)
+        l.setLevel(level)
+
+    # write the errors after the logging is completely setup.
+    for e in init_errors:
+        logging.getLogger().error(e)
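+
+# Examples of the accepted forms:
+#   --log-level=DEBUG                 set the root logger to DEBUG
+#   --log-level=raindrop.sync=DEBUG   set only the 'raindrop.sync' logger
+#   --log-level=10                    numeric levels work too (10 == DEBUG)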
+
+
+def main():
+    # build the args we support.
+    all_args = {}
+    for n, v in globals().iteritems():
+        if callable(v) and getattr(v, '__doc__', None):
+            all_args[n.replace('_', '-')] = v
+
+    all_arg_names = sorted(all_args.keys())
+    description = __doc__ + "\nCommands\n  help\n  " + \
+                  "\n  ".join(all_arg_names) + \
+                  "\nUse '%prog help command-name' for help on a specific command.\n"
+
+    parser = optparse.OptionParser("%prog [options]",
+                                   description=description,
+                                   formatter=HelpFormatter())
+
+    parser.add_option("-l", "--log-level", action="append",
+                      help="Specifies either an integer or a logging module "
+                           "constant (such as INFO) to set all logs to the "
+                           "specified level.  Optionally can be in the format "
+                           "of 'log.name=level' to only set the level of the "
+                           "specified log.",
+                      default=["INFO"])
+
+    parser.add_option("-p", "--protocol", action="append", dest='protocols',
+                      help="Specifies the protocols to enable.  If not "
+                           "specified, all protocols are enabled")
+
+    parser.add_option("", "--doctype", action="append", dest='doctypes',
+                      help="Specifies the document types to use for some "
+                           "operations.")
+
+    parser.add_option("", "--force", action="store_true",
+                      help="Forces some operations which would otherwise not "
+                           "be done.")
+
+    parser.add_option("-s", "--stop-on-error", action="store_true",
+                      help="Causes (some) operations which would normally "
+                           "handle an error and continue to stop when an "
+                           "error occurs.")
+
+    options, args = parser.parse_args()
+
+    _setup_logging(options)
+
+    # do this very early just to set the options
+    get_conductor(options)
+
+    if args and args[0]=='help':
+        if args[1:]:
+            which = args[1:]
+        else:
+            which = all_args.keys()
+        for this in which:
+            if this=='help': # ie, 'help help'
+                doc = "show help for the commands."
+            else:
+                try:
+                    doc = all_args[this].__doc__
+                except KeyError:
+                    print "No such command '%s':" % this
+                    continue
+            print "Help for command '%s':" % this
+            print doc
+            print
+    
+        sys.exit(0)
+
+    # create an initial deferred to perform tasks which must occur before we
+    # can start.  The final callback added will fire up the real servers.
+    asynch_tasks = []
+    d = defer.Deferred()
+    def mutter(whateva):
+        print "Raindrops keep falling on my head..."
+    d.addCallback(mutter)
+
+    # Create the DB if it doesn't already exist, and install accounts into
+    # a freshly created DB.
+    def maybe_install_accounts(db_created):
+        if db_created:
+            return bootstrap.install_accounts(None)
+    d.addCallback(model.fab_db
+        ).addCallback(maybe_install_accounts
+        )
+    # Check if the files on the filesystem need updating.
+    d.addCallback(bootstrap.install_client_files, options)
+    d.addCallback(bootstrap.install_views, options)
+
+    # Now process the args specified.
+    for i, arg in enumerate(args):
+        try:
+            func = all_args[arg]
+        except KeyError:
+            parser.error("Invalid command: " + arg)
+
+        asynch = getattr(func, 'asynch', False)
+        if asynch:
+            asynch_tasks.append(func)
+        else:
+            consumes_args = getattr(func, 'allargs', False)
+            if consumes_args:
+                d.addCallback(func, parser, options, args[i+1:])
+                break
+            else:
+                d.addCallback(func, parser, options)
+
+    # and some final deferreds to control the process itself.
+    def done(whateva):
+        print "Apparently everything is finished - terminating."
+        reactor.stop()
+
+    def start(whateva):
+        if not asynch_tasks:
+            print "Nothing left to do - terminating."
+            reactor.stop()
+            return
+        print "Startup complete - running tasks..."
+        dl = defer.DeferredList([f(None, parser, options) for f in asynch_tasks])
+        dl.addCallback(done)
+        return dl
+
+    def error(*args, **kw):
+        from twisted.python import log
+        log.err(*args, **kw)
+        print "A startup task failed - not executing servers."
+        reactor.stop()
+
+    d.addCallbacks(start, error)
+
+    reactor.callWhenRunning(d.callback, None)
+
+    logger.debug('starting reactor')
+    reactor.run()
+    logger.debug('reactor done')
+
+
+if __name__=='__main__':
+    main()
--- a/server/python/setup.py
+++ b/server/python/setup.py
@@ -1,19 +1,19 @@
 from setuptools import setup, find_packages
 
 setup(
-    name = "junius",
+    name = "raindrop",
     version = "0.1a1",
     packages = find_packages(),
 
     install_requires = [
                         ],
 
     # PyPI meta
     author = 'Andrew Sutherland',
     author_email = 'asutherland@asutherland.org',
-    description = 'junius',
+    description = 'raindrop',
     license = 'MPL/GPL/LGPL tri-license',
     keywords = 'mail',
     url = 'www.mozillamessaging.com',
     zip_safe = False,
 )
new file mode 100644
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,1 @@
+# doot doot, just creating a branch