Source code for wsgidav.samples.virtual_dav_provider

# -*- coding: utf-8 -*-
# (c) 2009-2023 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Sample implementation of a DAV provider that provides a browsable,
multi-categorized resource tree.

Note that this is simply an example with no concrete real world benefit.
But it demonstrates some techniques to customize WsgiDAV.

Compared to a published file system, we have these main differences:

#. A resource like ``My doc 1`` has several attributes like ``key``,
   ``orga``, ``tags``, ``status``, ``description``.
   Also there may be a list of attached files.
#. These attributes are used to dynamically create a virtual hierarchy.
   For example, if ``status`` is ``draft``, a collection
   ``<share>/by_status/draft/`` is created and the resource is mapped to
   ``<share>/by_status/draft/My doc 1``.
#. The resource ``My doc 1`` is rendered as a collection, that contains
   some virtual descriptive files and the attached files.
#. The same resource may be referenced using different paths
   For example ``<share>/by_tag/cool/My doc 1``,
   ``<share>/by_tag/hot/My doc 1``, and ``<share>/by_key/1`` map to the same
   resource.
   Only the latter is considered the *real-path*, all others are
   *virtual-paths*.
#. The attributes are exposed as live properties, like "{virtres:}key",
   "{virtres:}tags", and "{virtres:}description".
   Some of them are even writable. Note that modifying an attribute may also
   change the dynamically created tree structure.
   For example changing "{virtres:}status" from 'draft' to 'published' will
   make the resource appear as ``<share>/by_status/published/My doc 1``.
#. This provider implements native delete/move/copy methods, to change the
   semantics of these operations for the virtual '/by_tag/' collection.
   For example issuing a DELETE on ``<share>/by_tag/cool/My doc 1`` will
   simply remove the 'cool' tag from that resource.
#. Virtual collections and artifacts cannot be locked.
   However a resource can be locked.
   For example locking ``<share>/by_tag/cool/My doc 1`` will also lock
   ``<share>/by_key/1``.
#. Some paths may be hidden, i.e. by_key is not browsable (but can be referenced)
   TODO: is this WebDAV compliant?

The *database* is a simple hard coded variable ``_resourceData``, that contains
a list of resource description dictionaries.

A resource is served as an collection, which is generated on-the-fly and
contains some virtual files (*artifacts*).

In general, a URL is interpreted like this::

  <share>/<category-type>/<category-key>/<resource-name>/<artifact-name>


An example layout::

    <share>/
        by_tag/
            cool/
                My doc 1/
                    .Info.html
                    .Info.txt
                    .Description.txt
                    MySpec.pdf
                    MySpec.doc
                My doc 2/
            hot/
                My doc 1/
                My doc 2/
            nice/
                My doc 2/
                My doc 3
        by_orga/
            development/
                My doc 3/
            marketing/
                My doc 1/
                My doc 2/
        by_status/
            draft/
                My doc 2
            published/
                My doc 1
                My doc 3
        by_key/
            1/
            2/
            3/

When accessed using WebDAV, the following URLs both return the same resource
'My doc 1'::

    <share>/by_tag/cool/My doc 1
    <share>/by_tag/hot/My doc 1
    <share>/by_key/1
"""
import os
import stat
from io import BytesIO
from urllib.parse import quote

from wsgidav import util
from wsgidav.dav_error import (
    HTTP_FORBIDDEN,
    HTTP_INTERNAL_ERROR,
    DAVError,
    PRECONDITION_CODE_ProtectedProperty,
)
from wsgidav.dav_provider import DAVCollection, DAVNonCollection, DAVProvider
from wsgidav.util import join_uri

__docformat__ = "reStructuredText en"

_logger = util.get_module_logger(__name__)

BUFFER_SIZE = 8192

# ============================================================================
# Fake hierarchical repository
# ============================================================================
"""
This is a dummy 'database', that serves as an example source for the
VirtualResourceProvider.

