Source code for wsgidav.dav_provider

# (c) 2009-2024 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Original PyFileServer (c) 2005 Ho Chun Wei.
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Abstract base class for DAV resource providers.

This module serves these purposes:

  1. Documentation of the DAVProvider interface
  2. Common base class for all DAV providers
  3. Default implementation for most functionality that a resource provider must
     deliver.

If no default implementation can be provided, then all write actions generate
FORBIDDEN errors. Read requests generate NOT_IMPLEMENTED errors.


**_DAVResource, DAVCollection, DAVNonCollection**

Represents an existing (i.e. mapped) WebDAV resource or collection.

A _DAVResource object is created by a call to the DAVProvider.

The resource may then be used to query different attributes like ``res.name``,
``res.is_collection``, ``res.get_content_length()``, and ``res.support_etag()``.

It also implements operations, that require an *existing* resource, like:
``get_preferred_path()``, ``create_collection()``, or ``get_property_value()``.

Usage::

    res = provider.get_resource_inst(path, environ)
    if res is not None:
        print(res.getName())



**DAVProvider**

A DAV provider represents a shared WebDAV system.

There is only one provider instance per share, which is created during
server start-up. After that, the dispatcher (``request_resolver.RequestResolver``)
parses the request URL and adds it to the WSGI environment, so it
can be accessed like this::

    provider = environ["wsgidav.provider"]

The main purpose of the provider is to create _DAVResource objects for URLs::

    res = provider.get_resource_inst(path, environ)


**Supporting Objects**
The DAVProvider takes two supporting objects:

propertyManager
   An object that provides storage for dead properties assigned for webDAV resources.

   PropertyManagers must provide the methods as described in
   ``wsgidav.interfaces.propertymanagerinterface``

   See prop_man.property_manager.PropertyManager for a sample implementation
   using shelve.

lockManager
   An object that implements locking on webDAV resources.
   It contains an instance of ``LockStorageDict``

lockStorage
   An object that provides storage for locks made on webDAV resources.

   LockStorages must provide the methods as described in
   ``wsgidav.interfaces.lockmanagerinterface``

   See lock_storage for a sample implementation using shelve.

