Source code for wsgidav.samples.hg_dav_provider

# (c) 2009-2024 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
DAV provider that publishes a Mercurial repository.

Note: This is **not** production code!

The repository is rendered as three top level collections.

edit:
    Contains the working directory, i.e. all files. This includes uncommitted
    changes and untracked new files.
    This folder is writable.
released:
    Contains the latest committed files, also known as 'tip'.
    This folder is read-only.
archive:
    Contains the last 10 revisions as sub-folders.
    This folder is read-only.

Sample layout::

    /<share>/
        edit/
            server/
                ext_server.py
            README.txt
        released/
        archive/
            19/
            18/
            ...

Supported features:

#. Copying or moving files from ``/edit/..`` to the ``/edit/..`` folder will
   result in a ``hg copy`` or ``hg rename``.
#. Deleting resources from ``/edit/..`` will result in a ``hg remove``.
#. Copying or moving files from ``/edit/..`` to the ``/released`` folder will
   result in a ``hg commit``.
   Note that the destination path is ignored, instead the source path is used.
   So a user can drag a file or folder from somewhere under the ``edit/..``
   directory and drop it directly on the ``released`` directory to commit
   changes.
#. To commit all changes, simply drag'n'drop the ``/edit`` folder on the
   ``/released`` folder.
#. Creating new collections results in creation of a file called ``.directory``,
   which is then ``hg add`` ed since Mercurial doesn't track directories.
#. Some attributes are published as live properties, such as ``{hg:}date``.


Known limitations:

#. This 'commit by drag-and-drop' only works, if the WebDAV clients produces
   MOVE or COPY requests. Alas, some clients will send PUT, MKCOL, ... sequences
   instead.
#. Adding and then removing a file without committing after the 'add' will
   leave this file on disk (untracked)
   This happens for example with lock files that Open Office Write and other
   applications will create.
#. Dragging the 'edit' folder onto 'released' with Windows File Explorer will
   remove the folder in the explorer view, although WsgiDAV did not delete it.
   This seems to be done by the client.


See:
    http://mercurial.selenic.com/wiki/MercurialApi
Requirements:
    ``easy_install mercurial`` or install the API as non-standalone version
    from here: http://mercurial.berkwood.com/
    http://mercurial.berkwood.com/binaries/mercurial-1.4.win32-py2.6.exe
