Source code for wsgidav.lock_man.lock_storage

# (c) 2009-2023 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Original PyFileServer (c) 2005 Ho Chun Wei.
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Implements two storage providers for `LockManager`.

Two alternative lock storage classes are defined here: one in-memory
(dict-based), and one persistent low performance variant using shelve.

See :class:`~wsgidav.lock_man.lock_manager.LockManager`
See :class:`~wsgidav.lock_man.lock_storage.LockStorageDict`
See :class:`~wsgidav.lock_man.lock_storage.LockStorageShelve`
"""
import os
import shelve
import time

from wsgidav import util
from wsgidav.lock_man.lock_manager import (
    generate_lock_token,
    lock_string,
    normalize_lock_root,
    validate_lock,
)
from wsgidav.rw_lock import ReadWriteLock

__docformat__ = "reStructuredText"

# _logger = util.get_module_logger(__name__)
_logger = util.get_module_logger("wsgidav.lock_man")

# TODO: Comments from Ian Bicking (2005):
# @@: Use of shelve means this is only really useful in a threaded environment.
#    And if you have just a single-process threaded environment, you could get
#    nearly the same effect with a dictionary of threading.Lock() objects.  Of course,
#    it would be better to move off shelve anyway, probably to a system with
#    a directory of per-file locks, using the file locking primitives (which,
#    sadly, are not quite portable).
# @@: It would probably be easy to store the properties as pickle objects
# in a parallel directory structure to the files you are describing.
# Pickle is expedient, but later you could use something more readable
# (pickles aren't particularly readable)


# ========================================================================
# LockStorageDict
# ========================================================================
class LockStorageDict:
    """
    An in-memory lock manager storage implementation using a dictionary.

    Access to the dictionary is bracketed by ``acquire_read()`` /
    ``acquire_write()`` / ``release()`` calls on a ``ReadWriteLock``.

    Also, to make it work with a Shelve dictionary, modifying dictionary
    members is done by re-assignment and we call a _flush() method.

    This is obviously not persistent, but should be enough in some cases.
    For a persistent implementation, see lock_storage.LockStorageShelve().

    Notes:
        expire is stored as expiration date in seconds since epoch (not in
        seconds until expiration).

    The dictionary is built like::

        { 'URL2TOKEN:/temp/litmus/lockme': ['opaquelocktoken:0x1d7b86...',
                                            'opaquelocktoken:0xd7d4c0...'],
          'opaquelocktoken:0x1d7b86...': {
            'depth': '0',
            'owner': '<?xml version="1.0" encoding="UTF-8"?>\\n<owner xmlns="DAV:">litmus test suite</owner>\\n',
            'principal': 'tester',
            'root': '/temp/litmus/lockme',
            'scope': 'shared',
            'expire': 1261328382.4530001,
            'token': 'opaquelocktoken:0x1d7b86...',
            'type': 'write',
            },
          'opaquelocktoken:0xd7d4c0...': {
            'depth': '0',
            'owner': '<?xml version="1.0" encoding="UTF-8"?>\\n<owner xmlns="DAV:">litmus: notowner_sharedlock</owner>\\n',
            'principal': 'tester',
            'root': '/temp/litmus/lockme',
            'scope': 'shared',
            'expire': 1261328381.6040001,
            'token': 'opaquelocktoken:0xd7d4c0...',
            'type': 'write'
           },
         }
    """  # noqa

    LOCK_TIME_OUT_DEFAULT = 604800  # 1 week, in seconds
    LOCK_TIME_OUT_MAX = 4 * 604800  # 1 month, in seconds

    def __init__(self):
        # The backing mapping stays None until open() is called.
        self._dict = None
        self._lock = ReadWriteLock()

    def __repr__(self):
        return self.__class__.__name__

    def _flush(self):
        """Overloaded by Shelve implementation."""
        pass

    def open(self):
        """Called before first use.

        May be implemented to initialize a storage.
        """
        assert self._dict is None
        self._dict = {}

    def close(self):
        """Called on shutdown."""
        self._dict = None

    def cleanup(self):
        """Purge expired locks (optional)."""
        pass

    def clear(self):
        """Delete all entries (no-op if the storage is not open)."""
        if self._dict is not None:
            self._dict.clear()

    def get(self, token):
        """Return a lock dictionary for a token.

        If the lock does not exist or is expired, None is returned.

        token:
            lock token
        Returns:
            Lock dictionary or <None>

        Side effect: if lock is expired, it will be purged and None is
        returned.
        """
        # NOTE(review): delete() requests the write lock while this thread
        # still holds the read lock; this presumably relies on ReadWriteLock
        # supporting read->write upgrades -- confirm.
        self._lock.acquire_read()
        try:
            lock = self._dict.get(token)
            if lock is None:
                # Lock not found: purge dangling URL2TOKEN entries
                _logger.debug(f"Lock purged dangling: {token}")
                self.delete(token)
                return None
            expire = float(lock["expire"])
            # A negative expire value is never treated as timed-out.
            if expire >= 0 and expire < time.time():
                _logger.debug(f"Lock timed-out({expire}): {lock_string(lock)}")
                self.delete(token)
                return None
            return lock
        finally:
            self._lock.release()

    def create(self, path, lock):
        """Create a direct lock for a resource path.

        path:
            Normalized path (utf8 encoded string, no trailing '/')
        lock:
            lock dictionary, without a token entry
        Returns:
            The lock dictionary that was passed in, completed as described
            below.

        **Note:** the lock dictionary may be modified on return:

        - lock['root'] is ignored and set to the normalized <path>
        - lock['timeout'] may be normalized and shorter than requested;
          a missing timeout falls back to LOCK_TIME_OUT_DEFAULT
        - lock['expire'] is set to the absolute expiration time
        - lock['token'] is added
        """
        self._lock.acquire_write()
        try:
            # We expect only a lock definition, not an existing lock
            assert lock.get("token") is None
            assert lock.get("expire") is None, "Use timeout instead of expire"
            assert path and "/" in path

            # Normalize root: /foo/bar
            org_path = path
            path = normalize_lock_root(path)
            lock["root"] = path

            # Normalize timeout from ttl to expire-date.
            # The None check must happen *before* the float() conversion:
            # float(None) raises TypeError, which made the default branch
            # unreachable.
            timeout = lock.get("timeout")
            if timeout is None:
                timeout = LockStorageDict.LOCK_TIME_OUT_DEFAULT
            else:
                timeout = float(timeout)
                if timeout < 0 or timeout > LockStorageDict.LOCK_TIME_OUT_MAX:
                    # Negative (infinite) and oversized requests are clamped
                    timeout = LockStorageDict.LOCK_TIME_OUT_MAX

            lock["timeout"] = timeout
            lock["expire"] = time.time() + timeout

            validate_lock(lock)

            token = generate_lock_token()
            lock["token"] = token

            # Store lock
            self._dict[token] = lock

            # Store locked path reference
            key = f"URL2TOKEN:{path}"
            if key not in self._dict:
                self._dict[key] = [token]
            else:
                # Note: Shelve dictionary returns copies, so we must reassign
                # values:
                tok_list = self._dict[key]
                tok_list.append(token)
                self._dict[key] = tok_list
            self._flush()
            _logger.debug(f"LockStorageDict.set({org_path!r}): {lock_string(lock)}")
            return lock
        finally:
            self._lock.release()

    def refresh(self, token, *, timeout):
        """Modify an existing lock's timeout.

        token:
            Valid lock token.
        timeout:
            Suggested lifetime in seconds (-1 for infinite).
            The real expiration time may be shorter than requested!
        Returns:
            Lock dictionary.

        Preconditions are checked with ``assert``: the token must exist and
        timeout must be -1 or positive (AssertionError otherwise).
        """
        assert token in self._dict, "Lock must exist"
        assert timeout == -1 or timeout > 0
        if timeout < 0 or timeout > LockStorageDict.LOCK_TIME_OUT_MAX:
            timeout = LockStorageDict.LOCK_TIME_OUT_MAX

        self._lock.acquire_write()
        try:
            # Note: shelve dictionary returns copies, so we must reassign
            # values:
            lock = self._dict[token]
            lock["timeout"] = timeout
            lock["expire"] = time.time() + timeout
            self._dict[token] = lock
            self._flush()
        finally:
            self._lock.release()
        return lock

    def delete(self, token):
        """Delete lock.

        Returns True on success. False, if token does not exist, or is expired.
        """
        self._lock.acquire_write()
        try:
            lock = self._dict.get(token)
            _logger.debug(f"delete {lock_string(lock)}")
            if lock is None:
                return False
            # Remove url to lock mapping
            key = "URL2TOKEN:{}".format(lock.get("root"))
            if key in self._dict:
                tok_list = self._dict[key]
                if len(tok_list) > 1:
                    # Note: shelve dictionary returns copies, so we must
                    # reassign values:
                    tok_list.remove(token)
                    self._dict[key] = tok_list
                else:
                    del self._dict[key]
            # Remove the lock
            del self._dict[token]

            self._flush()
        finally:
            self._lock.release()
        return True

    def get_lock_list(self, path, *, include_root, include_children, token_only):
        """Return a list of direct locks for <path>.

        Expired locks are *not* returned (but may be purged).

        path:
            Normalized path (utf8 encoded string, no trailing '/')
        include_root:
            False: don't add <path> lock (only makes sense, when
            include_children is True).
        include_children:
            True: Also check all sub-paths for existing locks.
        token_only:
            True: only a list of token is returned. This may be implemented
            more efficiently by some providers.
        Returns:
            List of valid lock dictionaries (may be empty).
        """
        assert util.is_str(path)
        assert path and path.startswith("/")
        assert include_root or include_children

        lock_list = []

        def _append_locks(toklist):
            # Since we can do this quickly, we use self.get() even if
            # token_only is set, so expired locks are purged.
            # self.get() may call delete(), which edits the stored token
            # list in place, so iterate over a snapshot.
            for token in list(toklist):
                lock = self.get(token)
                if lock:
                    if token_only:
                        lock_list.append(lock["token"])
                    else:
                        lock_list.append(lock)

        path = normalize_lock_root(path)
        self._lock.acquire_read()
        try:
            key = f"URL2TOKEN:{path}"
            tok_list = self._dict.get(key, [])
            if include_root:
                _append_locks(tok_list)

            if include_children:
                # Purging an expired lock removes entries from self._dict;
                # iterate over a snapshot to avoid
                # "dictionary changed size during iteration".
                for u, ltoks in list(self._dict.items()):
                    if util.is_child_uri(key, u):
                        _append_locks(ltoks)

            return lock_list
        finally:
            self._lock.release()
# ========================================================================
# LockStorageShelve
# ========================================================================
class LockStorageShelve(LockStorageDict):
    """
    A low performance lock manager implementation using shelve.

    The lock dictionary is kept in a shelve file at `storage_path`
    (stored as an absolute path).
    """

    def __init__(self, storage_path):
        super().__init__()
        self._storage_path = os.path.abspath(storage_path)

    def __repr__(self):
        return f"LockStorageShelve({self._storage_path!r})"

    def _flush(self):
        """Write persistent dictionary to disc."""
        _logger.debug("_flush()")
        self._lock.acquire_write()  # TODO: read access is enough?
        try:
            self._dict.sync()
        finally:
            self._lock.release()

    def clear(self):
        """Delete all entries.

        If the storage is currently closed, it is opened for the duration
        of the call and closed again afterwards.
        """
        self._lock.acquire_write()  # TODO: read access is enough?
        try:
            opened_here = self._dict is None
            if opened_here:
                self.open()

            # Only touch the shelf if there is something to remove
            if len(self._dict):
                self._dict.clear()
                self._dict.sync()

            if opened_here:
                self.close()
        finally:
            self._lock.release()

    def open(self):
        """Open the shelve file at the configured storage path."""
        _logger.debug(f"open({self._storage_path!r})")
        # Open with writeback=False, which is faster, but we have to be
        # careful to re-assign values to _dict after modifying them
        self._dict = shelve.open(self._storage_path, writeback=False)

    #        if __debug__ and self._verbose >= 2:
    #                self._check("After shelve.open()")
    #            self._dump("After shelve.open()")

    def close(self):
        """Close the shelve file (if open) and drop the reference to it."""
        _logger.debug("close()")
        self._lock.acquire_write()
        try:
            shelf = self._dict
            if shelf is not None:
                shelf.close()
                self._dict = None
        finally:
            self._lock.release()