--- a/server/python/junius/ext/message/rfc822.py
+++ b/server/python/junius/ext/message/rfc822.py
@@ -1,34 +1,94 @@
# This is an extension which converts a message/raw/rfc822 to a
# message/rfc822
from __future__ import absolute_import # stop 'email' import finding our ext
import logging
-from email.parser import HeaderParser
+from email import message_from_string
from email.utils import mktime_tz, parsedate_tz
from twisted.internet import defer
+from ...proc import base
+
logger = logging.getLogger(__name__)
-from ...proc import base
+# an 'rfc822' message stores the unpacked version of the rfc822 stream, a-la
+# the interface provided by the 'email' package. IOW, we never bother storing
+# the raw stream, just a 'raw' unpacked version of it.
+# This helper function takes a raw rfc822 string and returns a 'document'
+# suitable for storing as an rfc822 message.
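+# Roughly, a simple text mail becomes something like:
+#   {'multipart': False,
+#    'headers': {'from': '...', 'subject': '...'},
+#    'body': u'...decoded body text...'}
+# while a multipart (or non-text) mail gets an '_attachments' dict of its
+# leaf parts instead of a 'body'.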
+def doc_from_bytes(b):
+ msg = message_from_string(b)
+ doc = {}
+ mp = doc['multipart'] = msg.is_multipart()
+ headers = doc['headers'] = {}
+ # Given we have no opportunity to introduce an object which can ignore
+ # the case of headers, we lowercase the keys
+ for hn, hv in msg.items():
+ headers[hn.lower()] = hv
+
+ # XXX - technically msg objects are recursive; handling that requires
+ # more thought. For now, assume they are flat.
+ # Unlikely, but if the single part *isn't* text based, also store it as
+ # an attachment.
+ if mp or msg.get_content_maintype() != 'text':
+ # a multi-part message - flatten it here by walking the list, but
+ # only looking at the 'leaf' nodes.
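+ # (msg.walk() yields the message itself and every sub-part; the
+ # is_multipart() check below skips the container parts, so only the
+ # leaf parts end up as attachments.)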
+ attachments = doc['_attachments'] = {}
+ i = 1
+ for attach in msg.walk():
+ if not attach.is_multipart():
+ name = attach.get_filename()
+ if not name:
+ name = "subpart %d" % i
+ attachments[name] = {'content_type': attach.get_content_type(),
+ 'data': attach.get_payload(decode=True),
+ }
+ i += 1
+ else:
+ body_bytes = msg.get_payload(decode=True)
+ charset = msg.get_content_charset()
+ body = None
+ if charset is not None:
+ try:
+ body = body_bytes.decode(charset)
+ except UnicodeError, exc:
+ logger.error("Failed to decode mail body as %r: %s", charset, exc)
+ if body is None:
+ # no charset declared, or decoding with the declared charset
+ # failed - fall back to utf-8.
+ try:
+ body = body_bytes.decode('utf8')
+ except UnicodeError, exc:
+ logger.error("Failed to decode mail body as utf-8: %s", exc)
+ body = body_bytes.decode('utf8', 'ignore')
+ doc['body'] = body
+ return doc
class RFC822Converter(base.ConverterBase):
- def __init__(self, *args, **kw):
- super(RFC822Converter, self).__init__(*args, **kw)
- self.hdr_parser = HeaderParser()
def convert(self, doc):
- msg = self.hdr_parser.parsestr(doc['headers'])
+ # I need the binary attachment.
+ return self.doc_model.open_document(doc['_id'], attachment="rfc822"
+ ).addCallback(self._cb_got_attachment, doc)
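+ # (the deferred fires with the raw attachment content - see the
+ # attachment branch of CouchDB.openDoc in model.py.)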
+
+ def _cb_got_attachment(self, content, doc):
+ # a 'rfc822' stores 'headers' as a dict
+ headers = doc['headers']
# for now, 'from' etc are all tuples of [identity_type, identity_id]
- ret = {'from': ['email', msg['from']],
- 'subject': msg['subject'],
- 'body': doc['body'],
- 'body_preview': doc['body'][:128],
+ # XXX - todo - find one of the multi-part bits to use as the body.
+ try:
+ body = doc['body']
+ except KeyError:
+ assert doc['multipart']
+ body = 'This is a multipart message - todo - find the body!'
+ ret = {'from': ['email', headers['from']],
+ 'subject': headers['subject'],
+ 'body': body,
+ 'body_preview': body[:128], # good enuf for now...
}
try:
- dval = msg['Date']
+ dval = headers['date'] # keys were lowercased in doc_from_bytes
except KeyError:
pass
else:
if dval:
ret['timestamp'] = mktime_tz(parsedate_tz(dval))
return ret
--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -75,16 +75,93 @@ class CouchDB(paisley.CouchDB):
def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
# paisley appears to use an old api for attachments?
if attachment:
uri = "/%s/%s/%s" % (dbName, docId, attachment)
return self.get(uri)
return super(CouchDB, self).openDoc(dbName, docId, revision, full)
+ # This is a potential addition to the paisley API; it is hard to avoid
+ # a hacky workaround due to the use of 'partial' in paisley...
+ def saveAttachment(self, dbName, docId, name, data,
+ content_type="application/octet-stream",
+ revision=None):
+ """
+ Save/create an attachment to a document in a given database.
+
+ @param dbName: identifier of the database.
+ @type dbName: C{str}
+
+ @param docId: the identifier of the document.
+ @type docId: C{str}
+
+ @param name: name of the attachment
+ @type name: C{str}
+
+ @param data: content of the attachment.
+ @type data: C{str}
+
+ @param content_type: content type of the attachment
+ @type content_type: C{str}
+
+ @param revision: if specified, the revision of the attachment this
+ is updating
+ @type revision: C{str}
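+
+ A rough usage sketch (hypothetical database and doc ids):
+
+ db.saveAttachment('raindrop', 'some-doc-id', 'rfc822', raw_bytes,
+ content_type='message/rfc822',
+ revision=doc['_rev'])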
+ """
+ # Responses: ???
+ # 409 Conflict, 500 Internal Server Error
+ url = "/%s/%s/%s" % (dbName, docId, name)
+ if revision:
+ url = url + '?rev=' + revision
+ # *sob* - and I can't use put as it doesn't allow custom headers :(
+ # and neither does _getPage!!
+ # ** start of self._getPage clone setup...** (plus an import or 2...)
+ from twisted.web.client import HTTPClientFactory
+ kwargs = {'method': 'PUT',
+ 'postdata': data}
+ kwargs["headers"] = {"Accept": "application/json",
+ "Content-Type": content_type,
+ }
+ factory = HTTPClientFactory(url, **kwargs)
+ from twisted.internet import reactor
+ reactor.connectTCP(self.host, self.port, factory)
+ d = factory.deferred
+ # ** end of self._getPage clone **
+ d.addCallback(self.parseResult)
+ return d
+
+ def updateDocuments(self, dbName, docs):
+ # update/insert/delete multiple docs in a single request using
+ # _bulk_docs
+ # from couchdb-python.
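+ # e.g. this POSTs a body like
+ #   {"docs": [{"_id": "a", ...}, {"_id": "b", "_rev": "1-xyz", ...}]}
+ # to /<dbName>/_bulk_docs; CouchDB replies with a list of
+ # {"id": ..., "rev": ...} results, one per document.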
+ content = [] # (careful not to shadow the 'docs' argument)
+ for doc in docs:
+ if isinstance(doc, dict):
+ content.append(doc)
+ elif hasattr(doc, 'items'):
+ content.append(dict(doc.items()))
+ else:
+ raise TypeError('expected dict, got %s' % type(doc))
+ url = "/%s/_bulk_docs" % dbName
+ body = json.dumps({'docs': content})
+ return self.post(url, body
+ ).addCallback(self.parseResult
+ )
+
+ # Hack so our new bound methods work.
+ def bindToDB(self, dbName):
+ super(CouchDB, self).bindToDB(dbName)
+ partial = paisley.partial # it works hard to get this!
+ for methname in ["saveAttachment", "updateDocuments"]:
+ method = getattr(self, methname)
+ newMethod = partial(method, dbName)
+ setattr(self, methname, newMethod)
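+ # (so after, say, bindToDB('raindrop'), saveAttachment can be called as
+ # self.saveAttachment(docId, name, data, ...) with dbName already bound.)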
+
+
def get_db(couchname="local", dbname=_NotSpecified):
dbinfo = config.couches[couchname]
if dbname is _NotSpecified:
dbname = dbinfo['name']
key = couchname, dbname
try:
return DBs[key]
except KeyError:
@@ -124,50 +201,80 @@ class DocumentModel(object):
doc['raindrop_account'] = account.details['_id']
assert 'type' not in doc, doc # we look after that!
doc['type'] = doc_type
assert 'raindrop_seq' not in doc, doc # we look after that!
doc['raindrop_seq'] = get_seq()
- # XXX - for small blobs this is good (we get the attachment in the
- # same request as the doc). For large blobs this is bad, as we
- # base64 encode the data in memory before sending it.
- if attachments:
- self.db.addAttachments(doc, attachments)
-
# save the document.
logger.debug('create_raw_document saving doc %r', docid)
- return self.db.saveDoc(doc, docId=quote_id(docid),
- ).addCallback(self._cb_saved_document
- ).addErrback(self._cb_save_failed
+ qid = quote_id(docid)
+ return self.db.saveDoc(doc, docId=qid,
+ ).addCallback(self._cb_saved_document, 'raw-message', docid
+ ).addErrback(self._cb_save_failed, 'raw-message', docid
+ ).addCallback(self._cb_save_attachments, attachments, qid
)
- def _cb_saved_document(self, result):
- logger.debug("Saved document %s", result)
+ def _cb_saved_document(self, result, what, ids):
+ logger.debug("Saved %s %s", what, result)
# XXX - now what?
+ return result
- def _cb_save_failed(self, failure):
- logger.error("Failed to save document: %s", failure)
+ def _cb_save_failed(self, failure, what, ids):
+ logger.error("Failed to save %s (%r): %s", what, ids, failure)
failure.raiseException()
def create_ext_document(self, doc, ext, rootdocId):
assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
assert 'raindrop_seq' not in doc, doc # we look after that!
doc['raindrop_seq'] = get_seq()
doc['type'] = ext
docid = quote_id(rootdocId + "!" + ext)
+ try:
+ attachments = doc['_attachments']
+ # nuke attachments specified
+ del doc['_attachments']
+ except KeyError:
+ attachments = None
+
# save the document.
logger.debug('saving extension document %r', docid)
return self.db.saveDoc(doc, docId=docid,
- ).addCallback(self._cb_saved_document
- ).addErrback(self._cb_save_failed
+ ).addCallback(self._cb_saved_document, 'ext-message', docid
+ ).addErrback(self._cb_save_failed, 'ext-message', docid
+ ).addCallback(self._cb_save_attachments, attachments, docid
)
+ def _cb_save_attachments(self, saved_doc, attachments, docid):
+ if not attachments:
+ return saved_doc
+ # Each time we save an attachment the doc gets a new revision number.
+ # So we need to do them in a chain, passing the result from each to
+ # the next.
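+ # (e.g. save the first attachment against rev "1-...", get "2-..." back,
+ # use that to save the next attachment, and so on.)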
+ remaining = attachments.copy()
+ # This is recursive, but that should be OK.
+ return self._cb_save_next_attachment(saved_doc, docid, remaining)
+
+ def _cb_save_next_attachment(self, result, docid, remaining):
+ if not remaining:
+ return result
+ revision = result['rev']
+ name, info = remaining.popitem()
+ logger.debug('saving attachment %r to doc %r', name, docid)
+ return self.db.saveAttachment(docid, # already quoted by caller...
+ quote_id(name), info['data'],
+ content_type=info['content_type'],
+ revision=revision,
+ ).addCallback(self._cb_saved_document, 'attachment', (docid, name)
+ ).addErrback(self._cb_save_failed, 'attachment', (docid, name)
+ ).addCallback(self._cb_save_next_attachment, docid, remaining
+ )
+
def get_last_ext_for_document(self, doc_id):
"""Given a base docid, find the most-recent extension to have run.
This will differ from the latest extension in the document chain if
the document chain has been 'reset' for any reason (eg, change
detected at the source of the message, user adding annotations, etc)
"""
startkey = [doc_id]
endkey = [doc_id, 99999999999999]
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,14 +1,15 @@
from twisted.internet import protocol, ssl, defer, error
from twisted.mail import imap4
import logging
from ..proc import base
from ..model import get_db
+from ..ext.message.rfc822 import doc_from_bytes
brat = base.Rat
logger = logging.getLogger(__name__)
class ImapClient(imap4.IMAP4Client):
'''
@@ -46,16 +47,20 @@ class ImapClient(imap4.IMAP4Client):
def _reqList(self, *args, **kwargs):
self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
return self.list('', '*').addCallback(self._procList)
def _procList(self, result, *args, **kwargs):
# As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
# We keep a 'stack' of items left to process in an instance variable...
+ # (*sob* - it's not clear why this doesn't suffer from the
+ # death-by-recursion the other implementations can demonstrate - even
+ # though we use a 'stack' of items left to process, each call to
+ # _process_next_folder is still recursive.)
self.folder_infos = result[:]
return self._process_next_folder()
def _process_next_folder(self):
if not self.folder_infos:
# yay - all done!
return
@@ -130,17 +135,20 @@ class ImapClient(imap4.IMAP4Client):
assert len(result)==1, result
_, result = result.popitem()
flags = result['FLAGS']
# put the 'raw' document object together and save it.
doc = dict(
storage_key=[self.current_folder, uid],
imap_flags=flags,
)
- attachments = {'rfc822' : content}
+ attachments = {'rfc822' : {'content_type': 'message/rfc822',
+ 'data': content,
+ }
+ }
return self.doc_model.create_raw_document(did, doc, 'proto/imap',
self.account,
attachments=attachments,
).addCallback(self._cb_saved_message)
def _cantGetMessage(self, failure):
logger.error("Failed to fetch message: %s", failure)
return self._process_next_message()
@@ -221,20 +229,20 @@ class ImapClientFactory(protocol.ClientF
# 'raw/message/rfc822' as output
class IMAPConverter(base.ConverterBase):
def convert(self, doc):
# I need the binary attachment.
return self.doc_model.open_document(doc['_id'], attachment="rfc822"
).addCallback(self._cb_got_attachment, doc)
def _cb_got_attachment(self, content, doc):
- headers, body = content.split('\r\n\r\n', 1)
- # XXX - *sob* - this is still wrong - 'body' is a still blob so needs to
- # go into an attachment.
- return {'headers': headers, 'body': body}
+ assert content, "can't locate attachment for %r" % doc['_id']
+ # the 'rfc822' module knows what to do...
+ return doc_from_bytes(content)
+
class IMAPAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
def startSync(self, conductor, doc_model):
self.factory = ImapClientFactory(self, conductor, doc_model)