All files listed in resPathList are expected to exist in FILE_FOLDER.
"""
FILE_FOLDER = r"c:\temp\virtfiles"

_resourceData = [
    {
        "key": "1",
        "title": "My doc 1",
        "orga": "development",
        "tags": ["cool", "hot"],
        "status": "draft",
        "description": "This resource contains two specification files.",
        "resPathList": [
            os.path.join(FILE_FOLDER, "MySpec.doc"),
            os.path.join(FILE_FOLDER, "MySpec.pdf"),
        ],
    },
    {
        "key": "2",
        "title": "My doc 2",
        "orga": "development",
        "tags": ["cool", "nice"],
        "status": "published",
        "description": "This resource contains one file.",
        "resPathList": [os.path.join(FILE_FOLDER, "My URS.doc")],
    },
    {
        "key": "3",
        "title": "My doc (euro:\u20AC, uuml:��)".encode("utf8"),
        "orga": "marketing",
        "tags": ["nice"],
        "status": "published",
        "description": "Long text describing it",
        "resPathList": [os.path.join(FILE_FOLDER, "My URS.doc")],
    },
]


def _get_res_list_by_attr(attrName, attrVal):
    """ """
    assert attrName in RootCollection._visibleMemberNames
    if attrName == "by_status":
        return [data for data in _resourceData if data.get("status") == attrVal]
    elif attrName == "by_orga":
        resList = [data for data in _resourceData if data.get("orga") == attrVal]
    elif attrName == "by_tag":
        resList = [data for data in _resourceData if attrVal in data.get("tags")]
    return resList


def _get_res_by_key(key):
    """ """
    for data in _resourceData:
        if data["key"] == key:
            return data
    return None


# ============================================================================
#
# ============================================================================
[docs] class RootCollection(DAVCollection): """Resolve top-level requests '/'.""" _visibleMemberNames = ("by_orga", "by_tag", "by_status") _validMemberNames = _visibleMemberNames + ("by_key",) def __init__(self, environ): super().__init__("/", environ)
[docs] def get_member_names(self): return self._visibleMemberNames
[docs] def get_member(self, name): # Handle visible categories and also /by_key/... if name in self._validMemberNames: return CategoryTypeCollection(join_uri(self.path, name), self.environ) return None
[docs] class CategoryTypeCollection(DAVCollection): """Resolve '/catType' URLs, for example '/by_tag'.""" def __init__(self, path, environ): super().__init__(path, environ)
[docs] def get_display_info(self): return {"type": "Category type"}
[docs] def get_member_names(self): names = [] for data in _resourceData: if self.name == "by_status": if not data["status"] in names: names.append(data["status"]) elif self.name == "by_orga": if not data["orga"] in names: names.append(data["orga"]) elif self.name == "by_tag": for tag in data["tags"]: if tag not in names: names.append(tag) names.sort() return names
[docs] def get_member(self, name): if self.name == "by_key": data = _get_res_by_key(name) if data: return VirtualResource(join_uri(self.path, name), self.environ, data) else: return None return CategoryCollection(join_uri(self.path, name), self.environ, self.name)
[docs] class CategoryCollection(DAVCollection): """Resolve '/catType/cat' URLs, for example '/by_tag/cool'.""" def __init__(self, path, environ, catType): super().__init__(path, environ) self.catType = catType
[docs] def get_display_info(self): return {"type": "Category"}
[docs] def get_member_names(self): names = [ data["title"] for data in _get_res_list_by_attr(self.catType, self.name) ] names.sort() return names
[docs] def get_member(self, name): for data in _get_res_list_by_attr(self.catType, self.name): if data["title"] == name: return VirtualResource(join_uri(self.path, name), self.environ, data) return None
# ============================================================================ # VirtualResource # ============================================================================
[docs] class VirtualResource(DAVCollection): """A virtual 'resource', displayed as a collection of artifacts and files.""" _artifactNames = ( ".Info.txt", ".Info.html", ".Description.txt", # ".Admin.html", ) _supportedProps = [ "{virtres:}key", "{virtres:}title", "{virtres:}status", "{virtres:}orga", "{virtres:}tags", "{virtres:}description", ] def __init__(self, path, environ, data): super().__init__(path, environ) self.data = data
[docs] def get_display_info(self): return {"type": "Virtual Resource"}
[docs] def get_member_names(self): names = list(self._artifactNames) for f in self.data["resPathList"]: name = os.path.basename(f) names.append(name) return names
[docs] def get_member(self, name): if name in self._artifactNames: return VirtualArtifact(join_uri(self.path, name), self.environ, self.data) for file_path in self.data["resPathList"]: fname = os.path.basename(file_path) if fname == name: return VirtualResFile( join_uri(self.path, name), self.environ, self.data, file_path ) return None
[docs] def handle_delete(self): """Change semantic of DELETE to remove resource tags.""" # DELETE is only supported for the '/by_tag/' collection if "/by_tag/" not in self.path: raise DAVError(HTTP_FORBIDDEN) # path must be '/by_tag/<tag>/<resname>' catType, tag, _rest = util.save_split(self.path.strip("/"), "/", 2) assert catType == "by_tag" assert tag in self.data["tags"] self.data["tags"].remove(tag) return True # OK
[docs] def handle_copy(self, dest_path, *, depth_infinity): """Change semantic of COPY to add resource tags.""" # destPath must be '/by_tag/<tag>/<resname>' if "/by_tag/" not in dest_path: raise DAVError(HTTP_FORBIDDEN) catType, tag, _rest = util.save_split(dest_path.strip("/"), "/", 2) assert catType == "by_tag" if tag not in self.data["tags"]: self.data["tags"].append(tag) return True # OK
[docs] def handle_move(self, dest_path): """Change semantic of MOVE to change resource tags.""" # path and destPath must be '/by_tag/<tag>/<resname>' if "/by_tag/" not in self.path: raise DAVError(HTTP_FORBIDDEN) if "/by_tag/" not in dest_path: raise DAVError(HTTP_FORBIDDEN) catType, tag, _rest = util.save_split(self.path.strip("/"), "/", 2) assert catType == "by_tag" assert tag in self.data["tags"] self.data["tags"].remove(tag) catType, tag, _rest = util.save_split(dest_path.strip("/"), "/", 2) assert catType == "by_tag" if tag not in self.data["tags"]: self.data["tags"].append(tag) return True # OK
[docs] def get_ref_url(self): refPath = "/by_key/%s" % self.data["key"] return quote(self.provider.share_path + refPath)
[docs] def get_property_names(self, *, is_allprop): """Return list of supported property names in Clark Notation. See DAVResource.get_property_names() """ # Let base class implementation add supported live and dead properties propNameList = super().get_property_names(is_allprop=is_allprop) # Add custom live properties (report on 'allprop' and 'propnames') propNameList.extend(VirtualResource._supportedProps) return propNameList
[docs] def get_property_value(self, name): """Return the value of a property. See get_property_value() """ # Supported custom live properties if name == "{virtres:}key": return self.data["key"] elif name == "{virtres:}title": return self.data["title"] elif name == "{virtres:}status": return self.data["status"] elif name == "{virtres:}orga": return self.data["orga"] elif name == "{virtres:}tags": # 'tags' is a string list return ",".join(self.data["tags"]) elif name == "{virtres:}description": return self.data["description"] # Let base class implementation report live and dead properties return super().get_property_value(name)
[docs] def set_property_value(self, name, value, dry_run=False): """Set or remove property value. See DAVResource.set_property_value() """ if value is None: # We can never remove properties raise DAVError(HTTP_FORBIDDEN) if name == "{virtres:}tags": # value is of type etree.Element self.data["tags"] = value.text.split(",") elif name == "{virtres:}description": # value is of type etree.Element self.data["description"] = value.text elif name in VirtualResource._supportedProps: # Supported property, but read-only raise DAVError( HTTP_FORBIDDEN, err_condition=PRECONDITION_CODE_ProtectedProperty ) else: # Unsupported property raise DAVError(HTTP_FORBIDDEN) # Write OK return
# ============================================================================ # _VirtualNonCollection classes # ============================================================================ class _VirtualNonCollection(DAVNonCollection): """Abstract base class for all non-collection resources.""" def __init__(self, path, environ): super().__init__(path, environ) def get_content_length(self): return None def get_content_type(self): return None def get_creation_date(self): return None def get_display_name(self): return self.name def get_display_info(self): raise NotImplementedError def get_etag(self): return None def get_last_modified(self): return None def support_ranges(self): return False # def handle_delete(self): # raise DAVError(HTTP_FORBIDDEN) # def handle_move(self, destPath): # raise DAVError(HTTP_FORBIDDEN) # def handle_copy(self, destPath, depthInfinity): # raise DAVError(HTTP_FORBIDDEN) # ============================================================================ # VirtualArtifact # ============================================================================
[docs] class VirtualArtifact(_VirtualNonCollection): """A virtual file, containing resource descriptions.""" def __init__(self, path, environ, data): # assert name in _artifactNames super().__init__(path, environ) self.data = data
[docs] def get_content_length(self): return len(self.get_content().read())
[docs] def get_content_type(self): if self.name.endswith(".txt"): return "text/plain" return "text/html"
[docs] def get_display_info(self): return {"type": "Virtual info file"}
[docs] def prevent_locking(self): return True
[docs] def get_ref_url(self): refPath = "/by_key/%s/%s" % (self.data["key"], self.name) return quote(self.provider.share_path + refPath)
[docs] def get_content(self): fileLinks = [ "<a href='%s'>%s</a>\n" % (os.path.basename(f), f) for f in self.data["resPathList"] ] dict = self.data.copy() dict["fileLinks"] = ", ".join(fileLinks) if self.name == ".Info.html": html = ( """\ <html><head> <title>%(title)s</title> </head><body> <h1>%(title)s</h1> <table> <tr> <td>Description</td> <td>%(description)s</td> </tr><tr> <td>Status</td> <td>%(status)s</td> </tr><tr> <td>Tags</td> <td>%(tags)s</td> </tr><tr> <td>Orga unit</td> <td>%(orga)s</td> </tr><tr> <td>Files</td> <td>%(fileLinks)s</td> </tr><tr> <td>Key</td> <td>%(key)s</td> </tr> </table> <p>This is a virtual WsgiDAV resource called '%(title)s'.</p> </body></html>""" % dict ) elif self.name == ".Info.txt": lines = [ self.data["title"], "=" * len(self.data["title"]), self.data["description"], "", "Status: %s" % self.data["status"], "Orga: %8s" % self.data["orga"], "Tags: '%s'" % "', '".join(self.data["tags"]), "Key: %s" % self.data["key"], ] html = "\n".join(lines) elif self.name == ".Description.txt": html = self.data["description"] else: raise DAVError(HTTP_INTERNAL_ERROR, "Invalid artifact '%s'" % self.name) return BytesIO(util.to_bytes(html))
# ============================================================================ # VirtualResFile # ============================================================================
[docs] class VirtualResFile(_VirtualNonCollection): """Represents an existing file, that is a member of a VirtualResource.""" def __init__(self, path, environ, data, file_path): if not os.path.exists(file_path): _logger.error("VirtualResFile(%r) does not exist." % file_path) super().__init__(path, environ) self.data = data self.file_path = file_path
[docs] def get_content_length(self): statresults = os.stat(self.file_path) return statresults[stat.ST_SIZE]
[docs] def get_content_type(self): if not os.path.isfile(self.file_path): return "text/html" # (mimetype, _mimeencoding) = mimetypes.guess_type(self.file_path) # if not mimetype: # mimetype = "application/octet-stream" # return mimetype return util.guess_mime_type(self.file_path)
[docs] def get_creation_date(self): statresults = os.stat(self.file_path) return statresults[stat.ST_CTIME]
[docs] def get_display_info(self): return {"type": "Content file"}
[docs] def get_last_modified(self): statresults = os.stat(self.file_path) return statresults[stat.ST_MTIME]
[docs] def get_ref_url(self): refPath = "/by_key/%s/%s" % (self.data["key"], os.path.basename(self.file_path)) return quote(self.provider.share_path + refPath)
[docs] def get_content(self): # mime = self.get_content_type() # GC issue 57: always store as binary # if mime.startswith("text"): # return open(self.file_path, "r", BUFFER_SIZE) return open(self.file_path, "rb", BUFFER_SIZE)
# ============================================================================ # VirtualResourceProvider # ============================================================================
[docs] class VirtualResourceProvider(DAVProvider): """ DAV provider that serves a VirtualResource derived structure. """ def __init__(self): super().__init__() self.resourceData = _resourceData
[docs] def get_resource_inst(self, path, environ): """Return _VirtualResource object for path. path is expected to be categoryType/category/name/artifact for example: 'by_tag/cool/My doc 2/info.html' See DAVProvider.get_resource_inst() """ _logger.info("get_resource_inst('%s')" % path) self._count_get_resource_inst += 1 root = RootCollection(environ) return root.resolve("", path)
# class VirtualResourceProvider(DAVProvider): # """ # DAV provider that serves a VirtualResource derived structure. # """ # def __init__(self): # super().__init__() # self.resourceData = _resourceData # self.rootCollection = VirtualCollection(self, "/", environ) # # # def get_resource_inst(self, path, environ): # """Return _VirtualResource object for path. # # path is expected to be # categoryType/category/name/artifact # for example: # 'by_tag/cool/My doc 2/info.html' # # See DAVProvider.get_resource_inst() # """ # self._count_get_resource_inst += 1 # # catType, cat, resName, artifactName = util.save_split(path.strip("/"), "/", 3) # # _logger.info("get_resource_inst('%s'): catType=%s, cat=%s, resName=%s" % (path, catType, # cat, resName)) # # if catType and catType not in _alllowedCategories: # return None # # if catType == _realCategory: # # Accessing /by_key/<key> # data = _get_res_by_key(cat) # if data: # return VirtualResource(self, path, environ, data) # return None # # elif resName: # # Accessing /<catType>/<cat>/<name> or /<catType>/<cat>/<resName>/<artifactName> # res = None # for data in _get_res_list_by_attr(catType, cat): # if data["title"] == resName: # res = data # break # if not res: # return None # # Accessing /<catType>/<cat>/<name> # if artifactName in _artifactNames: # # Accessing /<catType>/<cat>/<name>/.info.html, or similar # return VirtualArtifact(self, util.join_uri(path, artifactName), environ, # res, artifactName) # elif artifactName: # # Accessing /<catType>/<cat>/<name>/<file-name> # for f in res["resPathList"]: # if artifactName == os.path.basename(f): # return VirtualResFile(self, path, environ, res, f) # return None # # Accessing /<catType>/<cat>/<name> # return VirtualResource(self, path, environ, data) # # elif cat: # # Accessing /catType/cat: return list of matching names # resList = _get_res_list_by_attr(catType, cat) # nameList = [ data["title"] for data in resList ] # return VirtualCollection(self, path, environ, nameList) # # elif catType: # # Accessing /catType/: return all possible values for this catType # if catType in _browsableCategories: # resList = [] # for data in _resourceData: # if catType == "by_status": # if not data["status"] in resList: # resList.append(data["status"]) # elif catType == "by_orga": # if not data["orga"] in resList: # resList.append(data["orga"]) # elif catType == "by_tag": # for tag in data["tags"]: # if not tag in resList: # resList.append(tag) # # return VirtualCollection(self, path, environ, resList) # # Known category type, but not browsable (e.g. 'by_key') # raise DAVError(HTTP_FORBIDDEN) # # # Accessing /: return list of categories # return VirtualCollection(self, path, environ, _browsableCategories)