"""
import os
import sys
import time
from hashlib import md5
from pprint import pprint

from wsgidav import util
from wsgidav.dav_error import HTTP_FORBIDDEN, DAVError
from wsgidav.dav_provider import DAVProvider, _DAVResource
from wsgidav.samples.dav_provider_tools import VirtualCollection

try:
    import mercurial.ui
    from mercurial import commands, hg
    from mercurial.__version__ import version as hgversion

    # from mercurial import util as hgutil
except ImportError:
    print(
        "Could not import Mercurial API. Try 'easy_install -U mercurial'.",
        file=sys.stderr,
    )
    raise

__docformat__ = "reStructuredText en"

_logger = util.get_module_logger(__name__)

BUFFER_SIZE = 8192


# ============================================================================
# HgResource
# ============================================================================
[docs] class HgResource(_DAVResource): """Abstract base class for all resources.""" def __init__(self, path, is_collection, environ, rev, localHgPath): super().__init__(path, is_collection, environ) self.rev = rev self.localHgPath = localHgPath self.absFilePath = self._getFilePath() assert "\\" not in self.localHgPath assert "/" not in self.absFilePath if is_collection: self.fctx = None else: # Change Context for the requested revision: # rev=None: current working dir # rev="tip": TIP # rev=<int>: Revision ID wdctx = self.provider.repo[self.rev] self.fctx = wdctx[self.localHgPath] # util.status("HgResource: path=%s, rev=%s, localHgPath=%s, fctx=%s" % ( # self.path, self.rev, self.localHgPath, self.fctx)) # util.status("HgResource: name=%s, dn=%s, abspath=%s" % ( # self.name, self.get_display_name(), self.absFilePath)) def _getFilePath(self, *addParts): parts = self.localHgPath.split("/") if addParts: parts.extend(addParts) return os.path.join(self.provider.repo.root, *parts) def _commit(self, message): user = self.environ.get("wsgidav.auth.user_name") or "Anonymous" commands.commit( self.provider.ui, self.provider.repo, self.localHgPath, addremove=True, user=user, message=message, ) def _check_write_access(self): """Raise HTTP_FORBIDDEN, if resource is unwritable.""" if self.rev is not None: # Only working directory may be edited raise DAVError(HTTP_FORBIDDEN)
[docs] def get_content_length(self): if self.is_collection: return None return self.fctx.size()
[docs] def get_content_type(self): if self.is_collection: return None # (mimetype, _mimeencoding) = mimetypes.guess_type(self.path) # if not mimetype: # return "application/octet-stream" # return mimetype return util.guess_mime_type(self.path)
[docs] def get_creation_date(self): # statresults = os.stat(self._file_path) # return statresults[stat.ST_CTIME] return None # TODO
[docs] def get_display_name(self): if self.is_collection or self.fctx.filerev() is None: return self.name return f"{self.name}@{self.fctx.filerev()}"
[docs] def get_etag(self): return ( md5(self.path).hexdigest() + "-" + util.to_str(self.get_last_modified()) + "-" + str(self.get_content_length()) )
[docs] def get_last_modified(self): if self.is_collection: return None # (secs, tz-ofs) return self.fctx.date()[0]
[docs] def support_ranges(self): return False
[docs] def get_member_names(self): assert self.is_collection cache = self.environ["wsgidav.hg.cache"][util.to_str(self.rev)] dirinfos = cache["dirinfos"] if self.localHgPath not in dirinfos: return [] return dirinfos[self.localHgPath][0] + dirinfos[self.localHgPath][1]
# return self.provider._listMembers(self.path)
[docs] def get_member(self, name): # Rely on provider to get member oinstances assert self.is_collection return self.provider.get_resource_inst( util.join_uri(self.path, name), self.environ )
[docs] def get_display_info(self): if self.is_collection: return {"type": "Directory"} return {"type": "File"}
[docs] def get_property_names(self, *, is_allprop): """Return list of supported property names in Clark Notation. See DAVResource.get_property_names() """ # Let base class implementation add supported live and dead properties propNameList = super().get_property_names(is_allprop=is_allprop) # Add custom live properties (report on 'allprop' and 'propnames') if self.fctx: propNameList.extend( [ "{hg:}branch", "{hg:}date", "{hg:}description", "{hg:}filerev", "{hg:}rev", "{hg:}user", ] ) return propNameList
[docs] def get_property_value(self, name): """Return the value of a property. See get_property_value() """ # Supported custom live properties if name == "{hg:}branch": return self.fctx.branch() elif name == "{hg:}date": # (secs, tz-ofs) return util.to_str(self.fctx.date()[0]) elif name == "{hg:}description": return self.fctx.description() elif name == "{hg:}filerev": return util.to_str(self.fctx.filerev()) elif name == "{hg:}rev": return util.to_str(self.fctx.rev()) elif name == "{hg:}user": return util.to_str(self.fctx.user()) # Let base class implementation report live and dead properties return super().get_property_value(name)
[docs] def set_property_value(self, name, value, dry_run=False): """Set or remove property value. See DAVResource.set_property_value() """ raise DAVError(HTTP_FORBIDDEN)
[docs] def prevent_locking(self): """Return True, to prevent locking. See prevent_locking() """ if self.rev is not None: # Only working directory may be locked return True return False
[docs] def create_empty_resource(self, name): """Create and return an empty (length-0) resource as member of self. See DAVResource.create_empty_resource() """ assert self.is_collection self._check_write_access() filepath = self._getFilePath(name) f = open(filepath, "w") f.close() commands.add(self.provider.ui, self.provider.repo, filepath) # get_resource_inst() won't work, because the cached manifest is outdated # return self.provider.get_resource_inst(self.path.rstrip("/")+"/"+name, self.environ) return HgResource( self.path.rstrip("/") + "/" + name, False, self.environ, self.rev, self.localHgPath + "/" + name, )
[docs] def create_collection(self, name): """Create a new collection as member of self. A dummy member is created, because Mercurial doesn't handle folders. """ assert self.is_collection self._check_write_access() collpath = self._getFilePath(name) os.mkdir(collpath) filepath = self._getFilePath(name, ".directory") f = open(filepath, "w") f.write("Created by WsgiDAV.") f.close() commands.add(self.provider.ui, self.provider.repo, filepath)
[docs] def get_content(self): """Open content as a stream for reading. See DAVResource.get_content() """ assert not self.is_collection d = self.fctx.data() return util.StringIO(d)
[docs] def begin_write(self, *, content_type=None): """Open content as a stream for writing. See DAVResource.begin_write() """ assert not self.is_collection self._check_write_access() mode = "wb" # GC issue 57: always store as binary # if contentType and contentType.startswith("text"): # mode = "w" return open(self.absFilePath, mode, BUFFER_SIZE)
[docs] def end_write(self, *, with_errors): """Called when PUT has finished writing. See DAVResource.end_write() """ if not with_errors: commands.add(self.provider.ui, self.provider.repo, self.localHgPath)
# def handle_delete(self): # """Handle a DELETE request natively. # # """ # self._check_write_access() # return False
[docs] def support_recursive_delete(self): """Return True, if delete() may be called on non-empty collections (see comments there).""" return True
[docs] def delete(self): """Remove this resource (recursive).""" self._check_write_access() filepath = self._getFilePath() commands.remove(self.provider.ui, self.provider.repo, filepath, force=True)
[docs] def handle_copy(self, dest_path, *, depth_infinity): """Handle a COPY request natively.""" destType, destHgPath = util.pop_path(dest_path) destHgPath = destHgPath.strip("/") ui = self.provider.ui repo = self.provider.repo _logger.info(f"handle_copy {self.localHgPath} -> {destHgPath}") if self.rev is None and destType == "edit": # COPY /edit/a/b to /edit/c/d: turn into 'hg copy -f a/b c/d' commands.copy(ui, repo, self.localHgPath, destHgPath, force=True) elif self.rev is None and destType == "released": # COPY /edit/a/b to /released/c/d # This is interpreted as 'hg commit a/b' (ignoring the dest. path) self._commit(f"WsgiDAV commit (COPY {self.path} -> {dest_path})") else: raise DAVError(HTTP_FORBIDDEN) # Return True: request was handled return True
[docs] def handle_move(self, dest_path): """Handle a MOVE request natively.""" destType, destHgPath = util.pop_path(dest_path) destHgPath = destHgPath.strip("/") ui = self.provider.ui repo = self.provider.repo _logger.info(f"handle_copy {self.localHgPath} -> {destHgPath}") if self.rev is None and destType == "edit": # MOVE /edit/a/b to /edit/c/d: turn into 'hg rename -f a/b c/d' commands.rename(ui, repo, self.localHgPath, destHgPath, force=True) elif self.rev is None and destType == "released": # MOVE /edit/a/b to /released/c/d # This is interpreted as 'hg commit a/b' (ignoring the dest. path) self._commit(f"WsgiDAV commit (MOVE {self.path} -> {dest_path})") else: raise DAVError(HTTP_FORBIDDEN) # Return True: request was handled return True
# ============================================================================ # HgResourceProvider # ============================================================================
[docs] class HgResourceProvider(DAVProvider): """ DAV provider that serves a VirtualResource derived structure. """ def __init__(self, repoRoot): super().__init__() self.repoRoot = repoRoot print("Mercurial version %s" % hgversion) self.ui = mercurial.ui.ui() self.repo = hg.repository(self.ui, repoRoot) self.ui.status("Connected to repository %s\n" % self.repo.root) self.repoRoot = self.repo.root # Some commands (remove) seem to expect cwd set to the repo # TODO: try to go along without this, because it prevents serving # multiple repos. Instead pass absolute paths to the commands. # print(os.getcwd()) os.chdir(self.repo.root) # Verify integrity of the repository _logger.warning("Verify repository '%s' tree..." % self.repo.root) commands.verify(self.ui, self.repo) # self.ui.status("Changelog: %s\n" % self.repo.changelog) print("Status:") pprint(self.repo.status()) self.repo.ui.status( "the default user_name to be used in commits: %s\n" % self.repo.ui.user_name() ) # self.repo.ui.status("a short form of user name USER %s\n" % self.repo.ui.shortuser(user)) self.ui.status("Expandpath: %s\n" % self.repo.ui.expandpath(repoRoot)) print("Working directory state summary:") self.ui.pushbuffer() commands.summary(self.ui, self.repo, remote=False) res = self.ui.popbuffer().strip() reslines = [tuple(line.split(":", 1)) for line in res.split("\n")] pprint(reslines) print("Repository state summary:") self.ui.pushbuffer() commands.identify(self.ui, self.repo, num=True, id=True, branch=True, tags=True) res = self.ui.popbuffer().strip() reslines = [tuple(line.split(":", 1)) for line in res.split("\n")] pprint(reslines) self._get_log() def _get_log(self, limit=None): """Read log entries into a list of dictionaries.""" self.ui.pushbuffer() commands.log(self.ui, self.repo, limit=limit, date=None, rev=None, user=None) res = self.ui.popbuffer().strip() logList = [] for logentry in res.split("\n\n"): log = {} logList.append(log) for line in logentry.split("\n"): k, v = line.split(":", 1) assert k in ("changeset", "tag", "user", "date", "summary") log[k.strip()] = v.strip() log["parsed_date"] = util.parse_time_string(log["date"]) local_id, unid = log["changeset"].split(":") log["local_id"] = int(local_id) log["unid"] = unid # pprint(logList) return logList def _get_repo_info(self, environ, rev, reload=False): """Return a dictionary containing all files under source control. dirinfos: Dictionary containing direct members for every collection. {folderpath: (collectionlist, filelist), ...} files: Sorted list of all file paths in the manifest. filedict: Dictionary containing all files under source control. :: {'dirinfos': {'': (['wsgidav', 'tools', 'WsgiDAV.egg-info', 'tests'], ['index.rst', 'wsgidav MAKE_DAILY_BUILD.launch', 'wsgidav run_server.py DEBUG.launch', 'wsgidav-paste.conf', ... 'setup.py']), 'wsgidav': (['addons', 'samples', 'server', 'interfaces'], ['__init__.pyc', 'dav_error.pyc', 'dav_provider.pyc', ... 'wsgidav_app.py']), }, 'files': ['.hgignore', 'ADDONS.txt', 'wsgidav/samples/mysql_dav_provider.py', ... ], 'filedict': {'.hgignore': True, 'README.txt': True, 'WsgiDAV.egg-info/PKG-INFO': True, } } """ caches = environ.setdefault("wsgidav.hg.cache", {}) if caches.get(util.to_str(rev)) is not None: _logger.debug("_get_repo_info(%s): cache hit." % rev) return caches[util.to_str(rev)] start_time = time.time() self.ui.pushbuffer() commands.manifest(self.ui, self.repo, rev) res = self.ui.popbuffer() files = [] dirinfos = {} filedict = {} for file in res.split("\n"): if file.strip() == "": continue file = file.replace("\\", "/") # add all parent directories to 'dirinfos' parents = file.split("/") if len(parents) >= 1: p1 = "" for i in range(0, len(parents) - 1): p2 = parents[i] dir = dirinfos.setdefault(p1, ([], [])) if p2 not in dir[0]: dir[0].append(p2) if p1 == "": p1 = p2 else: p1 = f"{p1}/{p2}" dirinfos.setdefault(p1, ([], []))[1].append(parents[-1]) filedict[file] = True files.sort() cache = {"files": files, "dirinfos": dirinfos, "filedict": filedict} caches[util.to_str(rev)] = cache _logger.info(f"_getRepoInfo({rev}) took {time.time() - start_time:.3f}") return cache # def _listMembers(self, path, rev=None): # """Return a list of all non-collection members""" # # Pattern for direct members: # glob = "glob:" + os.path.join(path, "*").lstrip("/") # print(glob) # self.ui.pushbuffer() # commands.status(self.ui, self.repo, # glob, # all=True) # lines = self.ui.popbuffer().strip().split("\n") # pprint(lines) # return dict
[docs] def get_resource_inst(self, path, environ): """Return HgResource object for path. See DAVProvider.get_resource_inst() """ self._count_get_resource_inst += 1 # HG expects the resource paths without leading '/' localHgPath = path.strip("/") rev = None cmd, rest = util.pop_path(path) if cmd == "": return VirtualCollection( path, environ, "root", ["edit", "released", "archive"] ) elif cmd == "edit": localHgPath = rest.strip("/") rev = None elif cmd == "released": localHgPath = rest.strip("/") rev = "tip" elif cmd == "archive": if rest == "/": # Browse /archive: return a list of revision folders: loglist = self._get_log(limit=10) members = [util.to_str(m["local_id"]) for m in loglist] return VirtualCollection(path, environ, "Revisions", members) revid, rest = util.pop_path(rest) try: int(revid) except Exception: # Tried to access /archive/anyname return None # Access /archive/19 rev = revid localHgPath = rest.strip("/") else: return None # read mercurial repo into request cache cache = self._get_repo_info(environ, rev) if localHgPath in cache["filedict"]: # It is a version controlled file return HgResource(path, False, environ, rev, localHgPath) if localHgPath in cache["dirinfos"] or localHgPath == "": # It is an existing folder return HgResource(path, True, environ, rev, localHgPath) return None