# (c) 2009-2024 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Original PyFileServer (c) 2005 Ho Chun Wei.
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Implements the `LockManager` object that provides the locking functionality.
The LockManager requires a LockStorageDict object to implement persistence.
Two alternative lock storage classes are defined in the lock_storage module:
- wsgidav.lock_man.lock_storage.LockStorageDict
- wsgidav.lock_man.lock_storage.LockStorageShelve
The lock data model is a dictionary with these fields:
root:
Resource URL.
principal:
Name of the authenticated user that created the lock.
type:
Must be 'write'.
scope:
Must be 'shared' or 'exclusive'.
depth:
Must be '0' or 'infinity'.
owner:
String identifying the owner.
timeout:
Seconds remaining until lock expiration.
This value is passed to create() and refresh()
expire:
Converted timeout for persistence: expire = time() + timeout.
token:
Automatically generated unique token.
"""
import random
import time
from pprint import pformat
from wsgidav import util
from wsgidav.dav_error import (
HTTP_LOCKED,
DAVError,
DAVErrorCondition,
PRECONDITION_CODE_LockConflict,
)
from wsgidav.rw_lock import ReadWriteLock
__docformat__ = "reStructuredText"
_logger = util.get_module_logger("wsgidav.lock_man")
# ========================================================================
# Tool functions
# ========================================================================
[docs]
def generate_lock_token():
return "opaquelocktoken:" + util.to_str(hex(random.getrandbits(256)))
[docs]
def normalize_lock_root(path):
# Normalize root: /foo/bar
assert path
path = util.to_str(path)
path = "/" + path.strip("/")
return path
[docs]
def is_lock_expired(lock):
expire = float(lock["expire"])
return expire >= 0 and expire < time.time()
[docs]
def lock_string(lock_dict):
"""Return readable rep."""
if not lock_dict:
return "Lock: None"
if lock_dict["expire"] < 0:
expire = "Infinite ({})".format(lock_dict["expire"])
else:
expire = "{} (in {} seconds)".format(
util.get_log_time(lock_dict["expire"]), lock_dict["expire"] - time.time()
)
return "Lock(<{}..>, {!r}, {}, {}, depth-{}, until {}".format(
# first 4 significant token characters
lock_dict.get("token", "?" * 30)[18:22],
lock_dict.get("root"),
lock_dict.get("principal"),
lock_dict.get("scope"),
lock_dict.get("depth"),
expire,
)
[docs]
def validate_lock(lock):
assert util.is_str(lock["root"])
assert lock["root"].startswith("/")
assert lock["type"] == "write"
assert lock["scope"] in ("shared", "exclusive")
assert lock["depth"] in ("0", "infinity")
assert util.is_bytes(lock["owner"]), lock # XML bytestring
# raises TypeError:
timeout = float(lock["timeout"])
assert timeout > 0 or timeout == -1, "timeout must be positive or -1"
assert util.is_str(lock["principal"])
if "token" in lock:
assert util.is_str(lock["token"])
# ========================================================================
# LockManager
# ========================================================================
[docs]
class LockManager:
"""
Implements locking functionality using a custom storage layer.
"""
LOCK_TIME_OUT_DEFAULT = 604800 # 1 week, in seconds
def __init__(self, storage):
"""
storage:
LockManagerStorage object
"""
assert hasattr(storage, "get_lock_list")
self._lock = ReadWriteLock()
self.storage = storage
self.storage.open()
def __del__(self):
self.storage.close()
def __repr__(self):
return f"{self.__class__.__name__}({self.storage!r})"
def _dump(self, msg=""):
urlDict = {} # { <url>: [<tokenlist>] }
ownerDict = {} # { <LOCKOWNER>: [<tokenlist>] }
userDict = {} # { <LOCKUSER>: [<tokenlist>] }
tokenDict = {} # { <token>: <LOCKURLS> }
_logger.info(f"{self}: {msg}")
for lock in self.storage.get_lock_list(
"/", include_root=True, include_children=True, token_only=False
):
tok = lock["token"]
tokenDict[tok] = lock_string(lock)
userDict.setdefault(lock["principal"], []).append(tok)
ownerDict.setdefault(lock["owner"], []).append(tok)
urlDict.setdefault(lock["root"], []).append(tok)
_logger.info(f"Locks:\n{pformat(tokenDict, indent=0, width=255)}")
if tokenDict:
_logger.info(f"Locks by URL:\n{pformat(urlDict, indent=4, width=255)}")
_logger.info(
f"Locks by principal:\n{pformat(userDict, indent=4, width=255)}"
)
_logger.info(f"Locks by owner:\n{pformat(ownerDict, indent=4, width=255)}")
def _generate_lock(
self, principal, lock_type, lock_scope, lock_depth, lock_owner, path, timeout
):
"""Acquire lock and return lock_dict.
principal
Name of the principal.
lock_type
Must be 'write'.
lock_scope
Must be 'shared' or 'exclusive'.
lock_depth
Must be '0' or 'infinity'.
lock_owner
String identifying the owner.
path
Resource URL.
timeout
Seconds to live
This function does NOT check, if the new lock creates a conflict!
"""
if timeout is None:
timeout = LockManager.LOCK_TIME_OUT_DEFAULT
elif timeout < 0:
timeout = -1
lock_dict = {
"root": path,
"type": lock_type,
"scope": lock_scope,
"depth": lock_depth,
"owner": lock_owner,
"timeout": timeout,
"principal": principal,
}
#
self.storage.create(path, lock_dict)
return lock_dict
[docs]
def acquire(
self,
*,
url,
lock_type,
lock_scope,
lock_depth,
lock_owner,
timeout,
principal,
token_list,
):
"""Check for permissions and acquire a lock.
On success return new lock dictionary.
On error raise a DAVError with an embedded DAVErrorCondition.
"""
url = normalize_lock_root(url)
self._lock.acquire_write()
try:
# Raises DAVError on conflict:
self._check_lock_permission(
url, lock_type, lock_scope, lock_depth, token_list, principal
)
return self._generate_lock(
principal, lock_type, lock_scope, lock_depth, lock_owner, url, timeout
)
finally:
self._lock.release()
[docs]
def refresh(self, token, *, timeout=None):
"""Set new timeout for lock, if existing and valid."""
if timeout is None:
timeout = LockManager.LOCK_TIME_OUT_DEFAULT
return self.storage.refresh(token, timeout=timeout)
[docs]
def get_lock(self, token, *, key=None):
"""Return lock_dict, or None, if not found or invalid.
Side effect: if lock is expired, it will be purged and None is returned.
key:
name of lock attribute that will be returned instead of a dictionary.
"""
assert key in (
None,
"type",
"scope",
"depth",
"owner",
"root",
"timeout",
"principal",
"token",
)
lock = self.storage.get(token)
if key is None or lock is None:
return lock
return lock[key]
[docs]
def release(self, token):
"""Delete lock."""
self.storage.delete(token)
[docs]
def is_token_locked_by_user(self, token, principal):
"""Return True, if <token> exists, is valid, and bound to <principal>."""
return self.get_lock(token, key="principal") == principal
[docs]
def get_url_lock_list(self, url, *, recursive=False):
"""Return list of lock_dict, if <url> is protected by at least one direct, valid lock.
Side effect: expired locks for this url are purged.
"""
url = normalize_lock_root(url)
lockList = self.storage.get_lock_list(
url, include_root=True, include_children=recursive, token_only=False
)
return lockList
[docs]
def get_indirect_url_lock_list(self, url, *, principal=None):
"""Return a list of valid lockDicts, that protect <path> directly or indirectly.
If a principal is given, only locks owned by this principal are returned.
Side effect: expired locks for this path and all parents are purged.
"""
url = normalize_lock_root(url)
lockList = []
u = url
while u:
lock_list = self.storage.get_lock_list(
u, include_root=True, include_children=False, token_only=False
)
for lock in lock_list:
if u != url and lock["depth"] != "infinity":
continue # We only consider parents with Depth: infinity
# TODO: handle shared locks in some way?
# if (lock["scope"] == "shared" and lock_scope == "shared"
# and principal != lock["principal"]):
# continue # Only compatible with shared locks by other users
if principal is None or principal == lock["principal"]:
lockList.append(lock)
u = util.get_uri_parent(u)
return lockList
[docs]
def is_url_locked(self, url):
"""Return True, if url is directly locked."""
lockList = self.get_url_lock_list(url)
return len(lockList) > 0
[docs]
def is_url_locked_by_token(self, url, lock_token):
"""Check, if url (or any of it's parents) is locked by lock_token."""
lockUrl = self.get_lock(lock_token, key="root")
return lockUrl and util.is_equal_or_child_uri(lockUrl, url)
[docs]
def remove_all_locks_from_url(self, url, *, recursive=False):
self._lock.acquire_write()
try:
lockList = self.get_url_lock_list(url, recursive=recursive)
for lock in lockList:
self.release(lock["token"])
finally:
self._lock.release()
def _check_lock_permission(
self, url, lock_type, lock_scope, lock_depth, token_list, principal
):
"""Check, if <principal> can lock <url>, otherwise raise an error.
If locking <url> would create a conflict, DAVError(HTTP_LOCKED) is
raised. An embedded DAVErrorCondition contains the conflicting resource.
@see http://www.webdav.org/specs/rfc4918.html#lock-model
- Parent locks WILL NOT be conflicting, if they are depth-0.
- Exclusive depth-infinity parent locks WILL be conflicting, even if
they are owned by <principal>.
- Child locks WILL NOT be conflicting, if we request a depth-0 lock.
- Exclusive child locks WILL be conflicting, even if they are owned by
<principal>. (7.7)
- It is not enough to check whether a lock is owned by <principal>, but
also the token must be passed with the request. (Because <principal>
may run two different applications on his client.)
- <principal> cannot lock-exclusive, if he holds a parent shared-lock.
(This would only make sense, if he was the only shared-lock holder.)
- TODO: litmus tries to acquire a shared lock on one resource twice
(locks: 27 'double_sharedlock') and fails, when we return HTTP_LOCKED.
So we allow multi shared locks on a resource even for the same
principal.
@param url: URL that shall be locked
@param lock_type: "write"
@param lock_scope: "shared"|"exclusive"
@param lock_depth: "0"|"infinity"
@param token_list: list of lock tokens, that the user submitted in If: header
@param principal: name of the principal requesting a lock
@return: None (or raise)
"""
assert lock_type == "write"
assert lock_scope in ("shared", "exclusive")
assert lock_depth in ("0", "infinity")
_logger.debug(
f"checkLockPermission({url}, {lock_scope}, {lock_depth}, {principal})"
)
# Error precondition to collect conflicting URLs
errcond = DAVErrorCondition(PRECONDITION_CODE_LockConflict)
self._lock.acquire_read()
try:
# Check url and all parents for conflicting locks
u = url
while u:
lock_list = self.get_url_lock_list(u)
for lock in lock_list:
_logger.debug(f" check parent {u}, {lock_string(lock)}")
if u != url and lock["depth"] != "infinity":
# We only consider parents with Depth: infinity
continue
elif lock["scope"] == "shared" and lock_scope == "shared":
# Only compatible with shared locks (even by same
# principal)
continue
# Lock conflict
_logger.debug(
f" -> DENIED due to locked parent {lock_string(lock)}"
)
errcond.add_href(lock["root"])
u = util.get_uri_parent(u)
if lock_depth == "infinity":
# Check child URLs for conflicting locks
child_ocks = self.storage.get_lock_list(
url, include_root=False, include_children=True, token_only=False
)
for lock in child_ocks:
assert util.is_child_uri(url, lock["root"])
# if util.is_child_uri(url, lock["root"]):
_logger.debug(f" -> DENIED due to locked child {lock_string(lock)}")
errcond.add_href(lock["root"])
finally:
self._lock.release()
# If there were conflicts, raise HTTP_LOCKED for <url>, and pass
# conflicting resource with 'no-conflicting-lock' precondition
if len(errcond.hrefs) > 0:
raise DAVError(HTTP_LOCKED, err_condition=errcond)
return
[docs]
def check_write_permission(self, *, url, depth, token_list, principal):
"""Check, if <principal> can modify <url>, otherwise raise HTTP_LOCKED.
If modifying <url> is prevented by a lock, DAVError(HTTP_LOCKED) is
raised. An embedded DAVErrorCondition contains the conflicting locks.
<url> may be modified by <principal>, if it is not currently locked
directly or indirectly (i.e. by a locked parent).
For depth-infinity operations, <url> also must not have locked children.
It is not enough to check whether a lock is owned by <principal>, but
also the token must be passed with the request. Because <principal> may
run two different applications.
See http://www.webdav.org/specs/rfc4918.html#lock-model
http://www.webdav.org/specs/rfc4918.html#rfc.section.7.4
TODO: verify assumptions:
- Parent locks WILL NOT be conflicting, if they are depth-0.
- Exclusive child locks WILL be conflicting, even if they are owned by <principal>.
@param url: URL that shall be modified, created, moved, or deleted
@param depth: "0"|"infinity"
@param token_list: list of lock tokens, that the principal submitted in If: header
@param principal: name of the principal requesting a lock
@return: None or raise error
"""
assert util.is_str(url)
assert depth in ("0", "infinity")
_logger.debug(
f"check_write_permission({url}, {depth}, {token_list}, {principal})"
)
# Error precondition to collect conflicting URLs
errcond = DAVErrorCondition(PRECONDITION_CODE_LockConflict)
self._lock.acquire_read()
try:
# Check url and all parents for conflicting locks
u = url
while u:
lock_list = self.get_url_lock_list(u)
_logger.debug(f" checking {u}")
for lock in lock_list:
_logger.debug(f" lock={lock_string(lock)}")
if u != url and lock["depth"] != "infinity":
# We only consider parents with Depth: infinity
continue
elif principal == lock["principal"] and lock["token"] in token_list:
# User owns this lock
continue
else:
# Token is owned by principal, but not passed with lock list
_logger.debug(
f" -> DENIED due to locked parent {lock_string(lock)}"
)
errcond.add_href(lock["root"])
u = util.get_uri_parent(u)
if depth == "infinity":
# Check child URLs for conflicting locks
child_ocks = self.storage.get_lock_list(
url, include_root=False, include_children=True, token_only=False
)
for lock in child_ocks:
assert util.is_child_uri(url, lock["root"])
# if util.is_child_uri(url, lock["root"]):
_logger.debug(f" -> DENIED due to locked child {lock_string(lock)}")
errcond.add_href(lock["root"])
finally:
self._lock.release()
# If there were conflicts, raise HTTP_LOCKED for <url>, and pass
# conflicting resource with 'no-conflicting-lock' precondition
if len(errcond.hrefs) > 0:
raise DAVError(HTTP_LOCKED, err_condition=errcond)
return