Download

Feel free to browse the source below, or to download a copy.

smtp_helper.py


"""
A helper class which allows test code to intercept and store sent email.
"""
import smtpd
import asyncore
from business import DEFAULT_HOST, DEFAULT_PORT


#-------------------------------------------------------------------------------
class TestSMTPServer(smtpd.SMTPServer):
    """
    An SMTP Server used to allow integration tests which ensure email is sent.

    Every message delivered to the server is appended to ``self.rcvd`` as a
    ``(mailfrom, rcpttos, data)`` tuple instead of being relayed anywhere.

    Typical use: create the server, call ``start()``, run the code under
    test, inspect ``rcvd``, then call ``close()``.
    """

    def __init__(self, localaddr=(DEFAULT_HOST, DEFAULT_PORT)):
        """
        Create the server listening on ``localaddr``, a ``(host, port)`` tuple.
        """
        # Messages captured so far, in arrival order.
        self.rcvd = []
        # Thread running the asyncore loop; stays None until start() is
        # called so that close() can tell whether there is anything to join.
        self.poller = None
        # No remote address is given: messages are stored, never forwarded.
        smtpd.SMTPServer.__init__(self, localaddr, None)

    def start(self):
        """
        Start serving connections from a background thread.
        """
        import threading
        self.poller = threading.Thread(target=asyncore.loop,
                kwargs={'timeout':0.1, 'use_poll':True})
        # Daemonise the thread so that an aborted test run (where close() is
        # never reached) cannot keep the interpreter alive indefinitely.
        self.poller.daemon = True
        self.poller.start()

    def process_message(self, peer, mailfrom, rcpttos, data):
        """
        Record the received message rather than delivering it.
        """
        self.rcvd.append((mailfrom, rcpttos, data))

    def close(self):
        """
        Stop listening and wait for the background thread to finish.

        Safe to call when ``start()`` was never called.
        """
        import threading
        smtpd.SMTPServer.close(self)
        poller = self.poller
        # asyncore may invoke close() itself from inside the polling thread
        # (its default handle_close does so); a thread cannot join itself.
        if poller is None or poller is threading.current_thread():
            return
        poller.join()
        self.poller = None



business.py


"""
Simple examples of potential functions that need testing.
"""
import smtplib
import socket
from email.MIMEText import MIMEText

# Default SMTP endpoint: used by send_email() when the caller passes no
# server, and imported by smtp_helper as the test server's listen address.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8825

#-------------------------------------------------------------------------------
def send_email(
        toaddrs, subject, content, fromaddr,
        server_name=DEFAULT_HOST, server_port=DEFAULT_PORT):
    """
    Send a plain-text email through an SMTP server.

    toaddrs     -- list of recipient addresses; also joined into the To header
    subject     -- value of the Subject header
    content     -- message body, encoded as iso-8859-1
    fromaddr    -- envelope sender and value of the From header
    server_name -- SMTP host to connect to
    server_port -- SMTP port to connect to

    Returns whatever ``SMTP.sendmail`` returns.

    Raises ``smtplib.SMTPConnectError`` if the connection cannot be
    established; errors raised while sending propagate unchanged.
    """
    msg = MIMEText(content, "plain", "iso-8859-1")

    msg['Subject'] = subject
    msg['From'] = fromaddr
    msg['To'] = u", ".join(toaddrs)

    try:
        server = smtplib.SMTP(server_name, server_port)
    except smtplib.SMTPException:
        # Already an SMTP error. On Python 3 these derive from OSError, so
        # without this clause they would be re-wrapped by the handler below.
        raise
    except socket.error as e:
        # This is raised by SMTP.connect if the connection fails. It seems
        # appropriate to wrap it in an SMTP error to make things easier for
        # callers. str(e) is used because e.message is empty for the usual
        # (errno, strerror) form of socket errors.
        raise smtplib.SMTPConnectError(0, str(e))

    try:
        response = server.sendmail(fromaddr, toaddrs, msg.as_string())
    except Exception:
        # Do not leak the connection when sending fails. close() just drops
        # the socket, so it cannot mask the original error the way a failing
        # QUIT could.
        server.close()
        raise
    server.quit()
    return response

#-------------------------------------------------------------------------------
# These are the pieces of code that we'd like to test. The first sends one
# email and the second sends two; we'd like to ensure that the appropriate
# emails are sent.
def register_user(username):
    """
    Send the 'Registration Complete' email for ``username``.

    Returns ``username`` unchanged.
    """
    recipients = ['foo@foo.com']
    welcome_text = 'Welcome %s' % username
    send_email(
        recipients, 'Registration Complete', welcome_text,
        'registration@foo.com')
    return username

def conversation(first_email, second_email):
    """
    Send one 'Conversation' email to each of the two given addresses,
    first address first.
    """
    subject = 'Conversation'
    sender = 'conversations@foo.com'
    for recipient in [first_email, second_email]:
        body = '%s says' % recipient
        send_email([recipient], subject, body, sender)



integration_test.py


"""
Integration tests which test the side-effect that emails are sent out properly.
"""
import unittest
from smtp_helper import TestSMTPServer
from business import register_user, conversation, DEFAULT_HOST, DEFAULT_PORT


#-------------------------------------------------------------------------------
class TestEmailSent(unittest.TestCase):
    """
    Runs the business functions against a local capturing SMTP server and
    checks which emails arrived.
    """

    def setUp(self):
        # Each test gets its own server, so the captured list starts empty.
        self.smtp_server = TestSMTPServer()
        self.smtp_server.start()

    def tearDown(self):
        self.smtp_server.close()

    def _assert_received_count(self, expected):
        # Number of messages the server has captured so far.
        self.assertEqual(len(self.smtp_server.rcvd), expected)

    def _assert_message_contains(self, index, text):
        # Captured entries are (fromaddr, toaddrs, message) tuples; only the
        # message text is inspected here.
        _fromaddr, _toaddrs, message = self.smtp_server.rcvd[index]
        self.assertTrue(text in message)

    def test_registration(self):
        register_user('lakin.wecker')

        # When registration succeeds, one email MUST be sent.
        self._assert_received_count(1)
        self._assert_message_contains(0, 'Welcome lakin.wecker')

    def test_conversation(self):
        conversation('simon@foo.com', 'racoon@foo.com')

        # When conversation succeeds, two emails MUST be sent,
        # one to each side of the conversation.
        self._assert_received_count(2)
        self._assert_message_contains(0, 'simon@foo.com says')
        self._assert_message_contains(1, 'racoon@foo.com says')


#-------------------------------------------------------------------------------
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()