[Python-checkins] cpython (merge 3.5 -> default): Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.

r.david.murray python-checkins at python.org
Sun Nov 8 01:06:01 EST 2015


https://hg.python.org/cpython/rev/7368b86432c6
changeset: 99010:7368b86432c6
parent: 99008:13550b3ca1a6
parent: 99009:d13263ecf0c6
user: R David Murray <rdmurray at bitdance.com>
date: Sun Nov 08 01:05:11 2015 -0500
summary:
 Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.
files:
 Lib/smtplib.py | 19 +-
 Lib/test/test_smtplib.py | 179 ++++++++++++++++++--------
 Misc/NEWS | 2 +
 3 files changed, 132 insertions(+), 68 deletions(-)
diff --git a/Lib/smtplib.py b/Lib/smtplib.py
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -630,12 +630,12 @@
 (code, resp) = self.docmd("AUTH", mechanism + " " + response)
 else:
 (code, resp) = self.docmd("AUTH", mechanism)
- # Server replies with 334 (challenge) or 535 (not supported)
- if code == 334:
- challenge = base64.decodebytes(resp)
- response = encode_base64(
- authobject(challenge).encode('ascii'), eol='')
- (code, resp) = self.docmd(response)
+ # If server responds with a challenge, send the response.
+ if code == 334:
+ challenge = base64.decodebytes(resp)
+ response = encode_base64(
+ authobject(challenge).encode('ascii'), eol='')
+ (code, resp) = self.docmd(response)
 if code in (235, 503):
 return (code, resp)
 raise SMTPAuthenticationError(code, resp)
@@ -657,11 +657,10 @@
 def auth_login(self, challenge=None):
 """ Authobject to use with LOGIN authentication. Requires self.user and
 self.password to be set."""
- (code, resp) = self.docmd(
- encode_base64(self.user.encode('ascii'), eol=''))
- if code == 334:
+ if challenge is None:
+ return self.user
+ else:
 return self.password
