# -*- coding: utf-8 -*-
"""
server_cli
==========
:Author: Martin Wendt
:Copyright: Licensed under the MIT license, see LICENSE file in this package.
Standalone server that runs WsgiDAV.
These tasks are performed:
- Set up the configuration from defaults, configuration file, and command line
options.
- Instantiate the WsgiDAVApp object (which is a WSGI application)
- Start a WSGI server for this WsgiDAVApp object
Configuration is defined like this:
1. Get the name of a configuration file from command line option
``--config-file=FILENAME`` (or short ``-cFILENAME``).
If this option is omitted, we use ``wsgidav.yaml`` in the current
directory.
2. Set reasonable default settings.
3. If configuration file exists: read and use it to overwrite defaults.
4. If command line options are passed, use them to override settings:
``--host`` option overrides ``hostname`` setting.
``--port`` option overrides ``port`` setting.
``--root=FOLDER`` option creates a FilesystemProvider that publishes
FOLDER on the '/' share.
"""
import argparse
import copy
import logging
import os
import platform
import sys
import webbrowser
from pprint import pformat
from threading import Timer
import yaml
from wsgidav import __version__, util
from wsgidav.default_conf import DEFAULT_CONFIG, DEFAULT_VERBOSE
from wsgidav.fs_dav_provider import FilesystemProvider
from wsgidav.wsgidav_app import WsgiDAVApp
from wsgidav.xml_tools import use_lxml
try:
# Try pyjson5 first because it's faster than json5
from pyjson5 import load as json_load
except ImportError:
from json5 import load as json_load
__docformat__ = "reStructuredText"
#: Try this config files if no --config=... option is specified
DEFAULT_CONFIG_FILES = ("wsgidav.yaml", "wsgidav.json")
_logger = logging.getLogger("wsgidav")
def _get_common_info(config):
"""Calculate some common info."""
# Support SSL
ssl_certificate = util.fix_path(config.get("ssl_certificate"), config)
ssl_private_key = util.fix_path(config.get("ssl_private_key"), config)
ssl_certificate_chain = util.fix_path(config.get("ssl_certificate_chain"), config)
ssl_adapter = config.get("ssl_adapter", "builtin")
use_ssl = False
if ssl_certificate and ssl_private_key:
use_ssl = True
# _logger.info("SSL / HTTPS enabled. Adapter: {}".format(ssl_adapter))
elif ssl_certificate or ssl_private_key:
raise RuntimeError(
"Option 'ssl_certificate' and 'ssl_private_key' must be used together."
)
protocol = "https" if use_ssl else "http"
url = f"{protocol}://{config['host']}:{config['port']}"
info = {
"use_ssl": use_ssl,
"ssl_cert": ssl_certificate,
"ssl_pk": ssl_private_key,
"ssl_adapter": ssl_adapter,
"ssl_chain": ssl_certificate_chain,
"protocol": protocol,
"url": url,
}
return info
[docs]class FullExpandedPath(argparse.Action):
"""Expand user- and relative-paths"""
[docs] def __call__(self, parser, namespace, values, option_string=None):
new_val = os.path.abspath(os.path.expanduser(values))
setattr(namespace, self.dest, new_val)
def _init_command_line_options():
"""Parse command line options into a dictionary."""
description = """\
Run a WEBDAV server to share file system folders.
Examples:
Share filesystem folder '/temp' for anonymous access (no config file used):
wsgidav --port=80 --host=0.0.0.0 --root=/temp --auth=anonymous
Run using a specific configuration file:
wsgidav --port=80 --host=0.0.0.0 --config=~/my_wsgidav.yaml
If no config file is specified, the application will look for a file named
'wsgidav.yaml' in the current directory.
See
http://wsgidav.readthedocs.io/en/latest/run-configure.html
for some explanation of the configuration file format.
"""
epilog = """\
Licensed under the MIT license.
See https://github.com/mar10/wsgidav for additional information.
"""
parser = argparse.ArgumentParser(
prog="wsgidav",
description=description,
epilog=epilog,
allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"-p",
"--port",
type=int,
# default=8080,
help="port to serve on (default: 8080)",
)
parser.add_argument(
"-H", # '-h' conflicts with --help
"--host",
help=(
"host to serve from (default: localhost). 'localhost' is only "
"accessible from the local computer. Use 0.0.0.0 to make your "
"application public"
),
)
parser.add_argument(
"-r",
"--root",
dest="root_path",
action=FullExpandedPath,
help="path to a file system folder to publish as share '/'.",
)
parser.add_argument(
"--auth",
choices=("anonymous", "nt", "pam-login"),
help="quick configuration of a domain controller when no config file "
"is used",
)
parser.add_argument(
"--server",
choices=SUPPORTED_SERVERS.keys(),
# default="cheroot",
help="type of pre-installed WSGI server to use (default: cheroot).",
)
parser.add_argument(
"--ssl-adapter",
choices=("builtin", "pyopenssl"),
# default="builtin",
help="used by 'cheroot' server if SSL certificates are configured "
"(default: builtin).",
)
qv_group = parser.add_mutually_exclusive_group()
qv_group.add_argument(
"-v",
"--verbose",
action="count",
default=3,
help="increment verbosity by one (default: %(default)s, range: 0..5)",
)
qv_group.add_argument(
"-q", "--quiet", default=0, action="count", help="decrement verbosity by one"
)
qv_group = parser.add_mutually_exclusive_group()
qv_group.add_argument(
"-c",
"--config",
dest="config_file",
action=FullExpandedPath,
help=(
f"configuration file (default: {DEFAULT_CONFIG_FILES} in current directory)"
),
)
qv_group.add_argument(
"--no-config",
action="store_true",
help=f"do not try to load default {DEFAULT_CONFIG_FILES}",
)
parser.add_argument(
"--browse",
action="store_true",
help="open browser on start",
)
parser.add_argument(
"-V",
"--version",
action="store_true",
help="print version info and exit (may be combined with --verbose)",
)
args = parser.parse_args()
args.verbose -= args.quiet
del args.quiet
if args.root_path and not os.path.isdir(args.root_path):
msg = "{} is not a directory".format(args.root_path)
parser.error(msg)
if args.version:
if args.verbose >= 4:
version_info = "WsgiDAV/{} Python/{}({} bit) {}".format(
__version__,
util.PYTHON_VERSION,
"64" if sys.maxsize > 2**32 else "32",
platform.platform(aliased=True),
)
version_info += "\nPython from: {}".format(sys.executable)
else:
version_info = "{}".format(__version__)
print(version_info)
sys.exit()
if args.no_config:
pass
# ... else ignore default config files
elif args.config_file is None:
# If --config was omitted, use default (if it exists)
for filename in DEFAULT_CONFIG_FILES:
defPath = os.path.abspath(filename)
if os.path.exists(defPath):
if args.verbose >= 3:
print("Using default configuration file: {}".format(defPath))
args.config_file = defPath
break
else:
# If --config was specified convert to absolute path and assert it exists
args.config_file = os.path.abspath(args.config_file)
if not os.path.isfile(args.config_file):
parser.error(
"Could not find specified configuration file: {}".format(
args.config_file
)
)
# Convert args object to dictionary
cmdLineOpts = args.__dict__.copy()
if args.verbose >= 5:
print("Command line args:")
for k, v in cmdLineOpts.items():
print(" {:>12}: {}".format(k, v))
return cmdLineOpts, parser
def _read_config_file(config_file, _verbose):
"""Read configuration file options into a dictionary."""
config_file = os.path.abspath(config_file)
if not os.path.exists(config_file):
raise RuntimeError(f"Couldn't open configuration file {config_file!r}.")
if config_file.endswith(".json"):
with open(config_file, mode="rt", encoding="utf-8-sig") as fp:
conf = json_load(fp)
elif config_file.endswith(".yaml"):
with open(config_file, mode="rt", encoding="utf-8-sig") as fp:
conf = yaml.safe_load(fp)
else:
raise RuntimeError(
f"Unsupported config file format (expected yaml or json): {config_file}"
)
conf["_config_file"] = config_file
conf["_config_root"] = os.path.dirname(config_file)
return conf
def _init_config():
"""Setup configuration dictionary from default, command line and configuration file."""
cli_opts, parser = _init_command_line_options()
cli_verbose = cli_opts["verbose"]
# Set config defaults
config = copy.deepcopy(DEFAULT_CONFIG)
config["_config_file"] = None
config["_config_root"] = os.getcwd()
# Configuration file overrides defaults
config_file = cli_opts.get("config_file")
if config_file:
file_opts = _read_config_file(config_file, cli_verbose)
util.deep_update(config, file_opts)
if cli_verbose != DEFAULT_VERBOSE and "verbose" in file_opts:
if cli_verbose >= 2:
print(
"Config file defines 'verbose: {}' but is overridden by command line: {}.".format(
file_opts["verbose"], cli_verbose
)
)
config["verbose"] = cli_verbose
else:
if cli_verbose >= 2:
print("Running without configuration file.")
# Command line overrides file
if cli_opts.get("port"):
config["port"] = cli_opts.get("port")
if cli_opts.get("host"):
config["host"] = cli_opts.get("host")
if cli_opts.get("profile") is not None:
config["profile"] = True
if cli_opts.get("server") is not None:
config["server"] = cli_opts.get("server")
if cli_opts.get("ssl_adapter") is not None:
config["ssl_adapter"] = cli_opts.get("ssl_adapter")
# Command line overrides file only if -v or -q where passed:
if cli_opts.get("verbose") != DEFAULT_VERBOSE:
config["verbose"] = cli_opts.get("verbose")
if cli_opts.get("root_path"):
root_path = os.path.abspath(cli_opts.get("root_path"))
config["provider_mapping"]["/"] = FilesystemProvider(root_path)
if config["verbose"] >= 5:
# TODO: remove passwords from user_mapping
config_cleaned = util.purge_passwords(config)
print(
"Configuration({}):\n{}".format(
cli_opts["config_file"], pformat(config_cleaned)
)
)
if not config["provider_mapping"]:
parser.error("No DAV provider defined.")
# Quick-configuration of DomainController
auth = cli_opts.get("auth")
auth_conf = util.get_dict_value(config, "http_authenticator", as_dict=True)
if auth and auth_conf.get("domain_controller"):
parser.error(
"--auth option can only be used when no domain_controller is configured"
)
if auth == "anonymous":
if config["simple_dc"]["user_mapping"]:
parser.error(
"--auth=anonymous can only be used when no user_mapping is configured"
)
auth_conf.update(
{
"domain_controller": "wsgidav.dc.simple_dc.SimpleDomainController",
"accept_basic": True,
"accept_digest": True,
"default_to_digest": True,
}
)
config["simple_dc"]["user_mapping"] = {"*": True}
elif auth == "nt":
if config.get("nt_dc"):
parser.error(
"--auth=nt can only be used when no nt_dc settings are configured"
)
auth_conf.update(
{
"domain_controller": "wsgidav.dc.nt_dc.NTDomainController",
"accept_basic": True,
"accept_digest": False,
"default_to_digest": False,
}
)
config["nt_dc"] = {}
elif auth == "pam-login":
if config.get("pam_dc"):
parser.error(
"--auth=pam-login can only be used when no pam_dc settings are configured"
)
auth_conf.update(
{
"domain_controller": "wsgidav.dc.pam_dc.PAMDomainController",
"accept_basic": True,
"accept_digest": False,
"default_to_digest": False,
}
)
config["pam_dc"] = {"service": "login"}
# print(config)
# if cli_opts.get("reload"):
# print("Installing paste.reloader.", file=sys.stderr)
# from paste import reloader # @UnresolvedImport
# reloader.install()
# if config_file:
# # Add config file changes
# reloader.watch_file(config_file)
# # import pydevd
# # pydevd.settrace()
return cli_opts, config
def _run_cheroot(app, config, _server):
"""Run WsgiDAV using cheroot.server (https://cheroot.cherrypy.dev/)."""
try:
from cheroot import server, wsgi
except ImportError:
_logger.exception("Could not import Cheroot (https://cheroot.cherrypy.dev/).")
_logger.error("Try `pip install cheroot`.")
return False
version = wsgi.Server.version
version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
wsgi.Server.version = version
info = _get_common_info(config)
# Support SSL
if info["use_ssl"]:
ssl_adapter = info["ssl_adapter"]
ssl_adapter = server.get_ssl_adapter_class(ssl_adapter)
wsgi.Server.ssl_adapter = ssl_adapter(
info["ssl_cert"], info["ssl_pk"], info["ssl_chain"]
)
_logger.info("SSL / HTTPS enabled. Adapter: {}".format(ssl_adapter))
_logger.info(f"Running {version}")
_logger.info(f"Serving on {info['url']} ...")
server_args = {
"bind_addr": (config["host"], config["port"]),
"wsgi_app": app,
"server_name": version,
# File Explorer needs lot of threads (see issue #149):
"numthreads": 50, # TODO: still required?
}
# Override or add custom args
custom_args = util.get_dict_value(config, "server_args", as_dict=True)
server_args.update(custom_args)
class PatchedServer(wsgi.Server):
STARTUP_NOTIFICATION_DELAY = 0.5
def serve(self, *args, **kwargs):
_logger.error("wsgi.Server.serve")
if startup_event and not startup_event.is_set():
Timer(self.STARTUP_NOTIFICATION_DELAY, startup_event.set).start()
_logger.error("wsgi.Server is ready")
return super().serve(*args, **kwargs)
# If the caller passed a startup event, monkey patch the server to set it
# when the request handler loop is entered
startup_event = config.get("startup_event")
if startup_event:
server = PatchedServer(**server_args)
else:
server = wsgi.Server(**server_args)
try:
server.start()
except KeyboardInterrupt:
_logger.warning("Caught Ctrl-C, shutting down...")
finally:
server.stop()
return
def _run_ext_wsgiutils(app, config, _server):
"""Run WsgiDAV using ext_wsgiutils_server from the wsgidav package."""
from wsgidav.server import ext_wsgiutils_server
_logger.warning(
"WARNING: This single threaded server (ext-wsgiutils) is not meant for production."
)
try:
ext_wsgiutils_server.serve(config, app)
except KeyboardInterrupt:
_logger.warning("Caught Ctrl-C, shutting down...")
return
# def _run_flup(app, config, server):
# """Run WsgiDAV using flup.server.fcgi (http://trac.saddi.com/flup/wiki/FlupServers)."""
# try:
# if server == "flup-fcgi":
# from flup.server.fcgi import WSGIServer
# from flup.server.fcgi import __version__ as flupver
# elif server == "flup-fcgi-fork":
# from flup.server.fcgi_fork import WSGIServer
# from flup.server.fcgi_fork import __version__ as flupver
# else:
# raise ValueError
# except ImportError:
# _logger.exception(f"Could not import {server} (https://gunicorn.org).")
# _logger.error("Try `pip install flup`.")
# return False
# version = f"{WSGIServer.__module__}/{flupver}"
# version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
# _logger.info(f"Running {version} ...")
# server = WSGIServer(
# app,
# bindAddress=(config["host"], config["port"]),
# # debug=True,
# )
# try:
# server.run()
# except KeyboardInterrupt:
# _logger.warning("Caught Ctrl-C, shutting down...")
# return
def _run_gevent(app, config, server):
"""Run WsgiDAV using gevent if gevent (https://www.gevent.org).
See
https://github.com/gevent/gevent/blob/master/src/gevent/pywsgi.py#L1356
https://github.com/gevent/gevent/blob/master/src/gevent/server.py#L38
for more options.
"""
try:
import gevent
import gevent.monkey
from gevent.pywsgi import WSGIServer
except ImportError:
_logger.exception("Could not import gevent (http://www.gevent.org).")
_logger.error("Try `pip install gevent`.")
return False
gevent.monkey.patch_all()
info = _get_common_info(config)
version = f"gevent/{gevent.__version__}"
version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
# Override or add custom args
server_args = {
"wsgi_app": app,
"bind_addr": (config["host"], config["port"]),
}
custom_args = util.get_dict_value(config, "server_args", as_dict=True)
server_args.update(custom_args)
if info["use_ssl"]:
dav_server = WSGIServer(
server_args["bind_addr"],
app,
keyfile=info["ssl_pk"],
certfile=info["ssl_cert"],
ca_certs=info["ssl_chain"],
)
else:
dav_server = WSGIServer(server_args["bind_addr"], app)
# If the caller passed a startup event, monkey patch the server to set it
# when the request handler loop is entered
startup_event = config.get("startup_event")
if startup_event:
def _patched_start():
dav_server.start_accepting = org_start # undo the monkey patch
org_start()
_logger.info("gevent is ready")
startup_event.set()
org_start = dav_server.start_accepting
dav_server.start_accepting = _patched_start
_logger.info(f"Running {version}")
_logger.info(f"Serving on {info['url']} ...")
try:
gevent.spawn(dav_server.serve_forever())
except KeyboardInterrupt:
_logger.warning("Caught Ctrl-C, shutting down...")
return
def _run_gunicorn(app, config, server):
"""Run WsgiDAV using Gunicorn (https://gunicorn.org)."""
try:
import gunicorn.app.base
except ImportError:
_logger.exception("Could not import Gunicorn (https://gunicorn.org).")
_logger.error("Try `pip install gunicorn` (UNIX only).")
return False
info = _get_common_info(config)
class GunicornApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super().__init__()
def load_config(self):
config = {
key: value
for key, value in self.options.items()
if key in self.cfg.settings and value is not None
}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
# See https://docs.gunicorn.org/en/latest/settings.html
server_args = {
"bind": "{}:{}".format(config["host"], config["port"]),
"threads": 50,
"timeout": 1200,
}
if info["use_ssl"]:
server_args.update(
{
"keyfile": info["ssl_pk"],
"certfile": info["ssl_cert"],
"ca_certs": info["ssl_chain"],
# "ssl_version": ssl_version
# "cert_reqs": ssl_cert_reqs
# "ciphers": ssl_ciphers
}
)
# Override or add custom args
custom_args = util.get_dict_value(config, "server_args", as_dict=True)
server_args.update(custom_args)
version = f"gunicorn/{gunicorn.__version__}"
version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
_logger.info(f"Running {version} ...")
GunicornApplication(app, server_args).run()
def _run_paste(app, config, server):
"""Run WsgiDAV using paste.httpserver, if Paste is installed.
See http://pythonpaste.org/modules/httpserver.html for more options
"""
try:
from paste import httpserver
except ImportError:
_logger.exception(
"Could not import paste.httpserver (https://github.com/cdent/paste)."
)
_logger.error("Try `pip install paste`.")
return False
info = _get_common_info(config)
version = httpserver.WSGIHandler.server_version
version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
# See http://pythonpaste.org/modules/httpserver.html for more options
server = httpserver.serve(
app,
host=config["host"],
port=config["port"],
server_version=version,
# This option enables handling of keep-alive and expect-100:
protocol_version="HTTP/1.1",
start_loop=False,
)
if config["verbose"] >= 5:
__handle_one_request = server.RequestHandlerClass.handle_one_request
def handle_one_request(self):
__handle_one_request(self)
if self.close_connection == 1:
_logger.debug("HTTP Connection : close")
else:
_logger.debug("HTTP Connection : continue")
server.RequestHandlerClass.handle_one_request = handle_one_request
_logger.info(f"Running {version} ...")
host, port = server.server_address
if host == "0.0.0.0":
_logger.info(f"Serving on 0.0.0.0:{port} view at http://127.0.0.1:{port}")
else:
_logger.info(f"Serving on {info['url']}")
try:
server.serve_forever()
except KeyboardInterrupt:
_logger.warning("Caught Ctrl-C, shutting down...")
return
def _run_uvicorn(app, config, server):
"""Run WsgiDAV using Uvicorn (https://www.uvicorn.org)."""
try:
import uvicorn
except ImportError:
_logger.exception("Could not import Uvicorn (https://www.uvicorn.org).")
_logger.error("Try `pip install uvicorn`.")
return False
info = _get_common_info(config)
# See https://www.uvicorn.org/settings/
server_args = {
"interface": "wsgi",
"host": config["host"],
"port": config["port"],
# TODO: see _run_cheroot()
}
if info["use_ssl"]:
server_args.update(
{
"ssl_keyfile": info["ssl_pk"],
"ssl_certfile": info["ssl_cert"],
"ssl_ca_certs": info["ssl_chain"],
# "ssl_keyfile_password": ssl_keyfile_password
# "ssl_version": ssl_version
# "ssl_cert_reqs": ssl_cert_reqs
# "ssl_ciphers": ssl_ciphers
}
)
# Override or add custom args
custom_args = util.get_dict_value(config, "server_args", as_dict=True)
server_args.update(custom_args)
version = f"uvicorn/{uvicorn.__version__}"
version = f"WsgiDAV/{__version__} {version} Python {util.PYTHON_VERSION}"
_logger.info(f"Running {version} ...")
uvicorn.run(app, **server_args)
def _run_wsgiref(app, config, _server):
"""Run WsgiDAV using wsgiref.simple_server (https://docs.python.org/3/library/wsgiref.html)."""
from wsgiref.simple_server import WSGIRequestHandler, make_server
version = WSGIRequestHandler.server_version
version = f"WsgiDAV/{__version__} {version}" # Python {util.PYTHON_VERSION}"
_logger.info(f"Running {version} ...")
_logger.warning(
"WARNING: This single threaded server (wsgiref) is not meant for production."
)
WSGIRequestHandler.server_version = version
httpd = make_server(config["host"], config["port"], app)
# httpd.RequestHandlerClass.server_version = version
try:
httpd.serve_forever()
except KeyboardInterrupt:
_logger.warning("Caught Ctrl-C, shutting down...")
return
SUPPORTED_SERVERS = {
"cheroot": _run_cheroot,
# "cherrypy": _run__cherrypy,
"ext-wsgiutils": _run_ext_wsgiutils,
# "flup-fcgi_fork": _run_flup,
# "flup-fcgi": _run_flup,
"gevent": _run_gevent,
"gunicorn": _run_gunicorn,
"paste": _run_paste,
"uvicorn": _run_uvicorn,
"wsgiref": _run_wsgiref,
}
[docs]def run():
cli_opts, config = _init_config()
util.init_logging(config)
info = _get_common_info(config)
app = WsgiDAVApp(config)
server = config["server"]
handler = SUPPORTED_SERVERS.get(server)
if not handler:
raise RuntimeError(
"Unsupported server type {!r} (expected {!r})".format(
server, "', '".join(SUPPORTED_SERVERS.keys())
)
)
if not use_lxml and config["verbose"] >= 3:
_logger.warning(
"Could not import lxml: using xml instead (up to 10% slower). "
"Consider `pip install lxml`(see https://pypi.python.org/pypi/lxml)."
)
if cli_opts["browse"]:
BROWSE_DELAY = 2.0
def _worker():
url = info["url"]
url = url.replace("0.0.0.0", "127.0.0.1")
_logger.info(f"Starting browser on {url} ...")
webbrowser.open(url)
Timer(BROWSE_DELAY, _worker).start()
handler(app, config, server)
return
if __name__ == "__main__":
# Just in case...
from multiprocessing import freeze_support
freeze_support()
run()