Download

Feel free to browse the source below, or to download a copy.

smtp_helper_tw.py


"""
An smtp server that integration with twisted.trial to ensure emails are sent.
"""
from twisted.internet import defer, protocol
from twisted.mail import smtp
from zope.interface import implements

from email.Header import Header

#-------------------------------------------------------------------------------
# This was taken (mostly) from page 124 of the Twisted book.
# I spent some 20 minutes looking through the mail.smtp source code trying to 
# determine if all of these methods were needed or if I could cut some to 
# result in slimmer code.  I failed to be able to sufficiently read and 
# understand all of the interactions between the various classes in that time
# period, and gave up in order to go help my wife cook brownies.  
#
# In order for our tests to be able to know that the message has finished
# delivery so that we can ensure that the message was as intended.
class CallbackMessageReceiver(object):
        implements(smtp.IMessage)

        def __init__(self, callback):
            # We separate the current lines we're using
            self.lines = []

            # from the full messages that we've received.
            self.callback = callback

        def lineReceived(self, line):
            self.lines.append(line)

        def eomReceived(self):
            # message is complete, tell everyone about it
            self.callback("\n".join(self.lines) + "\n")
            self.lines = []
            return defer.succeed(True)


#-------------------------------------------------------------------------------
class CallbackMessageDelivery(object):
    implements(smtp.IMessageDelivery)

    def __init__(self, callback):
        self.callback = callback

    def receivedHeader(self, helo, origin, recipients):
        myHostname, clientIP = helo
        headerValue = "by %s from %s with ESMTP ; %s" % (
                myHostname, clientIP, smtp.rfc822date())
        return "Received: %s" % Header(headerValue)

    def validateTo(self, user):
        # Use the CallbackMessageReceiver to append messages to our
        # message list.
        return lambda: CallbackMessageReceiver(self.callback)

    def validateFrom(self, helo, originAddress):
        return originAddress


#-------------------------------------------------------------------------------
class SMTPProtocol(smtp.SMTP):
    """
    From http://blackjml.livejournal.com/23029.html
    we need to be able to inform people of lost connections, this
    follows pattern presented in that blog post, which I was told to follow
    by the author when I asked for help in this in the #twisted channel
    on irc.freenode.net.

    In other words, if you want your tests to properly succeed, you
    MUST set the onConnectionLost to a new deferred when you setUp your tests
    and then return that deferred from the tearDown method so that the
    tests waits for the SMTPServer to disconnect.
    """
    def __init__(self, *args, **kwargs):
        smtp.SMTP.__init__(self, *args, **kwargs)
        self.onConnectionLost = defer.Deferred()

    def connectionLost(self, *args, **kwargs):
        smtp.SMTP.connectionLost(self, *args, **kwargs)
        self.onConnectionLost.callback(self)



#-------------------------------------------------------------------------------
class DeferredSMTPFactory(protocol.ServerFactory):
    """
    A twisted-style SMTP server factory.
    """
    def __init__(self):
        self.listOfDeferreds = []
        self.protocolDeferreds = {}

    def informDeferreds(self, result):
        deferred = self.listOfDeferreds.pop(0)
        deferred.callback(result)

    def messageReceived(self):
        d = defer.Deferred()
        self.listOfDeferreds.append(d)
        return d

    def protocolConnectionLost(self, protocol):
        del self.protocolDeferreds[protocol]
        return protocol

    def buildProtocol(self, addr):
        delivery = CallbackMessageDelivery(self.informDeferreds)
        smtpProtocol = SMTPProtocol(delivery)
        smtpProtocol.factory = self
        d = self.protocolDeferreds[smtpProtocol] = smtpProtocol.onConnectionLost
        d.addCallback(self.protocolConnectionLost)
        return smtpProtocol



integration_tests_tw.py


"""
Integration tests which test the side-effect that emails are sent out properly.
"""
from twisted.internet import defer, reactor
from twisted.trial.unittest import TestCase

from smtp_helper_tw import DeferredSMTPFactory
from business_tw import register_user, conversation, DEFAULT_PORT, DEFAULT_HOST

#-------------------------------------------------------------------------------
class EmailTestMixin(TestCase):
    def setUp(self):
        self.deferredSMTP = DeferredSMTPFactory()
        self.smtpServerPort = reactor.listenTCP(DEFAULT_PORT, self.deferredSMTP)

    def tearDown(self):
        d = defer.maybeDeferred(self.smtpServerPort.stopListening)
        protocolDeferreds = self.deferredSMTP.protocolDeferreds.values()
        protocolDeferreds.append(d)
        return defer.gatherResults(protocolDeferreds)

    def test_register(self):
        def confirm_first_email(received_message):
            # when registration succeeds, one email MUST be sent.
            self.assertTrue('Welcome lakin.wecker' in received_message)
            return received_message
        emailReceivedDeferred = self.deferredSMTP.messageReceived()
        emailReceivedDeferred.addCallback(confirm_first_email)
        d = register_user('lakin.wecker')
        return defer.gatherResults([d, emailReceivedDeferred])

    def test_conversation(self):

        # I'm 99% certain that the order of the received messages in this
        # test are not guaranteed.  It happens to work out right on my system, 
        # but in reality testing these in order may not be possible without 
        # changing the way the emails are tested.
        def confirm_first_email(received_message):
            # when registration succeeds, two emails MUST be sent
            self.assertTrue('racoon@foo.com says' in received_message)
            return received_message
        first_email_deferred = self.deferredSMTP.messageReceived()
        first_email_deferred.addCallback(confirm_first_email)

        def confirm_second_email(received_message):
            # when registration succeeds, two emails MUST be sent
            self.assertTrue('simon@foo.com says' in received_message)
            return received_message
        second_email_deferred = self.deferredSMTP.messageReceived()
        second_email_deferred.addCallback(confirm_second_email)


        d = conversation('simon@foo.com', 'racoon@foo.com')
        return defer.gatherResults([d, 
               first_email_deferred, second_email_deferred])


#-------------------------------------------------------------------------------
if __name__ == "__main__":
    import sys
    from twisted.scripts.trial import run
    sys.argv.append('integration_tests_twisted')
    run()



business_tw.py


"""
Some simple business logic surrounding user registration and user conversations.
"""
from twisted.internet import defer
from email.MIMEText import MIMEText
from twisted.mail import smtp

DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8825

#-------------------------------------------------------------------------------
def send_email(
        toaddrs, subject, content, fromAddr,
        server_name=DEFAULT_HOST, server_port=DEFAULT_PORT):

    msg = MIMEText(content, "plain", "iso-8859-1")

    msg['Subject'] = subject
    msg['From'] = fromAddr
    msg['To'] = u", ".join(toaddrs)

    messageData = msg.as_string(unixfrom=False)
    return smtp.sendmail(server_name, server_port, toaddrs, messageData,
            port=server_port)


#-------------------------------------------------------------------------------
# These are the pieces of code that We'd like to test, the first sends one email
# the second sends two emails and we'd like to ensure that the appropriate emails
# are sent.
def register_user(username):
    return send_email(
        ['foo@foo.com'],
        'Registration Complete',
        'Welcome %s' % username,
        'registration@foo.com')

def conversation(first_email, second_email):
    dl = []
    for email_address in (first_email, second_email):
        dl.append(send_email(
            [email_address],
            'Conversation',
            '%s says' % email_address,
            'conversations@foo.com'))
    return defer.gatherResults(dl)