- raise SMTPAuthenticationError(code, resp)
 
 def login(self, user, password, *, initial_response_ok=True):
 """Log in on an SMTP server that requires authentication.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,8 +1,10 @@
 import asyncore
+import base64
 import email.mime.text
 from email.message import EmailMessage
 from email.base64mime import body_encode as encode_base64
 import email.utils
+import hmac
 import socket
 import smtpd
 import smtplib
@@ -623,20 +625,12 @@
 sim_auth = ('Mr.A at somewhere.com', 'somepassword')
 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
-sim_auth_credentials = {
- 'login': 'TXIuQUBzb21ld2hlcmUuY29t',
- 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
- 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
- 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
- }
-sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
-sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
-
 sim_lists = {'list-1':['Mr.A at somewhere.com','Mrs.C at somewhereesle.com'],
 'list-2':['Ms.B at xn--fo-fka.com',],
 }
 
 # Simulated SMTP channel & server
+class ResponseException(Exception): pass
 class SimSMTPChannel(smtpd.SMTPChannel):
 
 quit_response = None
@@ -646,12 +640,109 @@
 rcpt_count = 0
 rset_count = 0
 disconnect = 0
+ AUTH = 99 # Add protocol state to enable auth testing.
+ authenticated_user = None
 
 def __init__(self, extra_features, *args, **kw):
 self._extrafeatures = ''.join(
 [ "250-{0}\r\n".format(x) for x in extra_features ])
 super(SimSMTPChannel, self).__init__(*args, **kw)
 
+ # AUTH related stuff. It would be nice if support for this were in smtpd.
+ def found_terminator(self):
+ if self.smtp_state == self.AUTH:
+ line = self._emptystring.join(self.received_lines)
+ print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
+ self.received_lines = []
+ try:
+ self.auth_object(line)
+ except ResponseException as e:
+ self.smtp_state = self.COMMAND
+ self.push('%s %s' % (e.smtp_code, e.smtp_error))
+ return
+ super().found_terminator()
+
+
+ def smtp_AUTH(self, arg):
+ if not self.seen_greeting:
+ self.push('503 Error: send EHLO first')
+ return
+ if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
+ self.push('500 Error: command "AUTH" not recognized')
+ return
+ if self.authenticated_user is not None:
+ self.push(
+ '503 Bad sequence of commands: already authenticated')
+ return
+ args = arg.split()
+ if len(args) not in [1, 2]:
+ self.push('501 Syntax: AUTH <mechanism> [initial-response]')
+ return
+ auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
+ try:
+ self.auth_object = getattr(self, auth_object_name)
+ except AttributeError:
+ self.push('504 Command parameter not implemented: unsupported '
+ ' authentication mechanism {!r}'.format(auth_object_name))
+ return
+ self.smtp_state = self.AUTH
+ self.auth_object(args[1] if len(args) == 2 else None)
+
+ def _authenticated(self, user, valid):
+ if valid:
+ self.authenticated_user = user
+ self.push('235 Authentication Succeeded')
+ else:
+ self.push('535 Authentication credentials invalid')
+ self.smtp_state = self.COMMAND
+
+ def _decode_base64(self, string):
+ return base64.decodebytes(string.encode('ascii')).decode('utf-8')
+
+ def _auth_plain(self, arg=None):
+ if arg is None:
+ self.push('334 ')
+ else:
+ logpass = self._decode_base64(arg)
+ try:
+ *_, user, password = logpass.split('0円')
+ except ValueError as e:
+ self.push('535 Splitting response {!r} into user and password'
+ ' failed: {}'.format(logpass, e))
+ return
+ self._authenticated(user, password == sim_auth[1])
+
+ def _auth_login(self, arg=None):
+ if arg is None:
+ # base64 encoded 'Username:'
+ self.push('334 VXNlcm5hbWU6')
+ elif not hasattr(self, '_auth_login_user'):
+ self._auth_login_user = self._decode_base64(arg)
+ # base64 encoded 'Password:'
+ self.push('334 UGFzc3dvcmQ6')
+ else:
+ password = self._decode_base64(arg)
+ self._authenticated(self._auth_login_user, password == sim_auth[1])
+ del self._auth_login_user
+
+ def _auth_cram_md5(self, arg=None):
+ if arg is None:
+ self.push('334 {}'.format(sim_cram_md5_challenge))
+ else:
+ logpass = self._decode_base64(arg)
+ try:
+ user, hashed_pass = logpass.split()
+ except ValueError as e:
+ self.push('535 Splitting response {!r} into user and password'
+ 'failed: {}'.format(logpass, e))
+ return False
+ valid_hashed_pass = hmac.HMAC(
+ sim_auth[1].encode('ascii'),
+ self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
+ 'md5').hexdigest()
+ self._authenticated(user, hashed_pass == valid_hashed_pass)
+ # end AUTH related stuff.
+
 def smtp_EHLO(self, arg):
 resp = ('250-testhost\r\n'
 '250-EXPN\r\n'
@@ -683,20 +774,6 @@
 else:
 self.push('550 No access for you!')
 
- def smtp_AUTH(self, arg):
- mech = arg.strip().lower()
- if mech=='cram-md5':
- self.push('334 {}'.format(sim_cram_md5_challenge))
- elif mech not in sim_auth_credentials:
- self.push('504 auth type unimplemented')
- return
- elif mech=='plain':
- self.push('334 ')
- elif mech=='login':
- self.push('334 ')
- else:
- self.push('550 No access for you!')
-
 def smtp_QUIT(self, arg):
 if self.quit_response is None:
 super(SimSMTPChannel, self).smtp_QUIT(arg)
@@ -841,63 +918,49 @@
 self.assertEqual(smtp.expn(u), expected_unknown)
 smtp.quit()
 
- # SimSMTPChannel doesn't fully support AUTH because it requires a
- # synchronous read to obtain the credentials...so instead smtpd
- # sees the credential sent by smtplib's login method as an unknown command,
- # which results in smtplib raising an auth error. Fortunately the error
- # message contains the encoded credential, so we can partially check that it
- # was generated correctly (partially, because the 'word' is uppercased in
- # the error message).
-
 def testAUTH_PLAIN(self):
 self.serv.add_feature("AUTH PLAIN")
 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_plain, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
 smtp.close()
 
 def testAUTH_LOGIN(self):
 self.serv.add_feature("AUTH LOGIN")
 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_user, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
 smtp.close()
 
 def testAUTH_CRAM_MD5(self):
 self.serv.add_feature("AUTH CRAM-MD5")
 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_credentials['cram-md5'], str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
 smtp.close()
 
 def testAUTH_multiple(self):
 # Test that multiple authentication methods are tried.
 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_user, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
 smtp.close()
 
 def test_auth_function(self):
- smtp = smtplib.SMTP(HOST, self.port,
- local_hostname='localhost', timeout=15)
- self.serv.add_feature("AUTH CRAM-MD5")
- smtp.user, smtp.password = sim_auth[0], sim_auth[1]
- supported = {'CRAM-MD5': smtp.auth_cram_md5,
- 'PLAIN': smtp.auth_plain,
- 'LOGIN': smtp.auth_login,
- }
- for mechanism, method in supported.items():
- try: smtp.auth(mechanism, method, initial_response_ok=False)
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
- str(err))
- smtp.close()
+ supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
+ for mechanism in supported:
+ self.serv.add_feature("AUTH {}".format(mechanism))
+ for mechanism in supported:
+ with self.subTest(mechanism=mechanism):
+ smtp = smtplib.SMTP(HOST, self.port,
+ local_hostname='localhost', timeout=15)
+ smtp.ehlo('foo')
+ smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+ method = 'auth_' + mechanism.lower().replace('-', '_')
+ resp = smtp.auth(mechanism, getattr(smtp, method))
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
+ smtp.close()
 
 def test_quit_resets_greeting(self):
 smtp = smtplib.SMTP(HOST, self.port,
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -79,6 +79,8 @@
 Library
 -------
 
+- Issue #25446: Fix regression in smtplib's AUTH LOGIN support.
+
 - Issue #18010: Fix the pydoc web server's module search function to handle
 exceptions from importing packages.
 
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /