copyright

  • © 2004-2011
    Ludovico Magnocavallo
    all rights reserved

Verifying SMIME email with M2Crypto

31 May 2004

In my spare time, I’m working on a project where I have to sign and verify SMIME mail using M2Crypto, which works quite well but is a bit short on documentation, especially for its SMIME functions. The Programming S/MIME in Python with M2Crypto howto is enough to point you in the right direction, and the source has a few SMIME examples. What is missing is a recipe to verify signed SMIME messages when you don’t have the signer’s certificate, which is the usual situation when you have to verify Internet email.

OpenSSL’s smime command is able to do that, so there should be a way to accomplish the same thing from Python using M2Crypto. After a bit of fiddling around and looking at OpenSSL’s source, I have found a way that seems to work (update: the content check is now done against the output of SMIME.smime_load_pkcs7_bio instead of using email.message_from_string, a list of certificates is returned on successful verification, and a content diff is shown if verification fails):

#!/usr/bin/python
"""
Simple class to verify SMIME signed email messages
without having to know the signer's certificate.
The signer's certificate(s) is extracted from
the signed message, and returned on successful
verification.
A unified diff of the cleartext content against
the one resulting from verification is returned
as exception value if the content has been tampered
with.

Use at your own risk, send comments and fixes.
May 30, 2004
Ludovico Magnocavallo 
"""

import os, base64
from M2Crypto import BIO, SMIME, m2, X509
from difflib import unified_diff

class VerifierError(Exception): pass

class Verifier(object):
    """
    accepts an email payload and verifies it with SMIME
    """
    
    def __init__(self, certstore):
        """
        certstore - path to the file used to store
                    CA certificates
                    eg /etc/apache/ssl.crt/ca-bundle.crt
        
        >>> v = Verifier('/etc/dummy.crt')
        >>> v.verify('pippo')
        Traceback (most recent call last):
          File "/usr/lib/python2.3/doctest.py", line 442, in _run_examples_inner
            compileflags, 1) in globs
          File "", line 1, in ?
          File "verifier.py", line 46, in verify
            self._setup()
          File "verifier.py", line 36, in _setup
            raise VerifierError, "cannot access %s" % self._certstore
        VerifierError: cannot access /etc/dummy.crt
        >>>
        """
        self._certstore = certstore
        self._smime = None
    
    def _setup(self):
        """
        sets up the SMIME.SMIME instance
        and loads the CA certificates store
        """
        smime = SMIME.SMIME()
        st = X509.X509_Store()
        if not os.access(self._certstore, os.R_OK):
            raise VerifierError, "cannot access %s" % self._certstore
        st.load_info(self._certstore)
        smime.set_x509_store(st)
        self._smime = smime
        
    def verify(self, text):
        """
        verifies a signed SMIME email
        returns a list of certificates used to sign
        the SMIME message on success

        text - string containing the SMIME signed message

        >>> v = Verifier('/etc/apache/ssl.crt/ca-bundle.crt')
        >>> v.verify('pippo')
        Traceback (most recent call last):
          File "", line 1, in ?
          File "signer.py", line 23, in __init__
            raise VerifierError, e
        VerifierError: cannot extract payloads from message
        >>>
        >>> certs = v.verify(test_email)
        >>> isinstance(certs, list) and len(certs) > 0
        True
        >>>
        """
        if self._smime is None:
            self._setup()
        buf = BIO.MemoryBuffer(text)
        try:
            p7, data_bio = SMIME.smime_load_pkcs7_bio(buf)
        except SystemError:
            # uncaught exception in M2Crypto
            raise VerifierError, "cannot extract payloads from message"
        if data_bio is not None:
            data = data_bio.read()
            data_bio = BIO.MemoryBuffer(data)
        sk3 = p7.get0_signers(X509.X509_Stack())
        if len(sk3) == 0:
            raise VerifierError, "no certificates found in message"
        signer_certs = []
        for cert in sk3:
            # PEM-encode each signer certificate found in the message
            signer_certs.append(
                "-----BEGIN CERTIFICATE-----\n%s-----END CERTIFICATE-----"
                    % base64.encodestring(cert.as_der()))
        self._smime.set_x509_stack(sk3)
        try:
            if data_bio is not None:
                v = self._smime.verify(p7, data_bio)
            else:
                v = self._smime.verify(p7)
        except SMIME.SMIME_Error, e:
            raise VerifierError, "message verification failed: %s" % e
        if data_bio is not None and data != v:
            raise VerifierError, \
                "message verification failed: payload vs SMIME.verify output diff\n%s" % \
                    '\n'.join(list(unified_diff(data.split('\n'), v.split('\n'), n = 1)))
        return signer_certs


test_email = """put your test SMIME signed email here"""

def _test():
    import doctest
    return doctest.testmod()

if __name__ == "__main__":
    _test()
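
Two steps of verify() need nothing but the standard library: wrapping a DER certificate in PEM armor, and building the diff reported when the payload has been tampered with. What follows is only a minimal sketch in Python 3 syntax (the script above targets Python 2.3). The helper names der_to_pem and payload_diff are made up for illustration, and the sample bytes are not a real certificate.

```python
import base64
from difflib import unified_diff


def der_to_pem(der_bytes):
    """Wrap DER bytes in PEM armor, mirroring the loop in verify()."""
    # encodebytes() ends its output with a newline, so the footer
    # lands on its own line.
    body = base64.encodebytes(der_bytes).decode("ascii")
    return "-----BEGIN CERTIFICATE-----\n%s-----END CERTIFICATE-----" % body


def payload_diff(cleartext, verified):
    """Unified diff, one line of context, between two payload strings."""
    # lineterm="" (a small change from the script above) keeps the
    # header lines from carrying their own trailing newline.
    return "\n".join(unified_diff(
        cleartext.split("\n"), verified.split("\n"), n=1, lineterm=""))


# Placeholder input: these bytes are NOT a real certificate.
pem = der_to_pem(b"not a real certificate")

# A one-line change in the payload shows up as a -/+ pair in the diff.
diff = payload_diff("Dear Bob,\npay 10 euro\nbye",
                    "Dear Bob,\npay 1000 euro\nbye")
print(pem)
print(diff)
```

The diff only tells you where the cleartext part differs from what SMIME verification returned. It says nothing about who changed it.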

Customizing logging.LogRecord

18 May 2004

I haven’t posted anything in a long while, since I’ve been pretty busy working on a complex project in my spare time (let’s call it project X), and I’ve not had Internet access outside office hours for the past couple of months. I hope to start writing frequently again soon, since I’m learning interesting things for project X, which involves SMIME and SMTP mail (using OpenSSL, M2Crypto and the email package).

This brief note, written mainly so that I can use it as a reference in the future, is about customizing logging.LogRecord. One of the requirements of project X is that all operations for a single transaction are logged with a unique timestamp, representing the start of the transaction. Since I’m lazy, and I don’t trust myself too much when writing code (especially at late hours), I did not want to have to carry this value around everywhere and remember to pass it to every logging call.

So last night I had a look at the logging internals, to see how to extend logging.LogRecord to carry an additional argument representing my unique timestamp. Apart from being very useful, the logging package is very well architected, so it turns out subclassing LogRecord is not too difficult. Let’s see one way of doing it, which is not necessarily the best one, so please send me any improvements or suggestions.

LogRecord instances are created every time something is logged. They contain all the information pertinent to the event being logged. [...]

LogRecord has no methods; it’s just a repository for information about the logging event. The only reason it’s a class rather than a dictionary is to facilitate extension.

LogRecord objects are created by makeRecord, a factory method of the logging.Logger class. To use a custom subclass of LogRecord, you have to subclass logging.Logger and override makeRecord, then set your Logger subclass as logging’s default, which is not too hard.

When you call logging.getLogger(name) to get a new logger instance, the getLogger function calls Logger.manager.getLogger(name). Manager.getLogger (Logger.manager is Manager(Logger.root)) does a bunch of stuff, then, if no logger with the same name has already been created, it returns a new instance of _loggerClass for the given name. By default, logging._loggerClass is set with _loggerClass = Logger. So to set your Logger subclass as logging’s default, just call:

logging.setLoggerClass(CustomLogger)
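
As a quick check of the mechanism, here is a minimal sketch in Python 3 syntax. The empty CustomLogger and the logger name are placeholders. It also shows that the root logger, which is created when logging is imported, is not affected by setLoggerClass.

```python
import logging


class CustomLogger(logging.Logger):
    """Placeholder Logger subclass; it adds nothing yet."""


previous_class = logging.getLoggerClass()
logging.setLoggerClass(CustomLogger)
try:
    # A name that has not been requested before gets a new instance
    # of the class passed to setLoggerClass.
    new_logger = logging.getLogger("setloggerclass.demo")
finally:
    # Put the default back so the sketch has no lasting side effects.
    logging.setLoggerClass(previous_class)

is_custom = isinstance(new_logger, CustomLogger)
# The root logger already existed, so it keeps its own class.
root_is_custom = isinstance(logging.getLogger(), CustomLogger)
print(is_custom, root_is_custom)
```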

Now that we know what to subclass and how to make logging use it, we still have to decide how to pass the extra argument to LogRecord. Passing it to our CustomLogger class as an additional __init__ argument would mean subclassing logging.Manager and overriding getLogger, which seems a bit too much work. Another option would be to set the extra parameter directly on every newly obtained CustomLogger instance, but then you would have to remember to set it every time, since logging raises a KeyError exception if you try to use the missing parameter in a format string (you could also set it to a default value in CustomLogger.__init__, but then you’d get a default value in your logs, which IMHO is even worse). The way I chose to do it is to set the parameter on logging.root, so that my extra argument is available to all loggers, since they share a reference to the root logger. To make the root logger emit CustomLogRecord instances, I redefined logging.RootLogger.makeRecord to match CustomLogger.makeRecord.

This solution obviously won’t work if you need to set different extra arguments for different loggers, and my custom classes manage only a single extra argument, which is exactly what I need for this project but may not be enough for other cases.

#!/usr/bin/python
import logging
from time import strftime, gmtime

class CustomLogRecord(logging.LogRecord):
    """
    LogRecord subclass that stores a unique -- transaction -- time
    as an additional attribute
    """
    def __init__(self, name, level, pathname, lineno, msg, args, exc_info, trans_time):
        logging.LogRecord.__init__(self, name, level, pathname, lineno, msg, args, exc_info)
        self.trans_time = trans_time

def makeRecord(self, name, level, fn, lno, msg, args, exc_info):
    return CustomLogRecord(name, level, fn, lno, msg, args, exc_info, self.root.trans_time)

class CustomLogger(logging.Logger):
    """
    Logger subclass that uses CustomLogRecord as its LogRecord class
    """
    def __init__(self, name, level=logging.NOTSET):
        logging.Logger.__init__(self, name, level)
       
    makeRecord = makeRecord

# setup
logging.setLoggerClass(CustomLogger)
logging.root.trans_time = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
logging.RootLogger.makeRecord = makeRecord

if __name__ == '__main__':
    # get logger
    logger = logging.getLogger('modified logger')
    logger.propagate = False
    hdlr = logging.StreamHandler()
    hdlr.setFormatter(logging.Formatter(
        '%(trans_time)s [%(asctime)s] %(levelname)s %(process)d %(message)s'))
    logger.addHandler(hdlr)
    logger.setLevel(logging.DEBUG)
    logging.root.addHandler(hdlr)
    logger.debug('test')
    from time import sleep
    sleep(1)
    logger.debug('a second test, with (hopefully) the same date as the previous log record')
    logging.warn('root logging')

The output of the above script should be something like:

Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:04,898] DEBUG 31268 test
Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:05,898] DEBUG 31268 a second test, with (hopefully) the same date as the previous log record
Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:05,898] WARNING 31268 root logging
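
The script above targets the logging package as shipped with Python 2.3. On Python 3.2 and later, logging.setLogRecordFactory offers a shorter route to what looks like the same result, with no need to subclass Logger or patch RootLogger. This is only a sketch of that alternative, not the approach used in this post. TRANS_TIME, record_factory and the logger name are made up for illustration.

```python
import io
import logging
from time import gmtime, strftime

# One timestamp per transaction, fixed when the transaction starts.
TRANS_TIME = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())

default_factory = logging.getLogRecordFactory()


def record_factory(*args, **kwargs):
    """Build a normal LogRecord, then attach the transaction time."""
    record = default_factory(*args, **kwargs)
    record.trans_time = TRANS_TIME
    return record


logging.setLogRecordFactory(record_factory)

# Log into an in-memory stream so the result can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(trans_time)s %(levelname)s %(message)s"))

logger = logging.getLogger("factory.demo")
logger.propagate = False
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.debug("test")

# Restore the default factory so other code is unaffected.
logging.setLogRecordFactory(default_factory)

output = stream.getvalue()
print(output, end="")
```

Because the factory is global, every logger, root included, emits records that carry trans_time. The limitation is the same as with the root logger trick: one value shared by all loggers.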