use attachments for binary blobs twisty
authorMark Hammond <mhammond@skippinet.com.au>
Thu, 26 Mar 2009 22:29:26 +1100
branchtwisty
changeset 125 91d895ce8d86d1366c92b17cb28a52211b7c095b
parent 124 f161b6e3b54a301bda27cd684c8ebdea4c5213bb
child 126 4ccc7acb1102e98de1851a9619df3238ced6ea4e
push id1
push userroot
push dateWed, 08 Apr 2009 01:46:05 +0000
use attachments for binary blobs
server/python/junius/ext/message/rfc822.py
server/python/junius/model.py
server/python/junius/proto/imap.py
--- a/server/python/junius/ext/message/rfc822.py
+++ b/server/python/junius/ext/message/rfc822.py
@@ -1,34 +1,94 @@
 # This is an extension which converts a message/raw/rfc822 to a
 # message/raw/rfc822
 from __future__ import absolute_import # stop 'email' import finding our ext
 
 import logging
-from email.parser import HeaderParser
+from email import message_from_string
 from email.utils import mktime_tz, parsedate_tz
 from twisted.internet import defer
 
+from ...proc import base
+
 
 logger = logging.getLogger(__name__)
 
-from ...proc import base
+# an 'rfc822' message stores the unpacked version of the rfc822 stream, a-la
+# the interface provided by the 'email' package.  IOW, we never bother storing
+# the raw stream, just a 'raw' unpacked version of it.
+# This helper function takes a raw rfc822 string and returns a 'document'
+# suitable for storing as an rfc822 message.
+def doc_from_bytes(b):
+    msg = message_from_string(b)
+    doc = {}
+    mp = doc['multipart'] = msg.is_multipart()
+    headers = doc['headers'] = {}
+    # Given we have no opportunity to introduce an object which can ignore
+    # the case of headers, we lowercase the keys
+    for hn, hv in msg.items():
+        headers[hn.lower()] = hv
+
+    # XXX - technically msg objects are recursive; handling that requires
+    # more thought.  For now, assume they are flat.
+    # Unlikely, but if we *aren't* text based also return as attachments.
+    if mp or msg.get_content_maintype() != 'text':
+        # a multi-part message - flatten it here by walking the list, but
+        # only looking at the 'leaf' nodes.
+        attachments = doc['_attachments'] = {}
+        i = 1
+        for attach in msg.walk():
+            if not attach.is_multipart():
+                name = attach.get_filename()
+                if not name:
+                    name = "subpart %d" % i
+                attachments[name] = {'content_type': attach.get_content_type(),
+                                     'data': attach.get_payload(decode=True),
+                                     }
+                i += 1
+    else:
+        body_bytes = msg.get_payload(decode=True)
+        ct = msg.get_content_charset()
+        body = None
+        if ct is not None:
+            try:
+                body = body_bytes.decode(ct)
+            except UnicodeError, exc:
+                logger.error("Failed to decode mail from %r: %s", ct, exc)
+        if body is None:
+            # no content-type or failed to decode as declared - try utf8
+            try:
+                body = body_bytes.decode('utf8')
+            except UnicodeError, exc:
+                logger.error("Failed to fallback decode mail from utf8: %s", exc)
+                body = body_bytes.decode('utf8', 'ignore')
+        doc['body'] = body
+    return doc
 
 class RFC822Converter(base.ConverterBase):
-    def __init__(self, *args, **kw):
-        super(RFC822Converter, self).__init__(*args, **kw)
-        self.hdr_parser = HeaderParser()
     def convert(self, doc):
-        msg = self.hdr_parser.parsestr(doc['headers'])
+        # I need the binary attachment.
+        return self.doc_model.open_document(doc['_id'], attachment="rfc822"
+                  ).addCallback(self._cb_got_attachment, doc)
+
+    def _cb_got_attachment(self, body, doc):
+        # a 'rfc822' stores 'headers' as a dict
+        headers = doc['headers']
         # for now, 'from' etc are all tuples of [identity_type, identity_id]