See :doc:`reference_guide` for more information about the WsgiDAV architecture.
"""
import os
import sys
import time
import traceback
from abc import ABC, abstractmethod
from typing import Optional
from urllib.parse import quote, unquote

from wsgidav import util, xml_tools
from wsgidav.dav_error import (
    HTTP_FORBIDDEN,
    HTTP_NOT_FOUND,
    DAVError,
    PRECONDITION_CODE_ProtectedProperty,
    as_DAVError,
)
from wsgidav.util import etree

__docformat__ = "reStructuredText"

_logger = util.get_module_logger(__name__)

_standardLivePropNames = [
    "{DAV:}creationdate",
    "{DAV:}displayname",
    "{DAV:}getcontenttype",
    "{DAV:}resourcetype",
    "{DAV:}getlastmodified",
    "{DAV:}getcontentlength",
    "{DAV:}getetag",
    "{DAV:}getcontentlanguage",
    # "{DAV:}source", # removed in rfc4918
]
_lockPropertyNames = ["{DAV:}lockdiscovery", "{DAV:}supportedlock"]


# ========================================================================
# _DAVResource
# ========================================================================


class _DAVResource(ABC):
    r"""Represents a single existing DAV resource instance.

    A resource may be a collection (aka 'folder') or a non-collection (aka
    'file').
    _DAVResource is the common base class for the specialized classes::

        _DAVResource
          +- DAVCollection
          \- DAVNonCollection

    Instances of this class are created through the DAVProvider::

        res = provider.get_resource_inst(path, environ)
        if res and res.is_collection():
            print(res.get_display_name())

    In the example above, res will be ``None``, if the path cannot be mapped to
    an existing resource.
    The following attributes and methods are considered 'cheap'::

        res.path
        res.provider
        res.name
        res.is_collection
        res.environ

    Querying other attributes is considered 'expensive' and may be delayed until
    the first access.

        get_content_length()
        get_content_type()
        get_creation_date()
        get_display_name()
        get_etag()
        get_used_bytes()
        get_available_bytes()
        get_last_modified()
        support_ranges()

        support_etag()
        support_modified()
        support_content_length()

    These functions return ``None``, if the property is not available, or
    not supported.

    See also DAVProvider.get_resource_inst().
    """

    def __init__(self, path: str, is_collection: bool, environ: dict):
        assert util.is_str(path)
        assert path == "" or path.startswith("/")
        self.provider: DAVProvider = environ["wsgidav.provider"]
        self.path: str = path
        self.is_collection: bool = is_collection
        self.environ: dict = environ
        self.name: str = util.get_uri_name(self.path)

    def __repr__(self):
        return f"{self.__class__.__name__}({self.path!r})"

    #    def getContentLanguage(self):
    #        """Contains the Content-Language header returned by a GET without accept
    #        headers.
    #
    #        The getcontentlanguage property MUST be defined on any DAV compliant
    #        resource that returns the Content-Language header on a GET.
    #        """
    #        raise NotImplementedError

    @abstractmethod
    def get_content_length(self) -> Optional[int]:
        """Contains the Content-Length header returned by a GET without accept
        headers.

        The getcontentlength property MUST be defined on any DAV compliant
        resource that returns the Content-Length header in response to a GET.

        This method MUST be implemented by non-collections only.
        """
        if self.is_collection:
            return None
        raise NotImplementedError

    def get_content_type(self) -> Optional[str]:
        """Contains the Content-Type header returned by a GET without accept
        headers.

        This getcontenttype property MUST be defined on any DAV compliant
        resource that returns the Content-Type header in response to a GET.
        See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getcontenttype

        This method MUST be implemented by non-collections only.
        """
        if self.is_collection:
            return None
        raise NotImplementedError

    def get_creation_date(self) -> Optional[float]:
        """Records the time and date the resource was created.

        The creationdate property should be defined on all DAV compliant
        resources. If present, it contains a timestamp of the moment when the
        resource was created (i.e., the moment it had non-null state).

        This method SHOULD be implemented, especially by non-collections.
        """
        return None

    def get_directory_info(self):
        """Return a list of dictionaries with information for directory
        rendering.

        This default implementation return None, so the dir browser will
        traverse all members.

        This method COULD be implemented for collection resources.
        """
        assert self.is_collection
        return None

    def get_display_name(self) -> str:
        """Provides a name for the resource that is suitable for presentation to
        a user.

        The displayname property should be defined on all DAV compliant
        resources. If present, the property contains a description of the
        resource that is suitable for presentation to a user.

        This default implementation returns `name`, which is the last path
        segment.
        """
        return self.name

    def get_display_info(self):
        """Return additional info dictionary for displaying (optional).

        This information is not part of the DAV specification, but meant for use
        by the dir browser middleware.

        This default implementation returns ``{'type': '...'}``
        """
        if self.is_collection:
            return {"type": "Directory"}
        elif os.extsep in self.name:
            ext = self.name.split(os.extsep)[-1].upper()
            if len(ext) < 5:
                return {"type": f"{ext}-File"}
        return {"type": "File"}

    def get_etag(self):
        """
        See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag

        This method SHOULD be implemented, especially by non-collections.
        """
        return None

    def get_used_bytes(self) -> Optional[int]:
        """Return used bytes of the DAV collection.
        See http://www.webdav.org/specs/rfc4331.html#quota-used-bytes

        This method COULD be implemented for collection resources.
        """
        return None

    def get_available_bytes(self) -> Optional[int]:
        """Return available bytes of the DAV collection.
        See http://www.webdav.org/specs/rfc4331.html#quota-available-bytes

        This method COULD be implemented for collection resources.
        """
        return None

    def get_last_modified(self):
        """Contains the Last-Modified header returned by a GET method without
        accept headers.

        Return None, if this live property is not supported.

        Note that the last-modified date on a resource may reflect changes in
        any part of the state of the resource, not necessarily just a change to
        the response to the GET method. For example, a change in a property may
        cause the last-modified date to change. The getlastmodified property
        MUST be defined on any DAV compliant resource that returns the
        Last-Modified header in response to a GET.

        This method SHOULD be implemented, especially by non-collections.
        """
        return None

    def is_link(self):
        return False

    def set_last_modified(self, dest_path, time_stamp, *, dry_run):
        """Set last modified time for destPath to timeStamp on epoch-format"""
        raise NotImplementedError

    def support_ranges(self):
        """Return True, if this non-resource supports Range on GET requests.

        This method MUST be implemented by non-collections only.
        """
        raise NotImplementedError

    def support_content_length(self):
        """Return True, if this resource supports Content-Length.

        This default implementation checks `self.get_content_length() is None`.
        """
        return self.get_content_length() is not None

    def support_etag(self):
        """Return True, if this resource supports ETags."""
        raise NotImplementedError
        # This default implementation checks `self.get_etag() is None`.
        # return self.get_etag() is not None

    def support_modified(self):
        """Return True, if this resource supports last modified dates.

        This default implementation checks `self.get_last_modified() is None`.
        """
        return self.get_last_modified() is not None

    def get_preferred_path(self):
        """Return preferred mapping for a resource mapping.

        Different URLs may map to the same resource, e.g.:
            '/a/b' == '/A/b' == '/a/b/'
        get_preferred_path() returns the same value for all these variants, e.g.:
            '/a/b/'   (assuming resource names considered case insensitive)

        @param path: a UTF-8 encoded, unquoted byte string.
        @return: a UTF-8 encoded, unquoted byte string.
        """
        if self.path in ("", "/"):
            return "/"
        # Append '/' for collections
        if self.is_collection and not self.path.endswith("/"):
            return self.path + "/"
        # TODO: handle case-sensitivity, depending on OS
        # (FileSystemProvider could do this with os.path:
        # (?) on unix we can assume that the path already matches exactly the case of filepath
        # on windows we could use path.lower() or get the real case from the
        # file system
        return self.path

    def get_ref_url(self):
        """Return the quoted, absolute, unique URL of a resource, relative to appRoot.

        Byte string, UTF-8 encoded, quoted.
        Starts with a '/'. Collections also have a trailing '/'.

        This is basically the same as get_preferred_path, but deals with
        'virtual locations' as well.

        e.g. '/a/b' == '/A/b' == '/bykey/123' == '/byguid/abc532'

        get_ref_url() returns the same value for all these URLs, so it can be
        used as a key for locking and persistence storage.

        DAV providers that allow virtual-mappings must override this method.

        See also comments in DEVELOPERS.txt glossary.
        """
        return quote(self.provider.share_path + self.get_preferred_path())

    #    def getRefKey(self):
    #        """Return an unambiguous identifier string for a resource.
    #
    #        Since it is always unique for one resource, <refKey> is used as key for
    #        the lock- and property storage dictionaries.
    #
    #        This default implementation calls get_ref_url(), and strips a possible
    #        trailing '/'.
    #        """
    #        refKey = self.get_ref_url(path)
    #        if refKey == "/":
    #            return refKey
    #        return refKey.rstrip("/")

    def get_href(self):
        """Convert path to a URL that can be passed to XML responses.

        Byte string, UTF-8 encoded, quoted.

        See http://www.webdav.org/specs/rfc4918.html#rfc.section.8.3
        We are using the path-absolute option. i.e. starting with '/'.
        URI ; See section 3.2.1 of [RFC2068]
        """
        # Nautilus chokes, if href encodes '(' as '%28'
        # So we don't encode 'extra' and 'safe' characters (see rfc2068 3.2.1)
        safe = "/" + "!*'()," + "$-_|."
        return quote(
            self.provider.mount_path
            + self.provider.share_path
            + self.get_preferred_path(),
            safe=safe,
        )

    #    def getParent(self):
    #        """Return parent _DAVResource or None.
    #
    #        There is NO checking, if the parent is really a mapped collection.
    #        """
    #        parentpath = util.get_uri_parent(self.path)
    #        if not parentpath:
    #            return None
    #        return self.provider.get_resource_inst(parentpath)

    def get_member(self, name):
        """Return child resource with a given name (None, if not found).

        This method COULD be overridden by a derived class, for performance
        reasons.
        This default implementation calls self.provider.get_resource_inst().
        """
        raise NotImplementedError  # implemented by DAVCollecion

    def get_member_list(self):
        """Return a list of direct members (_DAVResource or derived objects).

        This default implementation calls self.get_member_names() and
        self.get_member() for each of them.
        A provider COULD overwrite this for performance reasons.
        """
        if not self.is_collection:
            raise NotImplementedError
        memberList = []
        for name in self.get_member_names():
            member = self.get_member(name)
            assert member is not None
            memberList.append(member)
        return memberList

    def get_member_names(self):
        """Return list of (direct) collection member names (UTF-8 byte strings).

        Every provider MUST provide this method for collection resources.
        """
        raise NotImplementedError

    def get_descendants(
        self,
        *,
        collections=True,
        resources=True,
        depth_first=False,
        depth="infinity",
        add_self=False,
    ):
        """Return a list _DAVResource objects of a collection (children,
        grand-children, ...).

        This default implementation calls self.get_member_list() recursively.

        This function may also be called for non-collections (with add_self=True).

        :Parameters:
            depth_first : bool
                use <False>, to list containers before content.
                (e.g. when moving / copying branches.)
                Use <True>, to list content before containers.
                (e.g. when deleting branches.)
            depth : string
                '0' | '1' | 'infinity'
        """
        assert depth in ("0", "1", "infinity")
        res = []
        if add_self and not depth_first:
            res.append(self)
        if depth != "0" and self.is_collection:
            for child in self.get_member_list():
                if not child:
                    self.get_member_list()
                want = (collections and child.is_collection) or (
                    resources and not child.is_collection
                )
                if want and not depth_first:
                    res.append(child)
                if child.is_collection and depth == "infinity":
                    res.extend(
                        child.get_descendants(
                            collections=collections,
                            resources=resources,
                            depth_first=depth_first,
                            depth=depth,
                            add_self=False,
                        )
                    )
                if want and depth_first:
                    res.append(child)
        if add_self and depth_first:
            res.append(self)
        return res

    # --- Properties ---------------------------------------------------------

    def get_property_names(self, *, is_allprop):
        """Return list of supported property names in Clark Notation.

        Note that 'allprop', despite its name, which remains for
        backward-compatibility, does not return every property, but only dead
        properties and the live properties defined in RFC4918.

        This default implementation returns a combination of:

        - Supported standard live properties in the {DAV:} namespace, if the
          related getter method returns not None.
        - {DAV:}lockdiscovery and {DAV:}supportedlock, if a lock manager is
          present
        - If a property manager is present, then a list of dead properties is
          appended

        A resource provider may override this method, to add a list of
        supported custom live property names.
        """
        # Live properties
        propNameList = []

        propNameList.append("{DAV:}resourcetype")

        if self.get_creation_date() is not None:
            propNameList.append("{DAV:}creationdate")
        if self.get_content_length() is not None:
            assert not self.is_collection
            propNameList.append("{DAV:}getcontentlength")
        if self.get_content_type() is not None:
            propNameList.append("{DAV:}getcontenttype")
        if self.get_used_bytes() is not None:
            propNameList.append("{DAV:}quota-used-bytes")
        if self.get_available_bytes() is not None:
            propNameList.append("{DAV:}quota-available-bytes")
        if self.get_last_modified() is not None:
            propNameList.append("{DAV:}getlastmodified")
        if self.get_display_name() is not None:
            propNameList.append("{DAV:}displayname")
        if self.get_etag() is not None:
            propNameList.append("{DAV:}getetag")

        # Locking properties
        if self.provider.lock_manager and not self.prevent_locking():
            propNameList.extend(_lockPropertyNames)

        # Dead properties
        if self.provider.prop_manager:
            refUrl = self.get_ref_url()
            propNameList.extend(
                self.provider.prop_manager.get_properties(refUrl, self.environ)
            )

        return propNameList

    def get_properties(self, mode, *, name_list=None):
        """Return properties as list of 2-tuples (name, value).

        If mode is 'name', then None is returned for the value.

        name
            the property name in Clark notation.
        value
            may have different types, depending on the status:
            - string or unicode: for standard property values.
            - etree.Element: for complex values.
            - DAVError in case of errors.
            - None: if mode == 'name'.

        @param mode: "allprop", "name", or "named"
        @param name_list: list of property names in Clark Notation (required for mode 'named')

        This default implementation basically calls self.get_property_names() to
        get the list of names, then call self.get_property_value on each of them.
        """
        assert mode in ("allprop", "name", "named")

        if mode in ("allprop", "name"):
            # TODO: 'allprop' could have nameList, when <include> option is
            # implemented
            assert name_list is None
            name_list = self.get_property_names(is_allprop=mode == "allprop")
        else:
            assert name_list is not None

        propList = []
        namesOnly = mode == "name"
        for name in name_list:
            try:
                if namesOnly:
                    propList.append((name, None))
                else:
                    value = self.get_property_value(name)
                    propList.append((name, value))
            except DAVError as e:
                propList.append((name, e))
            except Exception as e:
                propList.append((name, as_DAVError(e)))
                if self.provider.verbose >= 2:
                    traceback.print_exc(10, sys.stdout)

        return propList

    def get_property_value(self, name):
        """Return the value of a property.

        name:
            the property name in Clark notation.
        return value:
            may have different types, depending on the status:

            - string or unicode: for standard property values.
            - lxml.etree.Element: for complex values.

            If the property is not available, a DAVError is raised.

        This default implementation handles ``{DAV:}lockdiscovery`` and
        ``{DAV:}supportedlock`` using the associated lock manager.

        All other *live* properties (i.e. name starts with ``{DAV:}``) are
        delegated to the self.xxx() getters.

        Finally, other properties are considered *dead*, and are handled  by
        the associated property manager.
        """
        refUrl = self.get_ref_url()

        # lock properties
        lm = self.provider.lock_manager
        if lm and name == "{DAV:}lockdiscovery":
            # TODO: we return HTTP_NOT_FOUND if no lockmanager is present.
            # Correct?
            activelocklist = lm.get_url_lock_list(refUrl)
            lockdiscoveryEL = etree.Element(name)
            for lock in activelocklist:
                activelockEL = etree.SubElement(lockdiscoveryEL, "{DAV:}activelock")

                locktypeEL = etree.SubElement(activelockEL, "{DAV:}locktype")
                # Note: make sure `{DAV:}` is not handled as format tag:
                etree.SubElement(locktypeEL, "{}{}".format("{DAV:}", lock["type"]))

                lockscopeEL = etree.SubElement(activelockEL, "{DAV:}lockscope")
                # Note: make sure `{DAV:}` is not handled as format tag:
                etree.SubElement(lockscopeEL, "{}{}".format("{DAV:}", lock["scope"]))

                etree.SubElement(activelockEL, "{DAV:}depth").text = lock["depth"]
                if lock["owner"]:
                    # lock["owner"] is an XML string
                    # owner may be empty (#64)
                    ownerEL = xml_tools.string_to_xml(lock["owner"])
                    activelockEL.append(ownerEL)

                timeout = lock["timeout"]
                if timeout < 0:
                    timeout = "Infinite"
                else:
                    # The time remaining on the lock
                    expire = lock["expire"]
                    timeout = "Second-" + str(int(expire - time.time()))
                etree.SubElement(activelockEL, "{DAV:}timeout").text = timeout

                locktokenEL = etree.SubElement(activelockEL, "{DAV:}locktoken")
                etree.SubElement(locktokenEL, "{DAV:}href").text = lock["token"]

                # TODO: this is ugly:
                #       res.get_property_value("{DAV:}lockdiscovery")
                #
                #                lockRoot = self.get_href(self.provider.ref_url_to_path(lock["root"]))
                lockPath = self.provider.ref_url_to_path(lock["root"])
                lockRes = self.provider.get_resource_inst(lockPath, self.environ)
                # FIXME: test for None
                lockHref = lockRes.get_href()

                lockrootEL = etree.SubElement(activelockEL, "{DAV:}lockroot")
                etree.SubElement(lockrootEL, "{DAV:}href").text = lockHref

            return lockdiscoveryEL

        elif lm and name == "{DAV:}supportedlock":
            # TODO: we return HTTP_NOT_FOUND if no lockmanager is present. Correct?
            # TODO: the lockmanager should decide about it's features
            supportedlockEL = etree.Element(name)

            lockentryEL = etree.SubElement(supportedlockEL, "{DAV:}lockentry")
            lockscopeEL = etree.SubElement(lockentryEL, "{DAV:}lockscope")
            etree.SubElement(lockscopeEL, "{DAV:}exclusive")
            locktypeEL = etree.SubElement(lockentryEL, "{DAV:}locktype")
            etree.SubElement(locktypeEL, "{DAV:}write")

            lockentryEL = etree.SubElement(supportedlockEL, "{DAV:}lockentry")
            lockscopeEL = etree.SubElement(lockentryEL, "{DAV:}lockscope")
            etree.SubElement(lockscopeEL, "{DAV:}shared")
            locktypeEL = etree.SubElement(lockentryEL, "{DAV:}locktype")
            etree.SubElement(locktypeEL, "{DAV:}write")

            return supportedlockEL

        elif name.startswith("{DAV:}"):
            # Standard live property (raises HTTP_NOT_FOUND if not supported)
            if name == "{DAV:}creationdate" and self.get_creation_date() is not None:
                # Note: uses RFC3339 format (ISO 8601)
                return util.get_rfc3339_time(self.get_creation_date())
            elif name == "{DAV:}getcontenttype" and self.get_content_type() is not None:
                return self.get_content_type()
            elif name == "{DAV:}resourcetype":
                if self.is_collection:
                    resourcetypeEL = etree.Element(name)
                    etree.SubElement(resourcetypeEL, "{DAV:}collection")
                    return resourcetypeEL
                return ""
            elif name == "{DAV:}quota-used-bytes":
                return self.get_used_bytes()
            elif name == "{DAV:}quota-available-bytes":
                return self.get_available_bytes()
            elif (
                name == "{DAV:}getlastmodified" and self.get_last_modified() is not None
            ):
                # Note: uses RFC1123 format
                return util.get_rfc1123_time(self.get_last_modified())
            elif (
                name == "{DAV:}getcontentlength"
                and self.get_content_length() is not None
            ):
                # Note: must be a numeric string
                return str(self.get_content_length())
            elif name == "{DAV:}getetag" and self.get_etag() is not None:
                return self.get_etag()
            elif name == "{DAV:}displayname" and self.get_display_name() is not None:
                return self.get_display_name()

            # Unsupported, no persistence available, or property not found
            raise DAVError(HTTP_NOT_FOUND)

        # Dead property
        pm = self.provider.prop_manager
        if pm:
            value = pm.get_property(refUrl, name, self.environ)
            if value is not None:
                return xml_tools.string_to_xml(value)

        # No persistence available, or property not found
        raise DAVError(HTTP_NOT_FOUND)

    def set_property_value(self, name, value, *, dry_run=False):
        """Set a property value or remove a property.

        value == None means 'remove property'.
        Raise HTTP_FORBIDDEN if property is read-only, or not supported.

        When dry_run is True, this function should raise errors, as in a real
        run, but MUST NOT change any data.

        This default implementation

        - raises HTTP_FORBIDDEN, if trying to modify a locking property
        - raises HTTP_FORBIDDEN, if trying to modify an immutable {DAV:}
          property
        - handles Windows' Win32LastModifiedTime to set the getlastmodified
          property, if enabled
        - stores everything else as dead property, if a property manager is
          present.
        - raises HTTP_FORBIDDEN, else

        Removing a non-existing prop is NOT an error.

        Note: RFC 4918 states that {DAV:}displayname 'SHOULD NOT be protected'

        A resource provider may override this method, to update supported custom
        live properties.
        """
        assert value is None or xml_tools.is_etree_element(value)

        if name in _lockPropertyNames:
            # Locking properties are always read-only
            raise DAVError(
                HTTP_FORBIDDEN, err_condition=PRECONDITION_CODE_ProtectedProperty
            )

        # Live property
        config = self.environ["wsgidav.config"]
        # hotfixes = util.get_dict_value(config, "hotfixes", as_dict=True)
        mutableLiveProps = config.get("mutable_live_props", [])
        # Accept custom live property updates on resources if configured.
        if (
            name.startswith("{DAV:}")
            and name in _standardLivePropNames
            and name in mutableLiveProps
        ):
            # Please note that some properties should not be mutable according
            # to RFC4918. This includes the 'getlastmodified' property, which
            # it may still make sense to make mutable in order to support time
            # stamp changes from e.g. utime calls or the touch or rsync -a
            # commands.
            if name in ("{DAV:}getlastmodified", "{DAV:}last_modified"):
                try:
                    return self.set_last_modified(
                        self.path, value.text, dry_run=dry_run
                    )
                except Exception:
                    _logger.warning(
                        f"Provider does not support set_last_modified on {self.path}."
                    )

            # Unsupported or not allowed
            raise DAVError(HTTP_FORBIDDEN)

        # Handle MS Windows Win32LastModifiedTime, if enabled.
        # Note that the WebDAV client in Win7 and earlier has issues and can't be used
        # with this so we ignore older clients. Others pre-Win10 should be tested.
        if name.startswith("{urn:schemas-microsoft-com:}"):
            agent = self.environ.get("HTTP_USER_AGENT", "None")
            hotfixes = util.get_dict_value(config, "hotfixes", as_dict=True)
            win32_emu = hotfixes.get("emulate_win32_lastmod", False)
            if win32_emu and "MiniRedir/6.1" not in agent:
                if "Win32LastModifiedTime" in name:
                    return self.set_last_modified(
                        self.path, value.text, dry_run=dry_run
                    )
                elif "Win32FileAttributes" in name:
                    return True
                elif "Win32CreationTime" in name:
                    return True
                elif "Win32LastAccessTime" in name:
                    return True

        # Dead property
        pm = self.provider.prop_manager
        if pm and not name.startswith("{DAV:}"):
            refUrl = self.get_ref_url()
            if value is None:
                return pm.remove_property(refUrl, name, dry_run, self.environ)
            else:
                value = etree.tostring(value)
                return pm.write_property(refUrl, name, value, dry_run, self.environ)

        raise DAVError(HTTP_FORBIDDEN)

    def remove_all_properties(self, *, recursive):
        """Remove all associated dead properties."""
        if self.provider.prop_manager:
            self.provider.prop_manager.remove_properties(
                self.get_ref_url(), self.environ
            )

    # --- Locking ------------------------------------------------------------

    def prevent_locking(self):
        """Return True, to prevent locking.

        This default implementation returns ``False``, so standard processing
        takes place: locking (and refreshing of locks) is implemented using
        the lock manager, if one is configured.
        """
        return False

    def is_locked(self):
        """Return True, if URI is locked."""
        if self.provider.lock_manager is None:
            return False
        return self.provider.lock_manager.is_url_locked(self.get_ref_url())

    def remove_all_locks(self, *, recursive):
        if self.provider.lock_manager:
            self.provider.lock_manager.remove_all_locks_from_url(
                self.get_ref_url(), recursive=recursive
            )

    # --- Read / write -------------------------------------------------------

    def create_empty_resource(self, name):
        """Create and return an empty (length-0) resource as member of self.

        Called for LOCK requests on unmapped URLs.

        Preconditions (to be ensured by caller):

          - this must be a collection
          - <self.path + name> must not exist
          - there must be no conflicting locks

        Returns a DAVResuource.

        This method MUST be implemented by all providers that support write
        access.
        This default implementation simply raises HTTP_FORBIDDEN.
        """
        assert self.is_collection
        raise DAVError(HTTP_FORBIDDEN)

    def create_collection(self, name):
        """Create a new collection as member of self.

        Preconditions (to be ensured by caller):

          - this must be a collection
          - <self.path + name> must not exist
          - there must be no conflicting locks

        This method MUST be implemented by all providers that support write
        access.
        This default implementation raises HTTP_FORBIDDEN.
        """
        assert self.is_collection
        raise DAVError(HTTP_FORBIDDEN)

    def get_content(self):
        """Open content as a stream for reading.

        Returns a file-like object / stream containing the contents of the
        resource specified.
        The calling application will close() the stream.

        This method MUST be implemented by all providers.
        """
        assert not self.is_collection
        raise NotImplementedError

    def begin_write(self, *, content_type=None):
        """Open content as a stream for writing.

        This method MUST be implemented by all providers that support write
        access.
        """
        assert not self.is_collection
        raise DAVError(HTTP_FORBIDDEN)

    def end_write(self, *, with_errors):
        """Called when PUT has finished writing.

        This is only a notification that MAY be handled.
        """
        return None

    def handle_delete(self):
        """Handle a DELETE request natively.

        This method is called by the DELETE handler after checking for valid
        request syntax and making sure that there are no conflicting locks and
        If-headers.
        Depending on the return value, this provider can control further
        processing:

        False:
            handle_delete() did not do anything. WsgiDAV will process the request
            by calling delete() for every resource, bottom-up.
        True:
            handle_delete() has successfully performed the DELETE request.
            HTTP_NO_CONTENT will be reported to the DAV client.
        List of errors:
            handle_delete() tried to perform the delete request, but failed
            completely or partially. A list of errors is returned like
            ``[ (<ref-url>, <DAVError>), ... ]``
            These errors will be reported to the client.
        DAVError raised:
            handle_delete() refuses to perform the delete request. The DAVError
            will be reported to the client.

        An implementation may choose to apply other semantics and return True.
        For example deleting '/by_tag/cool/myres' may simply remove the 'cool'
        tag from 'my_res'.
        In this case, the resource might still be available by other URLs, so
        locks and properties are not removed.

        This default implementation returns ``False``, so standard processing
        takes place.

        Implementation of this method is OPTIONAL.
        """
        return False

    def support_recursive_delete(self):
        """Return True, if delete() may be called on non-empty collections
        (see comments there).

        This method MUST be implemented for collections (not called on
        non-collections).
        """
        assert self.is_collection
        raise NotImplementedError

    def delete(self):
        """Remove this resource (recursive).

        Preconditions (ensured by caller):

          - there are no conflicting locks or If-headers
          - if support_recursive_delete() is False, and this is a collection,
            all members have already been deleted.

        When support_recursive_delete is True, this method must be prepared to
        handle recursive deletes. This implies that child errors must be
        reported as tuple list [ (<ref-url>, <DAVError>), ... ].
        See http://www.webdav.org/specs/rfc4918.html#delete-collections

        This function

          - removes this resource
          - if this is a non-empty collection, also removes all members.
            Note that this may only occur, if support_recursive_delete is True.
          - For recursive deletes, return a list of error tuples for all failed
            resource paths.
          - removes associated direct locks
          - removes associated dead properties
          - raises HTTP_FORBIDDEN for read-only resources
          - raises HTTP_INTERNAL_ERROR on error

        This method MUST be implemented by all providers that support write
        access.
        """
        raise NotImplementedError

    def handle_copy(self, dest_path, *, depth_infinity):
        """Handle a COPY request natively.

        This method is called by the COPY handler after checking for valid
        request syntax and making sure that there are no conflicting locks and
        If-headers.
        Depending on the return value, this provider can control further
        processing:

        False:
            handle_copy() did not do anything. WsgiDAV will process the request
            by calling copy_move_single() for every resource, bottom-up.
        True:
            handle_copy() has successfully performed the COPY request.
            HTTP_NO_CONTENT/HTTP_CREATED will be reported to the DAV client.
        List of errors:
            handle_copy() tried to perform the copy request, but failed
            completely or partially. A list of errors is returned like
            ``[ (<ref-url>, <DAVError>), ... ]``
            These errors will be reported to the client.
        DAVError raised:
            handle_copy() refuses to perform the copy request. The DAVError
            will be reported to the client.

        An implementation may choose to apply other semantics and return True.
        For example copying '/by_tag/cool/myres' to '/by_tag/hot/myres' may
        simply add a 'hot' tag.
        In this case, the resource might still be available by other URLs, so
        locks and properties are not removed.

        This default implementation returns ``False``, so standard processing
        takes place.

        Implementation of this method is OPTIONAL.
        """
        return False

    def copy_move_single(self, dest_path, *, is_move):
        """Copy or move this resource to destPath (non-recursive).

        Preconditions (ensured by caller):

          - there must not be any conflicting locks on destination
          - overwriting is only allowed (i.e. destPath exists), when source and
            dest are of the same type ((non-)collections) and a Overwrite='T'
            was passed
          - destPath must not be a child path of this resource

        This function

          - Overwrites non-collections content, if destination exists.
          - MUST NOT copy collection members.
          - MUST NOT copy locks.
          - SHOULD copy live properties, when appropriate.
            E.g. displayname should be copied, but creationdate should be
            reset if the target did not exist before.
            See http://www.webdav.org/specs/rfc4918.html#dav.properties
          - SHOULD copy dead properties.
          - raises HTTP_FORBIDDEN for read-only providers
          - raises HTTP_INTERNAL_ERROR on error

        When is_move is True,

          - Live properties should be moved too (e.g. creationdate)
          - Non-collections must be moved, not copied
          - For collections, this function behaves like in copy-mode:
            destination collection must be created and properties are copied.
            Members are NOT created.
            The source collection MUST NOT be removed.

        This method MUST be implemented by all providers that support write
        access.
        """
        raise NotImplementedError

    def handle_move(self, dest_path):
        """Handle a MOVE request natively.

        This method is called by the MOVE handler after checking for valid
        request syntax and making sure that there are no conflicting locks and
        If-headers.
        Depending on the return value, this provider can control further
        processing:

        False:
            handle_move() did not do anything. WsgiDAV will process the request
            by calling delete() and copy_move_single() for every resource,
            bottom-up.
        True:
            handle_move() has successfully performed the MOVE request.
            HTTP_NO_CONTENT/HTTP_CREATED will be reported to the DAV client.
        List of errors:
            handle_move() tried to perform the move request, but failed
            completely or partially. A list of errors is returned like
            ``[ (<ref-url>, <DAVError>), ... ]``
            These errors will be reported to the client.
        DAVError raised:
            handle_move() refuses to perform the move request. The DAVError
            will be reported to the client.

        An implementation may choose to apply other semantics and return True.
        For example moving '/by_tag/cool/myres' to '/by_tag/hot/myres' may
        simply remove the 'cool' tag from 'my_res' and add a 'hot' tag instead.
        In this case, the resource might still be available by other URLs, so
        locks and properties are not removed.

        This default implementation returns ``False``, so standard processing
        takes place.

        Implementation of this method is OPTIONAL.
        """
        return False

    def support_recursive_move(self, dest_path):
        """Return True, if move_recursive() is available (see comments there)."""
        assert self.is_collection
        raise NotImplementedError

    def move_recursive(self, dest_path):
        """Move this resource and members to destPath.

        This method is only called, when support_recursive_move() returns True.

        MOVE is frequently used by clients to rename a file without changing its
        parent collection, so it's not appropriate to reset all live properties
        that are set at resource creation. For example, the DAV:creationdate
        property value SHOULD remain the same after a MOVE.

        Preconditions (ensured by caller):

          - there must not be any conflicting locks or If-header on source
          - there must not be any conflicting locks or If-header on destination
          - destPath must not exist
          - destPath must not be a member of this resource

        This method must be prepared to handle recursive moves. This implies
        that child errors must be reported as tuple list
        [ (<ref-url>, <DAVError>), ... ].
        See http://www.webdav.org/specs/rfc4918.html#move-collections

        This function

          - moves this resource and all members to destPath.
          - MUST NOT move associated locks.
            Instead, if the source (or children thereof) have locks, then
            these locks should be removed.
          - SHOULD maintain associated live properties, when applicable
            See http://www.webdav.org/specs/rfc4918.html#dav.properties
          - MUST maintain associated dead properties
          - raises HTTP_FORBIDDEN for read-only resources
          - raises HTTP_INTERNAL_ERROR on error

        An implementation may choose to apply other semantics.
        For example copying '/by_tag/cool/myres' to '/by_tag/new/myres' may
        simply add a 'new' tag to 'my_res'.

        This method is only called, when self.support_recursive_move() returns
        True. Otherwise, the request server implements MOVE using delete/copy.

        This method MAY be implemented in order to improve performance.
        """
        raise DAVError(HTTP_FORBIDDEN)

    def resolve(self, script_name, path_info):
        """Return a _DAVResource object for the path (None, if not found).

        `path_info`: is a URL relative to this object.

        DAVCollection.resolve() provides an implementation.
        """
        raise NotImplementedError

    def finalize_headers(self, environ, response_headers):
        """Perform custom operations on the response headers.

        This gets called before the response is started.
        It enables adding additional headers or modifying the default ones.
        """
        return None


