Source code for wsgidav.samples.dav_provider_tools

# -*- coding: iso-8859-1 -*-
# (c) 2009-2018 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Tools that make it easier to implement custom WsgiDAV providers.
"""
import os
import stat

from wsgidav import compat, util
from wsgidav.dav_provider import DAVCollection, DAVNonCollection

__docformat__ = "reStructuredText en"

_logger = util.getModuleLogger(__name__)

# ============================================================================
# VirtualCollection
# ============================================================================


class VirtualCollection(DAVCollection):
    """Abstract base class for collections that contain a list of static members.

    Member names are passed to the constructor.
    getMember() is implemented by calling self.provider.getResourceInst()
    """

    def __init__(self, path, environ, displayInfo, memberNameList):
        """Store the display info and the static list of member names.

        ``displayInfo`` may be passed as a string, in which case it is wrapped
        as ``{"type": displayInfo}``; otherwise it must be exactly a ``dict``.
        ``memberNameList`` must be exactly a ``list``.
        """
        DAVCollection.__init__(self, path, environ)
        # Shorthand: a bare string becomes the value of the "type" entry.
        info = (
            {"type": displayInfo}
            if compat.is_basestring(displayInfo)
            else displayInfo
        )
        # Exact type checks: subclasses of dict / list are rejected.
        assert type(info) is dict
        assert type(memberNameList) is list
        self.displayInfo = info
        self.memberNameList = memberNameList

    def getDisplayInfo(self):
        """Return the display info dictionary stored by the constructor."""
        return self.displayInfo

    def getMemberNames(self):
        """Return the list of member names stored by the constructor."""
        return self.memberNameList

    def preventLocking(self):
        """Return True, since we don't want to lock virtual collections."""
        return True

    def getMember(self, name):
        """Return the member `name` by asking the provider for its resource.

        The member path is built by joining this collection's path and `name`.
        """
        memberPath = util.joinUri(self.path, name)
        return self.provider.getResourceInst(memberPath, self.environ)
# ============================================================================
# _VirtualNonCollection classes
# ============================================================================
class _VirtualNonCollection(DAVNonCollection):
    """Abstract base class for all non-collection resources.

    Every optional property getter answers with ``None`` (or ``False`` for
    range support); only getDisplayInfo() must be supplied by subclasses.
    """

    def __init__(self, path, environ):
        """Initialize the underlying DAVNonCollection for `path`."""
        # NOTE(review): an explicit False is passed between path and environ;
        # confirm this matches the signature of DAVNonCollection.__init__.
        DAVNonCollection.__init__(self, path, False, environ)

    def getContentLength(self):
        """Return None (no content length is reported by default)."""
        return None

    def getContentType(self):
        """Return None (no content type is reported by default)."""
        return None

    def getCreationDate(self):
        """Return None (no creation date is reported by default)."""
        return None

    def getDisplayName(self):
        """Return the resource name as display name."""
        return self.name

    def getDisplayInfo(self):
        """Must be implemented by derived classes."""
        raise NotImplementedError

    def getEtag(self):
        """Return None (no entity tag is reported by default)."""
        return None

    def getLastModified(self):
        """Return None (no modification date is reported by default)."""
        return None

    def supportRanges(self):
        """Return False: ranges are not supported by default."""
        return False

#    def handleDelete(self):
#        raise DAVError(HTTP_FORBIDDEN)
#    def handleMove(self, destPath):
#        raise DAVError(HTTP_FORBIDDEN)
#    def handleCopy(self, destPath, depthInfinity):
#        raise DAVError(HTTP_FORBIDDEN)


# ============================================================================
# VirtualTextResource
# ============================================================================
class VirtualTextResource(_VirtualNonCollection):
    """A virtual file, containing a string."""

    def __init__(self, path, environ, content, displayName=None, displayType=None):
        """Remember the text `content` and the optional display attributes."""
        _VirtualNonCollection.__init__(self, path, environ)
        self.content = content
        self.displayName = displayName
        self.displayType = displayType

    def getContentLength(self):
        """Return the length of what the stream from getContent() yields."""
        stream = self.getContent()
        return len(stream.read())

    def getContentType(self):
        """Return "text/plain" for names ending in ".txt", else "text/html"."""
        return "text/plain" if self.name.endswith(".txt") else "text/html"

    def getDisplayName(self):
        """Return the explicit display name, or the resource name if unset."""
        if self.displayName:
            return self.displayName
        return self.name

    def getDisplayInfo(self):
        """Return the fixed display info for virtual text files."""
        return {"type": "Virtual info file"}

    def preventLocking(self):
        """Return True: this resource reports that locking is prevented."""
        return True

#    def getRefUrl(self):
#        refPath = "/by_key/%s/%s" % (self._data["key"], self.name)
#        return compat.quote(self.provider.sharePath + refPath)

    def getContent(self):
        """Return a new compat.StringIO stream wrapping the stored content."""
        return compat.StringIO(self.content)
# ============================================================================
# FileResource
# ============================================================================
class FileResource(_VirtualNonCollection):
    """Represents an existing file."""

    # Buffer size handed to open() in getContent().
    BUFFER_SIZE = 8192

    def __init__(self, path, environ, filePath):
        """Bind the resource at `path` to the file system entry `filePath`.

        A missing `filePath` is only logged as an error; construction
        continues regardless.
        """
        fileMissing = not os.path.exists(filePath)
        if fileMissing:
            _logger.error("FileResource({!r}) does not exist.".format(filePath))
        _VirtualNonCollection.__init__(self, path, environ)
        self.filePath = filePath

    def getContentLength(self):
        """Return the file size as reported by os.stat()."""
        return os.stat(self.filePath)[stat.ST_SIZE]

    def getContentType(self):
        """Return the guessed MIME type, or "text/html" for non-files."""
        if os.path.isfile(self.filePath):
            return util.guessMimeType(self.filePath)
        return "text/html"

    def getCreationDate(self):
        """Return the ST_CTIME entry of the os.stat() result.

        NOTE(review): per the Python docs, ctime is the creation time on
        Windows but the last metadata change on Unix -- confirm this is the
        intended value.
        """
        return os.stat(self.filePath)[stat.ST_CTIME]

    def getDisplayInfo(self):
        """Return the fixed display info for file resources."""
        return {"type": "File"}

    def getLastModified(self):
        """Return the ST_MTIME entry of the os.stat() result."""
        return os.stat(self.filePath)[stat.ST_MTIME]

#    def getRefUrl(self):
#        refPath = "/by_key/%s/%s" % (self._data["key"], os.path.basename(self.filePath))
#        return compat.quote(self.provider.sharePath + refPath)

    def getContent(self):
        """Return the file opened for binary reading.

        The returned file object is not closed here.
        """
        # GC issue 57: always open as binary, regardless of the MIME type.
        bufferSize = FileResource.BUFFER_SIZE
        return open(self.filePath, "rb", bufferSize)
# ============================================================================
# Resolvers
# ============================================================================
class DAVResolver(object):
    """Return a DAVResource object for a path (None, if not found)."""

    def __init__(self, parentResolver, name):
        """Remember the parent resolver and this resolver's own name."""
        self.name = name
        self.parentResolver = parentResolver

    def resolve(self, scriptName, pathInfo, environ):
        """Resolve a request path; must be implemented by derived classes.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError