# (c) 2009-2018 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Original PyFileServer (c) 2005 Ho Chun Wei.
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Implementation of a DAV provider that serves resource from a file system.
:class:`~wsgidav.fs_dav_provider.FilesystemProvider` implements a DAV resource
provider that publishes a file system.
If ``readonly=True`` is passed, write attempts will raise HTTP_FORBIDDEN.
This provider creates instances of :class:`~wsgidav.fs_dav_provider.FileResource`
and :class:`~wsgidav.fs_dav_provider.FolderResource` to represent files and
directories respectively.
"""
import os
import shutil
import stat
import sys
from wsgidav import compat, util
from wsgidav.dav_error import HTTP_FORBIDDEN, DAVError
from wsgidav.dav_provider import DAVCollection, DAVNonCollection, DAVProvider
__docformat__ = "reStructuredText"
_logger = util.getModuleLogger(__name__)
BUFFER_SIZE = 8192
# ========================================================================
# FileResource
# ========================================================================
[docs]class FileResource(DAVNonCollection):
"""Represents a single existing DAV resource instance.
See also _DAVResource, DAVNonCollection, and FilesystemProvider.
"""
def __init__(self, path, environ, filePath):
super(FileResource, self).__init__(path, environ)
self._filePath = filePath
self.filestat = os.stat(self._filePath)
# Setting the name from the file path should fix the case on Windows
self.name = os.path.basename(self._filePath)
self.name = compat.to_native(self.name)
# Getter methods for standard live properties
[docs] def getContentLength(self):
return self.filestat[stat.ST_SIZE]
[docs] def getContentType(self):
return util.guessMimeType(self.path)
[docs] def getCreationDate(self):
return self.filestat[stat.ST_CTIME]
[docs] def getDisplayName(self):
return self.name
[docs] def getEtag(self):
return util.getETag(self._filePath)
[docs] def getLastModified(self):
return self.filestat[stat.ST_MTIME]
[docs] def supportEtag(self):
return True
[docs] def supportRanges(self):
return True
[docs] def getContent(self):
"""Open content as a stream for reading.
See DAVResource.getContent()
"""
assert not self.isCollection
# GC issue 28, 57: if we open in text mode, \r\n is converted to one byte.
# So the file size reported by Windows differs from len(..), thus
# content-length will be wrong.
return open(self._filePath, "rb", BUFFER_SIZE)
[docs] def beginWrite(self, contentType=None):
"""Open content as a stream for writing.
See DAVResource.beginWrite()
"""
assert not self.isCollection
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
# _logger.debug("beginWrite: {}, {}".format(self._filePath, "wb"))
# GC issue 57: always store as binary
return open(self._filePath, "wb", BUFFER_SIZE)
[docs] def delete(self):
"""Remove this resource or collection (recursive).
See DAVResource.delete()
"""
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
os.unlink(self._filePath)
self.removeAllProperties(True)
self.removeAllLocks(True)
[docs] def copyMoveSingle(self, destPath, isMove):
"""See DAVResource.copyMoveSingle() """
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
fpDest = self.provider._locToFilePath(destPath, self.environ)
assert not util.isEqualOrChildUri(self.path, destPath)
# Copy file (overwrite, if exists)
shutil.copy2(self._filePath, fpDest)
# (Live properties are copied by copy2 or copystat)
# Copy dead properties
propMan = self.provider.propManager
if propMan:
destRes = self.provider.getResourceInst(destPath, self.environ)
if isMove:
propMan.moveProperties(self.getRefUrl(), destRes.getRefUrl(),
withChildren=False, environ=self.environ)
else:
propMan.copyProperties(self.getRefUrl(), destRes.getRefUrl(), self.environ)
[docs] def supportRecursiveMove(self, destPath):
"""Return True, if moveRecursive() is available (see comments there)."""
return True
[docs] def moveRecursive(self, destPath):
"""See DAVResource.moveRecursive() """
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
fpDest = self.provider._locToFilePath(destPath, self.environ)
assert not util.isEqualOrChildUri(self.path, destPath)
assert not os.path.exists(fpDest)
_logger.debug("moveRecursive({}, {})".format(self._filePath, fpDest))
shutil.move(self._filePath, fpDest)
# (Live properties are copied by copy2 or copystat)
# Move dead properties
if self.provider.propManager:
destRes = self.provider.getResourceInst(destPath, self.environ)
self.provider.propManager.moveProperties(self.getRefUrl(), destRes.getRefUrl(),
withChildren=True, environ=self.environ)
[docs] def setLastModified(self, destPath, timeStamp, dryRun):
"""Set last modified time for destPath to timeStamp on epoch-format"""
# Translate time from RFC 1123 to seconds since epoch format
secs = util.parseTimeString(timeStamp)
if not dryRun:
os.utime(self._filePath, (secs, secs))
return True
# ========================================================================
# FolderResource
# ========================================================================
[docs]class FolderResource(DAVCollection):
"""Represents a single existing file system folder DAV resource.
See also _DAVResource, DAVCollection, and FilesystemProvider.
"""
def __init__(self, path, environ, filePath):
super(FolderResource, self).__init__(path, environ)
self._filePath = filePath
# self._dict = None
self.filestat = os.stat(self._filePath)
# Setting the name from the file path should fix the case on Windows
self.name = os.path.basename(self._filePath)
self.name = compat.to_native(self.name) # .encode("utf8")
# Getter methods for standard live properties
[docs] def getCreationDate(self):
return self.filestat[stat.ST_CTIME]
[docs] def getDisplayName(self):
return self.name
[docs] def getDirectoryInfo(self):
return None
[docs] def getEtag(self):
return None
[docs] def getLastModified(self):
return self.filestat[stat.ST_MTIME]
[docs] def getMemberNames(self):
"""Return list of direct collection member names (utf-8 encoded).
See DAVCollection.getMemberNames()
"""
# On Windows NT/2k/XP and Unix, if path is a Unicode object, the result
# will be a list of Unicode objects.
# Undecodable filenames will still be returned as string objects
# If we don't request unicode, for example Vista may return a '?'
# instead of a special character. The name would then be unusable to
# build a distinct URL that references this resource.
nameList = []
# self._filePath is unicode, so os.listdir returns unicode as well
assert compat.is_unicode(self._filePath)
for name in os.listdir(self._filePath):
if not compat.is_unicode(name):
name = name.decode(sys.getfilesystemencoding())
assert compat.is_unicode(name)
# Skip non files (links and mount points)
fp = os.path.join(self._filePath, name)
if not os.path.isdir(fp) and not os.path.isfile(fp):
_logger.debug("Skipping non-file {!r}".format(fp))
continue
# name = name.encode("utf8")
name = compat.to_native(name)
nameList.append(name)
return nameList
[docs] def getMember(self, name):
"""Return direct collection member (DAVResource or derived).
See DAVCollection.getMember()
"""
assert compat.is_native(name), "{!r}".format(name)
fp = os.path.join(self._filePath, compat.to_unicode(name))
# name = name.encode("utf8")
path = util.joinUri(self.path, name)
if os.path.isdir(fp):
res = FolderResource(path, self.environ, fp)
elif os.path.isfile(fp):
res = FileResource(path, self.environ, fp)
else:
_logger.debug("Skipping non-file {}".format(path))
res = None
return res
# --- Read / write -------------------------------------------------------
[docs] def createEmptyResource(self, name):
"""Create an empty (length-0) resource.
See DAVResource.createEmptyResource()
"""
assert "/" not in name
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
path = util.joinUri(self.path, name)
fp = self.provider._locToFilePath(path, self.environ)
f = open(fp, "wb")
f.close()
return self.provider.getResourceInst(path, self.environ)
[docs] def createCollection(self, name):
"""Create a new collection as member of self.
See DAVResource.createCollection()
"""
assert "/" not in name
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
path = util.joinUri(self.path, name)
fp = self.provider._locToFilePath(path, self.environ)
os.mkdir(fp)
[docs] def delete(self):
"""Remove this resource or collection (recursive).
See DAVResource.delete()
"""
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
shutil.rmtree(self._filePath, ignore_errors=False)
self.removeAllProperties(True)
self.removeAllLocks(True)
[docs] def copyMoveSingle(self, destPath, isMove):
"""See DAVResource.copyMoveSingle() """
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
fpDest = self.provider._locToFilePath(destPath, self.environ)
assert not util.isEqualOrChildUri(self.path, destPath)
# Create destination collection, if not exists
if not os.path.exists(fpDest):
os.mkdir(fpDest)
try:
# may raise: [Error 5] Permission denied:
# u'C:\\temp\\litmus\\ccdest'
shutil.copystat(self._filePath, fpDest)
except Exception:
_logger.exception("Could not copy folder stats: {}".format(self._filePath))
# (Live properties are copied by copy2 or copystat)
# Copy dead properties
propMan = self.provider.propManager
if propMan:
destRes = self.provider.getResourceInst(destPath, self.environ)
if isMove:
propMan.moveProperties(self.getRefUrl(), destRes.getRefUrl(),
withChildren=False, environ=self.environ)
else:
propMan.copyProperties(self.getRefUrl(), destRes.getRefUrl(), self.environ)
[docs] def supportRecursiveMove(self, destPath):
"""Return True, if moveRecursive() is available (see comments there)."""
return True
[docs] def moveRecursive(self, destPath):
"""See DAVResource.moveRecursive() """
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
fpDest = self.provider._locToFilePath(destPath, self.environ)
assert not util.isEqualOrChildUri(self.path, destPath)
assert not os.path.exists(fpDest)
_logger.debug("moveRecursive({}, {})".format(self._filePath, fpDest))
shutil.move(self._filePath, fpDest)
# (Live properties are copied by copy2 or copystat)
# Move dead properties
if self.provider.propManager:
destRes = self.provider.getResourceInst(destPath, self.environ)
self.provider.propManager.moveProperties(self.getRefUrl(), destRes.getRefUrl(),
withChildren=True, environ=self.environ)
[docs] def setLastModified(self, destPath, timeStamp, dryRun):
"""Set last modified time for destPath to timeStamp on epoch-format"""
# Translate time from RFC 1123 to seconds since epoch format
secs = util.parseTimeString(timeStamp)
if not dryRun:
os.utime(self._filePath, (secs, secs))
return True
# ========================================================================
# FilesystemProvider
# ========================================================================
[docs]class FilesystemProvider(DAVProvider):
def __init__(self, rootFolderPath, readonly=False):
# Expand leading '~' as user home dir; expand %VAR%, $Var, ..
rootFolderPath = os.path.expandvars(os.path.expanduser(rootFolderPath))
rootFolderPath = os.path.abspath(rootFolderPath)
if not rootFolderPath or not os.path.exists(rootFolderPath):
raise ValueError("Invalid root path: {}".format(rootFolderPath))
super(FilesystemProvider, self).__init__()
self.rootFolderPath = rootFolderPath
self.readonly = readonly
def __repr__(self):
rw = "Read-Write"
if self.readonly:
rw = "Read-Only"
return "{} for path '{}' ({})".format(self.__class__.__name__, self.rootFolderPath, rw)
def _locToFilePath(self, path, environ=None):
"""Convert resource path to a unicode absolute file path.
Optional environ argument may be useful e.g. in relation to per-user
sub-folder chrooting inside rootFolderPath.
"""
root_path = self.rootFolderPath
assert root_path is not None
assert compat.is_native(root_path)
assert compat.is_native(path)
path_parts = path.strip("/").split("/")
file_path = os.path.abspath(os.path.join(root_path, *path_parts))
if not file_path.startswith(root_path):
raise RuntimeError(
"Security exception: tried to access file outside root: {}".format(file_path))
# Convert to unicode
file_path = util.toUnicode(file_path)
return file_path
[docs] def isReadOnly(self):
return self.readonly
[docs] def getResourceInst(self, path, environ):
"""Return info dictionary for path.
See DAVProvider.getResourceInst()
"""
self._count_getResourceInst += 1
fp = self._locToFilePath(path, environ)
if not os.path.exists(fp):
return None
if os.path.isdir(fp):
return FolderResource(path, environ, fp)
return FileResource(path, environ, fp)