Source code for wsgidav.http_authenticator

# (c) 2009-2018 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Original PyFileServer (c) 2005 Ho Chun Wei.
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
WSGI middleware for HTTP basic and digest authentication.

Usage::

   from http_authenticator import HTTPAuthenticator

   WSGIApp = HTTPAuthenticator(ProtectedWSGIApp, domain_controller, acceptbasic,
                               acceptdigest, defaultdigest)

   where:
     ProtectedWSGIApp is the application requiring authenticated access

     domain_controller is a domain controller object meeting specific
     requirements (below)

     acceptbasic is a boolean indicating whether to accept requests using
     the basic authentication scheme (default = True)

     acceptdigest is a boolean indicating whether to accept requests using
     the digest authentication scheme (default = True)

     defaultdigest is a boolean. if True, an unauthenticated request will
     be sent a digest authentication required response, else the unauthenticated
     request will be sent a basic authentication required response
     (default = True)

The HTTPAuthenticator will put the following authenticated information in the
environ dictionary::

   environ["http_authenticator.realm"] = realm name
   environ["http_authenticator.username"] = username


**Domain Controllers**

The HTTP basic and digest authentication schemes are based on the following
concept:

Each requested relative URI can be resolved to a realm for authentication,
for example:
/fac_eng/courses/ee5903/timetable.pdf -> might resolve to realm 'Engineering General'
/fac_eng/examsolns/ee5903/thisyearssolns.pdf -> might resolve to realm 'Engineering Lecturers'
/med_sci/courses/m500/surgery.htm -> might resolve to realm 'Medical Sciences General'
and each realm would have a set of username and password pairs that would
allow access to the resource.

A domain controller provides this information to the HTTPAuthenticator.
This allows developers to write their own domain controllers, that might,
for example, interface with their own user database.

for simple applications, a SimpleDomainController is provided that will take
in a single realm name (for display) and a single dictionary of username (key)
and password (value) string pairs