-        ret = {'from': ['email', msg['from']],
-               'subject': msg['subject'],
-               'body': doc['body'],
-               'body_preview': doc['body'][:128],
+        # XXX - todo - find one of the multi-part bits to use as the body.
+        try:
+            body = doc['body']
+        except KeyError:
+            assert doc['multipart']
+            body = 'This is a multipart message - todo - find the body!'
+        ret = {'from': ['email', headers['from']],
+               'subject': headers['subject'],
+               'body': body,
+               'body_preview': body[:128], # good enuf for now...
         }
         try:
-            dval = msg['Date']
+            dval = headers['Date']
         except KeyError:
             pass
         else:
             if dval:
                 ret['timestamp'] = mktime_tz(parsedate_tz(dval))
         return ret
--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -75,16 +75,93 @@ class CouchDB(paisley.CouchDB):
 
     def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
         # paisley appears to use an old api for attachments?
         if attachment:
             uri = "/%s/%s/%s" % (dbName, docId, attachment)
             return  self.get(uri)
         return super(CouchDB, self).openDoc(dbName, docId, revision, full)
 
+    # This is a potential addition to the paisley API;  It is hard to avoid
+    # a hacky workaround due to the use of 'partial' in paisley...
+    def saveAttachment(self, dbName, docId, name, data,
+                       content_type="application/octet-stream",
+                       revision=None):
+        """
+        Save/create an attachment to a document in a given database.
+
+        @param dbName: identifier of the database.
+        @type dbName: C{str}
+
+        @param docId: the identifier of the document.
+        @type docId: C{str}
+
+        @param name: name of the attachment
+        @type name: C{str}
+
+        @param data: content of the attachment.
+        @type data: C{sequence}
+
+        @param content_type: content type of the attachment
+        @type content_type: C{str}
+
+        @param revision: if specified, the revision of the attachment this
+                         is updating
+        @type revision: C{str}
+        """
+        # Responses: ???
+        # 409 Conflict, 500 Internal Server Error
+        url = "/%s/%s/%s" % (dbName, docId, name)
+        if revision:
+            url = url + '?rev=' + revision
+        # *sob* - and I can't use put as it doesn't allow custom headers :(
+        # and neither does _getPage!!
+        # ** start of self._getPage clone setup...** (plus an import or 2...)
+        from twisted.web.client import HTTPClientFactory
+        kwargs = {'method': 'PUT',
+                  'postdata': data}
+        kwargs["headers"] = {"Accept": "application/json",
+                             "Content-Type": content_type,
+                             }
+        factory = HTTPClientFactory(url, **kwargs)
+        from twisted.internet import reactor
+        reactor.connectTCP(self.host, self.port, factory)
+        d = factory.deferred
+        # ** end of self._getPage clone **
+        d.addCallback(self.parseResult)
+        return d
+
+    def updateDocuments(self, dbName, docs):
+        # update/insert/delete multiple docs in a single request using
+        # _bulk_docs
+        # from couchdb-python.
+        docs = []
+        for doc in docs:
+            if isinstance(doc, dict):
+                docs.append(doc)
+            elif hasattr(doc, 'items'):
+                docs.append(dict(doc.items()))
+            else:
+                raise TypeError('expected dict, got %s' % type(doc))
+        url = "/%s/_bulk_docs" % dbName
+        body = json.dumps({'docs': docs})
+        return self.post(url, body
+                    ).addCallback(self.parseResult
+                    )
+
+    # Hack so our new bound methods work.
+    def bindToDB(self, dbName):
+        super(CouchDB, self).bindToDB(dbName)
+        partial = paisley.partial # it works hard to get this!
+        for methname in ["saveAttachment", "updateDocuments"]:
+            method = getattr(self, methname)
+            newMethod = partial(method, dbName)
+            setattr(self, methname, newMethod)
+
+
 def get_db(couchname="local", dbname=_NotSpecified):
     dbinfo = config.couches[couchname]
     if dbname is _NotSpecified:
         dbname = dbinfo['name']
     key = couchname, dbname
     try:
         return DBs[key]
     except KeyError:
@@ -124,50 +201,80 @@ class DocumentModel(object):
         doc['raindrop_account'] = account.details['_id']
 
         assert 'type' not in doc, doc # we look after that!
         doc['type'] = doc_type
 
         assert 'raindrop_seq' not in doc, doc # we look after that!
         doc['raindrop_seq'] = get_seq()
 
-        # XXX - for small blobs this is good (we get the attachment in the
-        # same request as the doc).  For large blobs this is bad, as we
-        # base64 encode the data in memory before sending it.
-        if attachments:
-            self.db.addAttachments(doc, attachments)
-
         # save the document.
         logger.debug('create_raw_document saving doc %r', docid)
-        return self.db.saveDoc(doc, docId=quote_id(docid),
-                    ).addCallback(self._cb_saved_document
-                    ).addErrback(self._cb_save_failed
+        qid = quote_id(docid)
+        return self.db.saveDoc(doc, docId=qid,
+                    ).addCallback(self._cb_saved_document, 'raw-message', docid
+                    ).addErrback(self._cb_save_failed, 'raw-message', docid
+                    ).addCallback(self._cb_save_attachments, attachments, qid
                     )
 
-    def _cb_saved_document(self, result):
-        logger.debug("Saved document %s", result)
+    def _cb_saved_document(self, result, what, ids):
+        logger.debug("Saved %s %s", what, result)
         # XXX - now what?
+        return result
 
-    def _cb_save_failed(self, failure):  
-        logger.error("Failed to save document: %s", failure)
+    def _cb_save_failed(self, failure, what, ids):
+        logger.error("Failed to save %s (%r): %s", what, ids, failure)
         failure.raiseException()
 
     def create_ext_document(self, doc, ext, rootdocId):
         assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
         assert 'raindrop_seq' not in doc, doc # we look after that!
         doc['raindrop_seq'] = get_seq()
         doc['type'] = ext
         docid = quote_id(rootdocId + "!" + ext)
+        try:
+            attachments = doc['_attachments']
+            # nuke attachments specified
+            del doc['_attachments']
+        except KeyError:
+            attachments = None
+
         # save the document.
         logger.debug('saving extension document %r', docid)
         return self.db.saveDoc(doc, docId=docid,
-                    ).addCallback(self._cb_saved_document
-                    ).addErrback(self._cb_save_failed
+                    ).addCallback(self._cb_saved_document, 'ext-message', docid
+                    ).addErrback(self._cb_save_failed, 'ext-message', docid
+                    ).addCallback(self._cb_save_attachments, attachments, docid
                     )
 
+    def _cb_save_attachments(self, saved_doc, attachments, docid):
+        if not attachments:
+            return saved_doc
+        # Each time we save an attachment the doc gets a new revision number.
+        # So we need to do them in a chain, passing the result from each to
+        # the next.
+        remaining = attachments.copy()
+        # This is recursive, but that should be OK.
+        return self._cb_save_next_attachment(saved_doc, docid, remaining)
+
+    def _cb_save_next_attachment(self, result, docid, remaining):
+        if not remaining:
+            return result
+        revision = result['rev']
+        name, info = remaining.popitem()
+        logger.debug('saving attachment %r to doc %r', name, docid)
+        d = self.db.saveAttachment(docid, # already quoted by caller...
+                                   quote_id(name), info['data'],
+                                   content_type=info['content_type'],
+                                   revision=revision,
+                ).addCallback(self._cb_saved_document, 'attachment', (docid, name)
+                ).addErrback(self._cb_save_failed, 'attachment', (docid, name)
+                ).addCallback(self._cb_save_next_attachment, docid, remaining
+                )
+
     def get_last_ext_for_document(self, doc_id):
         """Given a base docid, find the most-recent extension to have run.
         This will differ from the latest extension in the document chain if
         the document chain has been 'reset' for any reason (eg, change
         detected at the source of the message, user adding annotations, etc)
         """
         startkey = [doc_id]
         endkey = [doc_id, 99999999999999]
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,14 +1,15 @@
 from twisted.internet import protocol, ssl, defer, error
 from twisted.mail import imap4
 import logging
 
 from ..proc import base
 from ..model import get_db
+from ..ext.message.rfc822 import doc_from_bytes
 
 brat = base.Rat
 
 logger = logging.getLogger(__name__)
 
 
 class ImapClient(imap4.IMAP4Client):
   '''
@@ -46,16 +47,20 @@ class ImapClient(imap4.IMAP4Client):
 
   def _reqList(self, *args, **kwargs):
     self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
     return self.list('', '*').addCallback(self._procList)
 
   def _procList(self, result, *args, **kwargs):
     # As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
     # We keep a 'stack' of items left to process in an instance variable...
+    # (*sob* - but its not clear why this doesn't suffer from
+    # death-by-recursion like the other impls can demonstrate? Even though we
+    # use a 'stack' of items left to process, each call to
+    # _process_next_folder is recursive?)
     self.folder_infos = result[:]
     return self._process_next_folder()
 
   def _process_next_folder(self):
     if not self.folder_infos:
       # yay - all done!
       return
 
@@ -130,17 +135,20 @@ class ImapClient(imap4.IMAP4Client):
     assert len(result)==1, result
     _, result = result.popitem()
     flags = result['FLAGS']
     # put the 'raw' document object together and save it.
     doc = dict(
       storage_key=[self.current_folder, uid],
       imap_flags=flags,
       )
-    attachments = {'rfc822' : content}
+    attachments = {'rfc822' : {'content_type': 'message',
+                               'data': content,
+                               }
+                  }
     return self.doc_model.create_raw_document(did, doc, 'proto/imap',
                                               self.account,
                                               attachments=attachments,
                 ).addCallback(self._cb_saved_message)
     
   def _cantGetMessage(self, failure):
     logger.error("Failed to fetch message: %s", failure)
     return self._process_next_message()
@@ -221,20 +229,20 @@ class ImapClientFactory(protocol.ClientF
 # 'raw/message/rfc822' as output
 class IMAPConverter(base.ConverterBase):
   def convert(self, doc):
     # I need the binary attachment.
     return self.doc_model.open_document(doc['_id'], attachment="rfc822"
               ).addCallback(self._cb_got_attachment, doc)
 
   def _cb_got_attachment(self, content, doc):
-    headers, body = content.split('\r\n\r\n', 1)
-    # XXX - *sob* - this is still wrong - 'body' is a still blob so needs to
-    # go into an attachment.
-    return {'headers': headers, 'body': body}
+    assert content, "can't locate attachment for %r" % doc['_id']
+    # the 'rfc822' module knows what to do...
+    return doc_from_bytes(content)
+
 
 class IMAPAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
   def startSync(self, conductor, doc_model):
     self.factory = ImapClientFactory(self, conductor, doc_model)