diff --git a/pgp_mime_lib/reader.py b/pgp_mime_lib/reader.py new file mode 100644 index 0000000000000000000000000000000000000000..45b7d24947f3ed031268c0a756a6225325753787 --- /dev/null +++ b/pgp_mime_lib/reader.py @@ -0,0 +1,83 @@ +# https://github.com/leapcode/leap_mail/blob/develop/src/leap/mail/incoming/service.py +# should consider get_email_charset from: +# https://github.com/leapcode/leap_pycommon/blob/6efdee0a5bab59d90c915b44a65e798b4a426c4a/src/leap/common/mail.py + +from email.utils import parseaddr + + +MULTIPART_ENCRYPTED = "multipart/encrypted" +MULTIPART_SIGNED = "multipart/signed" +PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" +PGP_END = "-----END PGP MESSAGE-----" + + +class ReaderException(Exception): + pass + + +def extract_text_payload(msg): + if msg.is_multipart(): + text = u'' + for part in msg.walk(): + if part.get_content_type() == 'text/plain': + text += unicode(part.get_payload(decode=True), + part.get_content_charset('utf-8')) + else: + text = unicode(msg.get_payload(decode=True), + msg.get_content_charset('utf-8')) + return text + + +def decrypt_multipart(msg, sender_address, gpg): + payload = msg.get_payload() + if (len(payload) != 2 or + payload[0].get_content_type() != 'application/pgp-encrypted' or + payload[1].get_content_type() != 'application/octet-stream'): + raise ReaderException("Invalid format for a multipart encrypted message") + enc_data = payload[1].get_payload() + data = str(gpg.decrypt(enc_data)) + + return data + + +def maybe_decrypt_inline(msg, sender_address, gpg): + """Decrypt a inline PGP message or a normal plain text message""" + + data = extract_text_payload(msg) + was_encrypted = False + + if PGP_BEGIN in data and PGP_END in data: + begin = data.find(PGP_BEGIN) + end = data.find(PGP_END) + if begin > end: + raise ReaderException("Invalid inline PGP message") + pgp_message = data[begin:end + len(PGP_END)] + if len(pgp_message) == 0: + raise ReaderException("Empty inline PGP message") + was_encrypted = True + return (str(gpg.decrypt(pgp_message)), was_encrypted) + + return (data, was_encrypted) + + +def read_encrypted_message(msg, gpg): + """Return the body of a email message, wether it is encrypted or not.""" + + was_encrypted = False + content_type = msg.get_content_type() + + from_header = msg.get('from', None) + sender_address = None + if from_header is not None and content_type in (MULTIPART_ENCRYPTED, + MULTIPART_SIGNED): + sender_address = parseaddr(from_header)[1] + + if content_type == MULTIPART_ENCRYPTED: + was_encrypted = True + data = decrypt_multipart(msg, sender_address, gpg) + elif content_type == MULTIPART_SIGNED: + raise ReaderException("Signed but not encrypted messages are not supported yet") + else: + (data, was_encrypted) = maybe_decrypt_inline(msg, sender_address, gpg) + + return (data, was_encrypted)