# -*- coding: iso-8859-1 -*-
# (c) 2009-2018 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Tools that make it easier to implement custom WsgiDAV providers.
"""
import os
import stat
from wsgidav import compat, util
from wsgidav.dav_provider import DAVCollection, DAVNonCollection
__docformat__ = "reStructuredText en"
_logger = util.getModuleLogger(__name__)
# ============================================================================
# VirtualCollection
# ============================================================================
[docs]class VirtualCollection(DAVCollection):
"""Abstract base class for collections that contain a list of static members.
Member names are passed to the constructor.
getMember() is implemented by calling self.provider.getResourceInst()
"""
def __init__(self, path, environ, displayInfo, memberNameList):
DAVCollection.__init__(self, path, environ)
if compat.is_basestring(displayInfo):
displayInfo = {"type": displayInfo}
assert type(displayInfo) is dict
assert type(memberNameList) is list
self.displayInfo = displayInfo
self.memberNameList = memberNameList
[docs] def getDisplayInfo(self):
return self.displayInfo
[docs] def getMemberNames(self):
return self.memberNameList
[docs] def preventLocking(self):
"""Return True, since we don't want to lock virtual collections."""
return True
[docs] def getMember(self, name):
# raise NotImplementedError
return self.provider.getResourceInst(util.joinUri(self.path, name),
self.environ)
# ============================================================================
# _VirtualNonCollection classes
# ============================================================================
class _VirtualNonCollection(DAVNonCollection):
"""Abstract base class for all non-collection resources."""
def __init__(self, path, environ):
DAVNonCollection.__init__(self, path, False, environ)
def getContentLength(self):
return None
def getContentType(self):
return None
def getCreationDate(self):
return None
def getDisplayName(self):
return self.name
def getDisplayInfo(self):
raise NotImplementedError
def getEtag(self):
return None
def getLastModified(self):
return None
def supportRanges(self):
return False
# def handleDelete(self):
# raise DAVError(HTTP_FORBIDDEN)
# def handleMove(self, destPath):
# raise DAVError(HTTP_FORBIDDEN)
# def handleCopy(self, destPath, depthInfinity):
# raise DAVError(HTTP_FORBIDDEN)
# ============================================================================
# VirtualTextResource
# ============================================================================
[docs]class VirtualTextResource(_VirtualNonCollection):
"""A virtual file, containing a string."""
def __init__(self, path, environ, content,
displayName=None, displayType=None):
_VirtualNonCollection.__init__(self, path, environ)
self.content = content
self.displayName = displayName
self.displayType = displayType
[docs] def getContentLength(self):
return len(self.getContent().read())
[docs] def getContentType(self):
if self.name.endswith(".txt"):
return "text/plain"
return "text/html"
[docs] def getDisplayName(self):
return self.displayName or self.name
[docs] def getDisplayInfo(self):
return {"type": "Virtual info file"}
[docs] def preventLocking(self):
return True
# def getRefUrl(self):
# refPath = "/by_key/%s/%s" % (self._data["key"], self.name)
# return compat.quote(self.provider.sharePath + refPath)
[docs] def getContent(self):
return compat.StringIO(self.content)
# ============================================================================
# FileResource
# ============================================================================
[docs]class FileResource(_VirtualNonCollection):
"""Represents an existing file."""
BUFFER_SIZE = 8192
def __init__(self, path, environ, filePath):
if not os.path.exists(filePath):
_logger.error("FileResource({!r}) does not exist.".format(filePath))
_VirtualNonCollection.__init__(self, path, environ)
self.filePath = filePath
[docs] def getContentLength(self):
statresults = os.stat(self.filePath)
return statresults[stat.ST_SIZE]
[docs] def getContentType(self):
if not os.path.isfile(self.filePath):
return "text/html"
# (mimetype, _mimeencoding) = mimetypes.guess_type(self.filePath)
# if not mimetype:
# mimetype = "application/octet-stream"
# return mimetype
return util.guessMimeType(self.filePath)
[docs] def getCreationDate(self):
statresults = os.stat(self.filePath)
return statresults[stat.ST_CTIME]
[docs] def getDisplayInfo(self):
return {"type": "File"}
[docs] def getLastModified(self):
statresults = os.stat(self.filePath)
return statresults[stat.ST_MTIME]
# def getRefUrl(self):
# refPath = "/by_key/%s/%s" % (self._data["key"], os.path.basename(self.filePath))
# return compat.quote(self.provider.sharePath + refPath)
[docs] def getContent(self):
# mime = self.getContentType()
# GC issue 57: always store as binary
# if mime.startswith("text"):
# return open(self.filePath, "r", FileResource.BUFFER_SIZE)
return open(self.filePath, "rb", FileResource.BUFFER_SIZE)
# ============================================================================
# Resolvers
# ============================================================================
[docs]class DAVResolver(object):
"""Return a DAVResource object for a path (None, if not found)."""
def __init__(self, parentResolver, name):
self.parentResolver = parentResolver
self.name = name
[docs] def resolve(self, scriptName, pathInfo, environ):
raise NotImplementedError