Because my job needed something like this, I created a Python script that'll connect to an IMAP server, get all the emails, and then calculate the very rough daily average of emails received to the Inbox (as well as the 'smallest' number of emails handled per day, and the 'largest' number of emails handled per day, for the mailbox). It's only designed to base the numbers off of existing emails in the inbox, not deleted items. This is fine, since we don't delete anything in the inbox.
The inbox is the only mailbox used, because it's a listserv mailbox, so it's not necessary to worry about the other mail boxes. (The IMAP backend is dovecot.)
The script allows for you to pass in arguments for its connection data, or if you don't it will prompt you for those items that it needs. (IMAP server, IMAP login username/mailbox, IMAP password). This relies on argparse
for arguments processing.
My only concern is that to gather the dates and do daily statistics, it has to pull every email in the inbox down for analysis. This is going to be evil when run on a mailbox with hundreds, or even thousands, of emails. My main concern is whether there's any way to optimize this, because I don't want to be slamming "fetch" requests to the IMAP server and flood the connection or lag out the IMAP process.
mailbox_daily_average.py:
#!/usr/bin/python
import imaplib
import datetime
import email
import sys
import argparse
def _get_arguments():
# Argument Parser
parser = argparse.ArgumentParser(
description="Run daily average stats for an IMAP mailbox.", add_help=False)
parser.add_argument('--server', default=None, help="Mail server to use")
parser.add_argument('--username', '--mailbox',
default=None,
help="Mailbox to run stats on")
parser.add_argument('--password',
default=None,
help="Login password for mailbox")
return parser.parse_args()
def main():
imap_messages_by_date = {}
args = _get_arguments()
imap_server = args.server
imap_user = args.username
imap_password = args.password
# This section handles interactive obtaining of connection details, if none are provided.
if args.server is None:
imap_server = str(raw_input("Please specify the IMAP server: "))
if args.username is None:
imap_user = str(raw_input("Please specify the IMAP username or mailbox to check: "))
if args.password is None:
imap_password = str(raw_input("Please enter the password for the IMAP mailbox: "))
try:
imap_conn = imaplib.IMAP4_SSL(imap_server, 993)
imap_conn.login(imap_user, imap_password)
imap_conn.select("INBOX", True)
rv, data = imap_conn.search(None, "ALL")
if rv != 'OK':
print "No Messages!"
sys.exit(0)
for num in data[0].split():
rv, msgdata = imap_conn.fetch(num, '(RFC822)')
if rv != 'OK':
print "ERROR getting message ", num
msg = email.message_from_string(msgdata[0][1])
date_tuple = email.utils.parsedate_tz(msg['Date'])
if date_tuple:
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
datestamp = local_date.strftime('%Y-%m-%d')
try:
imap_messages_by_date[datestamp] += 1
except KeyError: # Doesn't exist in system yet
imap_messages_by_date[datestamp] = 1
# for key, value in IMAP_MESSAGES_BY_DATE.iteritems():
# print "Date: %s || Count: %s" % (key, value)
dates_count = 0.0 # Init this here
messages_total_count = 0.0 # Init this here
for key, value in imap_messages_by_date.iteritems():
dates_count += 1
messages_total_count += value
max_emails_per_day = max(imap_messages_by_date.itervalues())
min_emails_per_day = min(imap_messages_by_date.itervalues())
rough_daily_average = messages_total_count / dates_count
print "Min Mails Per Day (So Far): %s" % min_emails_per_day
print "Max Mails Per Day (So Far): %s" % max_emails_per_day
print "(Rough) Daily Mail Average: ", rough_daily_average
except Exception as error:
print "An error has occurred, and the program has crashed; details:\n"
print str(error)
sys.exit(10)
if __name__ == "__main__":
main()
2 Answers 2
Possible bug
Note that the minimum and the average only take into account days when there was mail. Any days with no mail at all are ignored. That may or may not be the intended behaviour.
Overview
I'll start by presenting a vision of how main
should look like:
import argparse
from collections import Counter
from datetime import datetime
from getpass import getpass
import imaplib
from itertools import islice
import email
import email.parser
import sys
...
def main():
args = get_arguments()
imap_server = args.server or raw_input('Server: ')
imap_user = args.username or raw_input('Username: ')
imap_password = args.password or getpass('Password: ')
try:
conn = imaplib.IMAP4_SSL(imap_server, 993)
conn.login(imap_user, imap_password)
msgs = imap_messages(conn, fetch='(BODY[HEADER.FIELDS (DATE)])')
msgs_by_date = Counter(filter(None, (
header_date(msg) for msg in msgs
)))
if not msgs_by_date:
print "Empty mailbox"
return
min_emails_per_day = msgs_by_date.most_common()[-1][1]
max_emails_per_day = msgs_by_date.most_common(1)[0][1]
avg_emails_per_day = float(sum(msgs_by_date.values())) / len(msgs_by_date)
print 'Min Mails Per Day (So Far):', min_emails_per_day
print 'Max Mails Per Day (So Far):', max_emails_per_day
print '(Rough) Daily Mail Average:', avg_emails_per_day
except Exception as error:
print "An error has occurred, and the program has crashed; details:\n"
print error
return 10
if __name__ == "__main__":
sys.exit(main())
Most of the suggestions I would like to make are apparent in that excerpt:
Parameters:
- The leading underscore in
_get_arguments
should be dropped: it's not a private method in a class. - Argument defaulting can be done using the
or
idiom:a or b
will evaluate tob
ifa
isNone
or if it is an empty string. - Passwords should not be echoed on screen, so you should use
getpass.getpass()
instead ofraw_input()
.
IMAP:
This is the most complex part of the logic, so you should move some of the code into functions.
If you only want the Date header, then ask for just the Date header. (See RFC 3501 Sec 6.4.5.)
Depending on the IMAP server implementation, the Date might be stored in a more readily accessible index, and thus faster to retrieve. Also try using the
INTERNALDATE
rather thanBODY[HEADER.FIELDS (DATE)]
to see if it's faster. (The internal date is the time at which the message was introduced into the mailbox, rather than the time at which the sender claims to have sent the message.)
Statistics:
- A good data structure to use would be a
collections.Counter
. - A good way to populate the
Counter
is with a generator expression. - The average can be more elegantly computed using sum() and len(). For consistency, rename
rough_daily_average
toavg_emails_per_day
. - For consistency, use either
print "...: %s" % ...
orprint "...", ...
.
Error handling:
print str(error)
can just beprint error
, sinceprint
implicitly causes conversion to a string.- I prefer not to hide
sys.exit()
inside a function.
IMAP
My main()
uses two functions: imap_messages
and header_date
.
class IMAPException(Exception): pass
def imap_messages(conn, mbox='INBOX', search='ALL', fetch='ALL', batch_size=1000):
# http://stackoverflow.com/a/8991553
def batch(iterable, batch_size):
it = iter(iterable)
while True:
chunk = list(islice(it, batch_size))
if not chunk:
return
yield chunk
ok, (result,) = conn.select(mbox, readonly=True)
if ok != 'OK': raise IMAPException(result)
ok, (result,) = conn.search(None, search)
if ok != 'OK': raise IMAPException(result)
for chunk in batch(result.split(), batch_size):
ok, results = conn.fetch(','.join(chunk), fetch)
if ok != 'OK': raise IMAPException('Failed to fetch some messages')
for msg in results:
yield msg
header_parser = email.parser.HeaderParser()
def header_date(msg):
for info in msg:
parsed_header = header_parser.parsestr(info)
if 'Date' in parsed_header:
date_tuple = email.utils.parsedate_tz(parsed_header['Date'])
local_date = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
return local_date.strftime('%Y-%m-%d')
Remarks:
Extracting some of the hard-coded strings as default parameters makes them not so hard-coded and can improve readability.
The most significant performance improvement would be to issue
FETCH
commands for batches of messages. As noted in the RFC cited above, the IMAP client can sayA654 FETCH 2:4 (FLAGS BODY[HEADER.FIELDS (DATE FROM)])
to get the flags and the Date and From headers for messages 2, 3, and 4.
The caveat is that you can't get too greedy. If the response is too large, you'll get an error. The trick is to limit the batch size.
To make it clear what
.select("INBOX", True)
means, explicitly name the second parameter:.select("INBOX", readonly=True)
.To help avoid some of the ugliness of the indexes in
msgdata[0][1]
, you can use destructuring assignment..search(...)
on an empty mailbox should return(OK, [''])
. If the response code is notOK
, then something went wrong. Yourprint "No messages!"; sys.exit(0);
handling suggests that you are treating that as the expected behaviour for an empty mailbox, rather than as an error.Signal errors using exceptions rather than printing a message.
You should split your code up into more functions to separate the different responsibilities. One thing that should go into a function is prompting for missing parameters.
def _get_argument_parser():
# Argument Parser
parser = argparse.ArgumentParser(
description="Run daily average stats for an IMAP mailbox.", add_help=False)
parser.add_argument('--server', default=None, dest='host', help="Mail server to use")
parser.add_argument('--port', default=993, type=int, help="Port to use")
parser.add_argument('--username', '--mailbox',
default=None, dest='user',
help="Mailbox to run stats on")
parser.add_argument('--password',
default=None,
help="Login password for mailbox")
return parser
def _get_arguments():
parser = _get_argument_parser()
args = vars(parser.parse_args())
parser_actions = parser._option_string_actions.itervalues()
help_string = {action.dest: action.help for action in parser_actions}
for var in args:
if var is None:
args[var] = raw_input(help_string[var])
return argparse.Namespace(**args)
Here I renamed you _get_arguments
to _get_arguments_parser
. In addition I set the dest
to be the same as the one needed by imaplib
later
I also omitted the str
call, because raw_input
always returns a string. I also used some argparse
magic to get the help string for all variables. I then use that string to query correctly for that variable (this gets rid of the Please...
, which should be OK). I cast the Namespace
to a dict to be able to iterate over it, but cast it back to a Namespace
at the end.
For your counting, instead of doing
imap_messages_by_date = {}
...
for num in data[0].split():
...
try:
imap_messages_by_date[datestamp] += 1
except KeyError: # Doesn't exist in system yet
imap_messages_by_date[datestamp] = 1
you should use a collections.defaultdict
:
imap_messages_by_date = collections.defaultdict(int)
for num in data[0].split():
...
imap_messages_by_date[datestamp] += 1
You should also move the definition of imap_messages_by_date
closer to the loop, it took me quite some time to find it in your code.
Connecting to a mail server should go into its separate function as well:
def connect(args)
imap_conn = imaplib.IMAP4_SSL(args.host, args.port)
imap_conn.login(args.user, args.password)
imap_conn.select("INBOX", True)
return imap_conn
Calculating statistics should also go into its own function. Here I would try to avoid iterating over all of imap_messages_by_date
three times and calculate them manually in one pass:
def statistics(data):
dates_count = total_count = max_emails = 0. # to not get integer division
min_emails = sys.maxint # Should be large enough
for day, emails data.iteritems():
dates_count += 1
total_count += emails
if emails > max_emails:
max_emails = emails
if emails < min_emails:
min_emails = emails
return total_count / dates_count, min_emails, max_emails
Getting the number of messages per day is another task. Putting it into a function makes it easier to change it if someone comes up with a better way to count the number of messages in the inbox.
I also split getting the date_stamp
from a msg
into its own function here. It can even be simplified if date_tuple
is of the format (year, month, day, hour, minute, second)
or (year, month, day)
.
def get_datestamp(msg):
date_tuple = email.utils.parsedate_tz(msg['Date'])
if date_tuple:
local_date = datetime.datetime(*date_tuple)
return local_date.strftime('%Y-%m-%d')
print "ERROR email without valid timestamp ", email
return ""
def get_messages_per_day(imap_conn):
rv, data = imap_conn.search(None, "ALL")
if rv != 'OK':
print "No Messages!"
sys.exit(0)
imap_messages_by_date = defaultdict(int)
for num in data[0].split():
rv, msgdata = imap_conn.fetch(num, '(RFC822)')
if rv != 'OK':
print "ERROR getting message ", num
msg = email.message_from_string(msgdata[0][1])
datestamp = get_datestamp(msg)
imap_messages_by_date[datestamp] += 1
return imap_messages_by_date
Final code:
#!/usr/bin/python
import imaplib
import datetime
import email
import sys
import argparse
import collections
def _get_argument_parser():
# Argument Parser
parser = argparse.ArgumentParser(
description="Run daily average stats for an IMAP mailbox.", add_help=False)
parser.add_argument('--server', default=None, dest='host', help="Mail server to use")
parser.add_argument('--port', default=993, type=int, help="Port to use")
parser.add_argument('--username', '--mailbox',
default=None, dest='user',
help="Mailbox to run stats on")
parser.add_argument('--password',
default=None,
help="Login password for mailbox")
return parser
def _get_arguments():
parser = _get_argument_parser()
args = vars(parser.parse_args())
help_string = {action.dest: action.help for action in parser._actions}
for var in args:
if var is None:
args[var] = raw_input(help_string[var])
return argparse.Namespace(**args)
def connect(args)
imap_conn = imaplib.IMAP4_SSL(args.host, args.port)
imap_conn.login(args.user, args.password)
imap_conn.select("INBOX", True)
return imap_conn
def get_datestamp(msg):
date_tuple = email.utils.parsedate_tz(msg['Date'])
if date_tuple:
local_date = datetime.datetime(*date_tuple)
return local_date.strftime('%Y-%m-%d')
print "ERROR email without valid timestamp ", email
return ""
def get_messages_per_day(imap_conn):
rv, data = imap_conn.search(None, "ALL")
if rv != 'OK':
print "No Messages!"
sys.exit(0)
imap_messages_by_date = defaultdict(int)
for num in data[0].split():
rv, msgdata = imap_conn.fetch(num, '(RFC822)')
if rv != 'OK':
print "ERROR getting message ", num
msg = email.message_from_string(msgdata[0][1])
datestamp = get_datestamp(msg)
imap_messages_by_date[datestamp] += 1
return imap_messages_by_date
def statistics(data):
dates_count = total_count = max_emails = 0. # to not get integer division
min_emails = sys.maxint # Should be large enough
for day, emails data.iteritems():
dates_count += 1
total_count += emails
if emails > max_emails:
max_emails = emails
if emails < min_emails:
min_emails = emails
return min_emails, max_emails, total_count / dates_count
def main():
args = _get_arguments()
try:
imap_conn = connect(args)
imap_messages_by_date = get_messages_per_day(imap_conn)
print """Min Mails Per Day (So Far): %s
Max Mails Per Day (So Far): %s
(Rough) Daily Mail Average: %s""" % statistics(imap_messages_by_date)
except Exception as error:
print "An error has occurred, and the program has crashed; details:\n"
print str(error)
sys.exit(10)
if __name__ == "__main__":
main()
Explore related questions
See similar questions with these tags.