Usage::

   from http_authenticator import SimpleDomainController
   users = dict(({'John Smith': 'YouNeverGuessMe', 'Dan Brown': 'DontGuessMeEither'})
   realm = 'Sample Realm'
   domain_controller = SimpleDomainController(users, realm)


Domain Controllers must provide the methods as described in
``wsgidav.interfaces.domaincontrollerinterface`` (interface_)

.. _interface : interfaces/domaincontrollerinterface.py

The environ variable here is the WSGI 'environ' dictionary. It is passed to
all methods of the domain controller as a means for developers to pass information
from previous middleware or server config (if required).
"""
import random
import re
import time
from hashlib import md5

from wsgidav import compat, util
from wsgidav.domain_controller import WsgiDAVDomainController
from wsgidav.middleware import BaseMiddleware
from wsgidav.util import calc_base64, calc_hexdigest

__docformat__ = "reStructuredText"

_logger = util.getModuleLogger(__name__)

# HOTFIX for Windows XP (Microsoft-WebDAV-MiniRedir/5.1.2600):
# When accessing a share '/dav/', XP sometimes sends digests for '/'.
# With this fix turned on, we allow '/' digests, when a matching '/dav' account
# is present.
HOTFIX_WINXP_AcceptRootShareLogin = True

# HOTFIX for Windows
# MW 2013-12-31: DON'T set this (will MS office to use anonymous always in
# some scenarios)
HOTFIX_WIN_AcceptAnonymousOptions = False


[docs]class SimpleDomainController(object): """SimpleDomainController : Simple domain controller for HTTPAuthenticator.""" def __init__(self, dictusers=None, realmname="SimpleDomain"): if dictusers is None: self._users = dict({"John Smith": "YouNeverGuessMe"}) else: self._users = dictusers self._realmname = realmname
[docs] def getDomainRealm(self, inputRelativeURL, environ): return self._realmname
[docs] def requireAuthentication(self, realmname, environ): return True
[docs] def isRealmUser(self, realmname, username, environ): return username in self._users
[docs] def getRealmUserPassword(self, realmname, username, environ): if username in self._users: return self._users[username] return None
[docs] def authDomainUser(self, realmname, username, password, environ): if username in self._users: return self._users[username] == password return False
# ======================================================================== # HTTPAuthenticator # ========================================================================
[docs]class HTTPAuthenticator(BaseMiddleware): """WSGI Middleware for basic and digest authenticator.""" def __init__(self, application, config): self._verbose = config.get("verbose", 2) self._application = application self._user_mapping = config.get("user_mapping", {}) self._domaincontroller = config.get( "domaincontroller") or WsgiDAVDomainController(self._user_mapping) self._acceptbasic = config.get("acceptbasic", True) self._acceptdigest = config.get("acceptdigest", True) self._defaultdigest = config.get("defaultdigest", True) self._trusted_auth_header = config.get("trusted_auth_header", None) self._noncedict = dict([]) self._headerparser = re.compile(r"([\w]+)=([^,]*),") # Note: extra parser to handle digest auth requests from certain # clients, that leave commas un-encoded to interfere with the above. self._headerfixparser = re.compile(r'([\w]+)=("[^"]*,[^"]*"),') self._headermethod = re.compile(r"^([\w]+)") wdcName = "NTDomainController" if self._domaincontroller.__class__.__name__ == wdcName: if self._acceptdigest or self._defaultdigest or not self._acceptbasic: _logger.warn( "WARNING: {} requires basic authentication.\n\tSet acceptbasic=True, " "acceptdigest=False, defaultdigest=False".format(wdcName))
[docs] def getDomainController(self): return self._domaincontroller
[docs] def allowAnonymousAccess(self, share): return (isinstance(self._domaincontroller, WsgiDAVDomainController) and not self._user_mapping.get(share))
[docs] def __call__(self, environ, start_response): realmname = self._domaincontroller.getDomainRealm(environ["PATH_INFO"], environ) _logger.debug("realm '{}'".format(realmname)) # _logger.debug("{}".format(environ)) force_allow = False if HOTFIX_WIN_AcceptAnonymousOptions and environ["REQUEST_METHOD"] == "OPTIONS": _logger.warning("No authorization required for OPTIONS method") force_allow = True if force_allow or not self._domaincontroller.requireAuthentication(realmname, environ): # no authentication needed _logger.debug("No authorization required for realm '{}'".format(realmname)) environ["http_authenticator.realm"] = realmname environ["http_authenticator.username"] = "" return self._application(environ, start_response) if self._trusted_auth_header and environ.get(self._trusted_auth_header): # accept a username that was injected by a trusted upstream server _logger.debug("Accept trusted username {}='{}'for realm '{}'".format( self._trusted_auth_header, environ.get(self._trusted_auth_header), realmname)) environ["http_authenticator.realm"] = realmname environ["http_authenticator.username"] = environ.get(self._trusted_auth_header) return self._application(environ, start_response) if "HTTP_AUTHORIZATION" in environ: authheader = environ["HTTP_AUTHORIZATION"] authmatch = self._headermethod.search(authheader) authmethod = "None" if authmatch: authmethod = authmatch.group(1).lower() if authmethod == "digest" and self._acceptdigest: return self.authDigestAuthRequest(environ, start_response) elif authmethod == "digest" and self._acceptbasic: return self.sendBasicAuthResponse(environ, start_response) elif authmethod == "basic" and self._acceptbasic: return self.authBasicAuthRequest(environ, start_response) # The requested auth method is not supported. elif self._defaultdigest and self._acceptdigest: return self.sendDigestAuthResponse(environ, start_response) elif self._acceptbasic: return self.sendBasicAuthResponse(environ, start_response) _logger.warn("HTTPAuthenticator: respond with 400 Bad request; Auth-Method: {}" .format(authmethod)) start_response("400 Bad Request", [("Content-Length", "0"), ("Date", util.getRfc1123Time()), ]) return [""] if self._defaultdigest: return self.sendDigestAuthResponse(environ, start_response) return self.sendBasicAuthResponse(environ, start_response)
[docs] def sendBasicAuthResponse(self, environ, start_response): realmname = self._domaincontroller.getDomainRealm( environ["PATH_INFO"], environ) _logger.debug("401 Not Authorized for realm '{}' (basic)".format(realmname)) wwwauthheaders = 'Basic realm="' + realmname + '"' body = compat.to_bytes(self.getErrorMessage()) start_response("401 Not Authorized", [("WWW-Authenticate", wwwauthheaders), ("Content-Type", "text/html"), ("Content-Length", str(len(body))), ("Date", util.getRfc1123Time()), ]) return [body]
[docs] def authBasicAuthRequest(self, environ, start_response): realmname = self._domaincontroller.getDomainRealm( environ["PATH_INFO"], environ) authheader = environ["HTTP_AUTHORIZATION"] authvalue = "" try: authvalue = authheader[len("Basic "):].strip() except Exception: authvalue = "" # authvalue = authvalue.strip().decode("base64") authvalue = compat.base64_decodebytes(compat.to_bytes(authvalue)) authvalue = compat.to_native(authvalue) username, password = authvalue.split(":", 1) if self._domaincontroller.authDomainUser(realmname, username, password, environ): environ["http_authenticator.realm"] = realmname environ["http_authenticator.username"] = username return self._application(environ, start_response) return self.sendBasicAuthResponse(environ, start_response)
[docs] def sendDigestAuthResponse(self, environ, start_response): realmname = self._domaincontroller.getDomainRealm( environ["PATH_INFO"], environ) random.seed() serverkey = hex(random.getrandbits(32))[2:] etagkey = calc_hexdigest(environ["PATH_INFO"]) timekey = str(time.time()) nonce_source = timekey + \ calc_hexdigest(timekey + ":" + etagkey + ":" + serverkey) nonce = calc_base64(nonce_source) wwwauthheaders = ('Digest realm="{}", nonce="{}", algorithm=MD5, qop="auth"' .format(realmname, nonce)) _logger.debug("401 Not Authorized for realm '{}' (digest): {}" .format(realmname, wwwauthheaders)) body = compat.to_bytes(self.getErrorMessage()) start_response("401 Not Authorized", [("WWW-Authenticate", wwwauthheaders), ("Content-Type", "text/html"), ("Content-Length", str(len(body))), ("Date", util.getRfc1123Time()), ]) return [body]
[docs] def authDigestAuthRequest(self, environ, start_response): realmname = self._domaincontroller.getDomainRealm( environ["PATH_INFO"], environ) isinvalidreq = False authheaderdict = dict([]) authheaders = environ["HTTP_AUTHORIZATION"] + "," if not authheaders.lower().strip().startswith("digest"): isinvalidreq = True # Hotfix for Windows file manager and OSX Finder: # Some clients don't urlencode paths in auth header, so uri value may # contain commas, which break the usual regex headerparser. Example: # Digest username="user",realm="/",uri="a,b.txt",nc=00000001, ... # -> [..., ('uri', '"a'), ('nc', '00000001'), ...] # Override any such values with carefully extracted ones. authheaderlist = self._headerparser.findall(authheaders) authheaderfixlist = self._headerfixparser.findall(authheaders) if authheaderfixlist: _logger.info("Fixing authheader comma-parsing: extend {} with {}" .format(authheaderlist, authheaderfixlist)) authheaderlist += authheaderfixlist for authheader in authheaderlist: authheaderkey = authheader[0] authheadervalue = authheader[1].strip().strip('"') authheaderdict[authheaderkey] = authheadervalue _logger.debug("authDigestAuthRequest: {}".format(environ["HTTP_AUTHORIZATION"])) _logger.debug(" -> {}".format(authheaderdict)) if "username" in authheaderdict: req_username = authheaderdict["username"] req_username_org = req_username # Hotfix for Windows XP: # net use W: http://127.0.0.1/dav /USER:DOMAIN\tester tester # will send the name with double backslashes ('DOMAIN\\tester') # but send the digest for the simple name ('DOMAIN\tester'). if r"\\" in req_username: req_username = req_username.replace("\\\\", "\\") _logger.info("Fixing Windows name with double backslash: '{}' --> '{}'" .format(req_username_org, req_username)) if not self._domaincontroller.isRealmUser(realmname, req_username, environ): isinvalidreq = True else: isinvalidreq = True # TODO: Chun added this comments, but code was commented out # Do not do realm checking - a hotfix for WinXP using some other realm's # auth details for this realm - if user/password match if "realm" in authheaderdict: if authheaderdict["realm"].upper() != realmname.upper(): if HOTFIX_WINXP_AcceptRootShareLogin: # Hotfix: also accept '/' if authheaderdict["realm"].upper() != "/": isinvalidreq = True else: isinvalidreq = True if "algorithm" in authheaderdict: if authheaderdict["algorithm"].upper() != "MD5": isinvalidreq = True # only MD5 supported if "uri" in authheaderdict: req_uri = authheaderdict["uri"] if "nonce" in authheaderdict: req_nonce = authheaderdict["nonce"] else: isinvalidreq = True req_hasqop = False if "qop" in authheaderdict: req_hasqop = True req_qop = authheaderdict["qop"] if req_qop.lower() != "auth": isinvalidreq = True # only auth supported, auth-int not supported else: req_qop = None if "cnonce" in authheaderdict: req_cnonce = authheaderdict["cnonce"] else: req_cnonce = None if req_hasqop: isinvalidreq = True if "nc" in authheaderdict: # is read but nonce-count checking not implemented req_nc = authheaderdict["nc"] else: req_nc = None if req_hasqop: isinvalidreq = True if "response" in authheaderdict: req_response = authheaderdict["response"] else: isinvalidreq = True if not isinvalidreq: req_password = self._domaincontroller.getRealmUserPassword( realmname, req_username, environ) req_method = environ["REQUEST_METHOD"] required_digest = self.computeDigestResponse( req_username, realmname, req_password, req_method, req_uri, req_nonce, req_cnonce, req_qop, req_nc) if required_digest != req_response: _logger.warning("computeDigestResponse('{}', '{}', ...): {} != {}".format( realmname, req_username, required_digest, req_response)) if HOTFIX_WINXP_AcceptRootShareLogin: # Hotfix: also accept '/' digest root_digest = self.computeDigestResponse( req_username, "/", req_password, req_method, req_uri, req_nonce, req_cnonce, req_qop, req_nc) if root_digest == req_response: _logger.warn( "authDigestAuthRequest: HOTFIX: accepting '/' login for '{}'." .format(realmname)) else: isinvalidreq = True else: isinvalidreq = True else: # _logger.debug("digest succeeded for realm '{}', user '{}'" # .format(realmname, req_username)) pass if isinvalidreq: _logger.warn("Authentication failed for user '{}', realm '{}'" .format(req_username, realmname)) return self.sendDigestAuthResponse(environ, start_response) environ["http_authenticator.realm"] = realmname environ["http_authenticator.username"] = req_username return self._application(environ, start_response)
[docs] def computeDigestResponse( self, username, realm, password, method, uri, nonce, cnonce, qop, nc): A1 = username + ":" + realm + ":" + password A2 = method + ":" + uri if qop: digestresp = self.md5kd(self.md5h( A1), nonce + ":" + nc + ":" + cnonce + ":" + qop + ":" + self.md5h(A2)) else: digestresp = self.md5kd(self.md5h(A1), nonce + ":" + self.md5h(A2)) return digestresp
[docs] def md5h(self, data): return md5(compat.to_bytes(data)).hexdigest()
[docs] def md5kd(self, secret, data): return self.md5h(secret + ":" + data)
[docs] def getErrorMessage(self): message = """ <html><head><title>401 Access not authorized</title></head> <body> <h1>401 Access not authorized</h1> </body> </html> """ return message