# ========================================================================
# DAVNonCollection
# ========================================================================
[docs] class DAVNonCollection(_DAVResource): """ A DAVNonCollection is a _DAVResource, that has content (like a 'file' on a filesystem). A DAVNonCollecion is able to read and write file content. See also _DAVResource """ def __init__(self, path: str, environ: dict): super().__init__(path, False, environ)
[docs] @abstractmethod def get_content_length(self): """Returns the byte length of the content. MUST be implemented. See also _DAVResource.get_content_length() """ raise NotImplementedError
[docs] def get_content_type(self): """Contains the Content-Type header returned by a GET without accept headers. This getcontenttype property MUST be defined on any DAV compliant resource that returns the Content-Type header in response to a GET. See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getcontenttype This default implementation guesses the type from the filen name. """ return util.guess_mime_type(self.path)
[docs] @abstractmethod def get_content(self): """Open content as a stream for reading. Returns a file-like object / stream containing the contents of the resource specified. The application will close() the stream. This method MUST be implemented by all providers. """ raise NotImplementedError
[docs] @abstractmethod def get_etag(self): """ See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag This method SHOULD be implemented, especially by non-collections. Return None if not supported for this resource instance. See also `DAVNonCollection.support_etag()` and `util.get_file_etag(path)`. """
[docs] @abstractmethod def support_etag(self): """Return True, if this resource supports ETags. See also `DAVNonCollection.get_etag()`. """
[docs] def support_ranges(self): """Return True, if this non-resource supports Range on GET requests. This default implementation returns False. """ return False
[docs] def begin_write(self, *, content_type=None): """Open content as a stream for writing. This method MUST be implemented by all providers that support write access. """ raise DAVError(HTTP_FORBIDDEN)
[docs] def end_write(self, *, with_errors): """Called when PUT has finished writing. This is only a notification that MAY be handled. """ pass
[docs] def resolve(self, script_name, path_info): """Return a _DAVResource object for the path (None, if not found). Since non-collection don't have members, we return None if path is not empty. """ if path_info in ("", "/"): return self return None
# ======================================================================== # DAVCollection # ========================================================================
[docs] class DAVCollection(_DAVResource): """ A DAVCollection is a _DAVResource, that has members (like a 'folder' on a filesystem). A DAVCollecion 'knows' its members, and how to obtain them from the backend storage. There is also optional built-in support for member caching. See also _DAVResource """ def __init__(self, path: str, environ: dict) -> None: super().__init__(path, True, environ) # Allow caching of members # self.memberCache = {"enabled": False, # "expire": 10, # Purge, if not used for n seconds # "maxAge": 60, # Force purge, if older than n seconds # "created": None, # "lastUsed": None, # "members": None, # } # def _cacheSet(self, members): # if self.memberCache["enabled"]: # if not members: # # We cannot cache None, because _cacheGet() == None means 'not in cache' # members = [] # self.memberCache["created"] = self.memberCache["lastUsed"] = datetime.now() # self.memberCache["members"] = members # # def _cacheGet(self): # if not self.memberCache["enabled"]: # return None # now = datetime.now() # if (now - self.memberCache["lastUsed"]) > self.memberCache["expire"]: # return None # elif (now - self.memberCache["created"]) > self.memberCache["maxAge"]: # return None # self.memberCache["lastUsed"] = datetime.now() # return self.memberCache["members"] # # def _cachePurge(self): # self.memberCache["created"] = self.memberCache["lastUsed"] = None # self.memberCache["members"] = None # def getContentLanguage(self): # return None
[docs] def get_content_length(self) -> Optional[int]: return None
[docs] def get_content_type(self) -> Optional[str]: return None
[docs] def create_empty_resource(self, name: str) -> _DAVResource: """Create and return an empty (length-0) resource as member of self. Called for LOCK requests on unmapped URLs. Preconditions (to be ensured by caller): - this must be a collection - <self.path + name> must not exist - there must be no conflicting locks Returns a DAVResuource. This method MUST be implemented by all providers that support write access. This default implementation simply raises HTTP_FORBIDDEN. """ raise DAVError(HTTP_FORBIDDEN)
[docs] def create_collection(self, name): """Create a new collection as member of self. Preconditions (to be ensured by caller): - this must be a collection - <self.path + name> must not exist - there must be no conflicting locks This method MUST be implemented by all providers that support write access. This default implementation raises HTTP_FORBIDDEN. """ assert self.is_collection raise DAVError(HTTP_FORBIDDEN)
[docs] def get_etag(self): """ See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag For non-collections we default to None, because it is harder to implement. See also `DAVCollection.support_etag()`. """ return None
[docs] def get_member(self, name): """Return child resource with a given name (None, if not found). This method COULD be overridden by a derived class, for performance reasons. This default implementation calls self.provider.get_resource_inst(). """ assert self.is_collection return self.provider.get_resource_inst( util.join_uri(self.path, name), self.environ )
[docs] @abstractmethod def get_member_names(self): """Return list of (direct) collection member names (UTF-8 byte strings). This method MUST be implemented. """ assert self.is_collection raise NotImplementedError
[docs] def support_etag(self): """Return True, if this resource supports ETags. For non-collections we default to False, because it is harder to implement. See also `DAVCollection.get_etag()`. """ return False
[docs] def support_recursive_delete(self): """Return True, if delete() may be called on non-empty collections (see comments there). This default implementation returns False. """ return False
[docs] def delete(self): """Remove this resource (possibly recursive). This method MUST be implemented if resource allows write access. See _DAVResource.delete() """ raise DAVError(HTTP_FORBIDDEN)
[docs] def copy_move_single(self, dest_path, *, is_move): """Copy or move this resource to destPath (non-recursive). This method MUST be implemented if resource allows write access. See _DAVResource.copy_move_single() """ raise DAVError(HTTP_FORBIDDEN)
[docs] def support_recursive_move(self, dest_path): """Return True, if move_recursive() is available (see comments there).""" return False
[docs] def move_recursive(self, dest_path): """Move this resource and members to destPath. This method MAY be implemented in order to improve performance. """ raise DAVError(HTTP_FORBIDDEN)
[docs] def resolve(self, script_name, path_info): """Return a _DAVResource object for the path (None, if not found). `path_info`: is a URL relative to this object. """ if path_info in ("", "/"): return self assert path_info.startswith("/") name, rest = util.pop_path(path_info) res = self.get_member(name) if res is None or rest in ("", "/"): return res return res.resolve(util.join_uri(script_name, name), rest)
# ======================================================================== # DAVProvider # ========================================================================
[docs] class DAVProvider(ABC): """Abstract base class for DAV resource providers. There will be only one DAVProvider instance per share (not per request). """ def __init__(self): self.mount_path = "" self.share_path = None self.lock_manager = None self.prop_manager = None self.verbose = 3 self._count_get_resource_inst = 0 self._count_get_resource_inst_init = 0 # self.caseSensitiveUrls = True def __repr__(self): return self.__class__.__name__
[docs] def is_readonly(self): return False
[docs] def set_mount_path(self, mount_path): """Set application root for this resource provider. This is the value of SCRIPT_NAME, when WsgiDAVApp is called. """ assert mount_path in ("", "/") or ( mount_path.startswith("/") and not mount_path.endswith("/") ) self.mount_path = mount_path
[docs] def set_share_path(self, share_path): """Set application location for this resource provider. @param share_path: a UTF-8 encoded, unquoted byte string. """ # if isinstance(share_path, unicode): # share_path = share_path.encode("utf8") assert share_path == "" or share_path.startswith("/") if share_path == "/": share_path = "" # This allows to code 'absPath = share_path + path' assert share_path in ("", "/") or not share_path.endswith("/") self.share_path = share_path
[docs] def set_lock_manager(self, lock_manager): if lock_manager and not hasattr(lock_manager, "check_write_permission"): raise ValueError( "Must be compatible with wsgidav.lock_man.lock_manager.LockManager" ) self.lock_manager = lock_manager
[docs] def set_prop_manager(self, prop_manager): if prop_manager and not hasattr(prop_manager, "copy_properties"): raise ValueError( "Must be compatible with wsgidav.prop_man.property_manager.PropertyManager" ) self.prop_manager = prop_manager
[docs] def ref_url_to_path(self, ref_url): """Convert a refUrl to a path, by stripping the share prefix. Used to calculate the <path> from a storage key by inverting get_ref_url(). """ return "/" + unquote(util.removeprefix(ref_url, self.share_path)).lstrip("/")
[docs] @abstractmethod def get_resource_inst(self, path: str, environ: dict): """Return a _DAVResource object for path. Should be called only once per request and resource:: res = provider.get_resource_inst(path, environ) if res and not res.is_collection: print(res.get_content_type()) If <path> does not exist, None is returned. <environ> may be used by the provider to implement per-request caching. See _DAVResource for details. This method MUST be implemented. """ raise NotImplementedError
[docs] def exists(self, path: str, environ: dict): """Return True, if path maps to an existing resource. This method should only be used, if no other information is queried for <path>. Otherwise a _DAVResource should be created first. This method SHOULD be overridden by a more efficient implementation. """ return self.get_resource_inst(path, environ) is not None
[docs] def is_collection(self, path: str, environ: dict): """Return True, if path maps to an existing collection resource. This method should only be used, if no other information is queried for <path>. Otherwise a _DAVResource should be created first. """ res = self.get_resource_inst(path, environ) return res and res.is_collection
[docs] def custom_request_handler(self, environ, start_response, default_handler): """Optionally implement custom request handling. requestmethod = environ["REQUEST_METHOD"] Either - handle the request completely - do additional processing and call default_handler(environ, start_response) """ return default_handler(environ, start_response)