Skip to content
Snippets Groups Projects
Commit 4f592fc4 authored by ale's avatar ale
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
*.pyc
*.egg-info
.coverage
.tox
include: "https://git.autistici.org/ai3/build-deb/raw/master/ci-common.yml"
stages:
- test
- build_pkgsrc
- build_pkg
- upload_pkg
run_tests:
stage: test
image: "debian:stretch"
script:
- apt-get update
- env DEBIAN_FRONTEND=noninteractive apt-get install -qy --no-install-recommends python python-dev python-pip python-tox libmariadb-dev-compat libldap2-dev libsasl2-dev default-jre-headless build-essential
- tox -e py27
LICENSE 0 → 100644
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
webdav-server
===
The WebDAV server component, meant to run in the [users-dav](https://git.autistici.org/ai3/docker/users-dav)
container. Authenticates users against the [webdav-auth authentication server](https://git.autistici.org/ai3/tools/webdav-server).
It requires Python3, and runs as a FastCGI server (using flup).
import json
import socket
class DavAuthClient(object):
def __init__(self, socketpath):
self.socketpath = socketpath
self._accounts = None
def _do_request(self, request):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(self.socketpath)
sock.sendall(json.dumps(request).encode('utf-8'))
sock.shutdown(socket.SHUT_WR)
return json.load(sock.makefile('r'))
finally:
sock.close()
def authenticate(self, dn, password):
return self._do_request({
'type': 'auth', 'dn': dn, 'password': password})
def get_accounts(self):
# Cache the account map.
if self._accounts is None:
self._accounts = self._do_request({'type': 'get_accounts'})
return self._accounts
"""FastCGI WebDAV server."""
import contextlib
import datetime
import errno
import logging
import os
import sys
from ai_webdav.client import DavAuthClient
try:
import raven
from raven.middleware import Sentry
has_sentry = True
except ImportError:
has_sentry = False
from wsgidav.fs_dav_provider import FilesystemProvider
from wsgidav.wsgidav_app import DEFAULT_CONFIG, WsgiDAVApp
from wsgidav.dc.base_dc import BaseDomainController
from wsgidav.dir_browser import WsgiDavDirBrowser
from wsgidav.error_printer import ErrorPrinter
from wsgidav.http_authenticator import HTTPAuthenticator
from wsgidav.request_resolver import RequestResolver
log = logging.getLogger('webdav')
# Authentication is not entirely straightforward, so let's explain it:
# the only fixed parameter we start with is the UNIX user ID. There
# may be more than one FTP account with that ID, so we retrieve their
# list from LDAP once when the program starts, and then we associate
# the right one with the request path on every incoming request.
# Furthermore, we only allow access to accounts whose home directory
# actually exists on the local server.
class Auth(object):
def __init__(self, authclient):
self.accounts = authclient.get_accounts()
if not self.accounts:
raise Exception('No accounts!')
log.info('accounts map: %s', str(self.accounts))
def get_provider_mapping(self):
return dict((realm, FilesystemProvider(ac['home']))
for realm, ac in self.accounts.items()
if os.path.exists(ac['home']))
class DomainController(BaseDomainController):
authclient = None
def __init__(self, wsgi_app, config):
super(DomainController, self).__init__(wsgi_app, config)
self.accounts = self.authclient.get_accounts()
self.auth_cache = set()
def __repr__(self):
return self.__class__.__name__
def get_domain_realm(self, path_info, environ):
return self._calc_realm_from_path_provider(path_info, environ)
def require_authentication(self, realmname, environ):
return True
def supports_http_digest_auth(self):
return False
def basic_auth_user(self, realmname, username, password, environ):
if (realmname, username, password) in self.auth_cache:
return True
if self._authenticate(realmname, username, password):
self.auth_cache.add((realmname, username, password))
return True
return False
def _authenticate(self, realmname, username, password):
# Find the account associated with the given realm.
account = self.accounts.get(realmname)
if not account:
return False
# Make sure that the username matches. This check enforces the
# account -> homeDirectory association.
if username != account['ftpname']:
log.error('unauthorized access to account %s from user %s',
account['ftpname'], username)
return False
# Verify the authentication credentials.
try:
return (self.authclient.authenticate(account['dn'], password) == 'ok')
except Exception as e:
log.error('unexpected exception in authenticate(): %s', e)
return False
def _abort(code, msg, start_response):
start_response('%d %s' % (code, msg), [('Content-Type', 'text/html')])
return ['<h1>%s</h1>' % msg]
def error_middleware(app):
def error_wrapper(environ, start_response):
try:
return app(environ, start_response)
except EnvironmentError as e:
# Raise 403 errors on EACCES, let other errors pass through
# (so we can examine the stack trace).
if e.errno == errno.EACCES:
return _abort(403, 'Permission denied', start_response)
raise
return error_wrapper
def create_app(config):
authclient = DavAuthClient(
config.get('auth_socket', '/var/run/authdav/auth'))
auth = Auth(authclient)
DomainController.authclient = authclient
dav_config = DEFAULT_CONFIG.copy()
dav_config.update({
'verbose': config.get('verbose', 1),
'provider_mapping': auth.get_provider_mapping(),
'http_authenticator': {
'domain_controller': 'ai_webdav.dav_server.DomainController',
'accept_basic': True,
'accept_digest': False,
'default_to_digest': False,
},
'middleware_stack': [
# Drop the DebugFilter from the default list.
ErrorPrinter,
HTTPAuthenticator,
WsgiDavDirBrowser,
RequestResolver,
],
})
return WsgiDAVApp(dav_config)
def run_fastcgi(app, config):
from flup.server.fcgi import WSGIServer
app = error_middleware(app)
if has_sentry and config.get('sentry_url'):
app = Sentry(app, client=raven.Client(config['sentry_url']))
WSGIServer(app).run()
def main():
logging.basicConfig()
config = dict(
sentry_url=os.getenv('SENTRY_URL'),
verbose=int(os.getenv('VERBOSE', '0')),
auth_socket=os.getenv('AUTH_SOCKET', '/var/run/authdav/auth'),
)
run_fastcgi(create_app(config), config)
if __name__ == '__main__':
main()
import logging
import os
import socket
import unittest
TEST_FTPNAME = 'user1'
TEST_PASS = 'password1'
TEST_DN = 'dn'
TEST_UID = 19475
class StubBase(object):
def setUp(self):
self._stubs = []
super(StubBase, self).setUp()
def tearDown(self):
super(StubBase, self).tearDown()
for baseref, attr, fn in self._stubs:
setattr(baseref, attr, fn)
def stub(self, baseref, attr, fn):
oldfn = getattr(baseref, attr)
self._stubs.append((baseref, attr, oldfn))
setattr(baseref, attr, fn)
from .client import *
def connect(*args, **kwargs):
"""connect(host, port=0, auth=None, username=None, password=None, protocol='http', path="/")"""
return Client(*args, **kwargs)
__version__ = "1.2.0"
import requests
import platform
from numbers import Number
import xml.etree.cElementTree as xml
from collections import namedtuple
try:
basestring
except NameError:
basestring = str
py_majversion, py_minversion, py_revversion = platform.python_version_tuple()
if py_majversion == '2':
from httplib import responses as HTTP_CODES
from urlparse import urlparse
else:
from http.client import responses as HTTP_CODES
from urllib.parse import urlparse
DOWNLOAD_CHUNK_SIZE_BYTES = 1 * 1024 * 1024
class WebdavException(Exception):
pass
class ConnectionFailed(WebdavException):
pass
def codestr(code):
return HTTP_CODES.get(code, 'UNKNOWN')
File = namedtuple('File', ['name', 'size', 'mtime', 'ctime', 'contenttype'])
def prop(elem, name, default=None):
child = elem.find('.//{DAV:}' + name)
return default if child is None else child.text
def elem2file(elem):
return File(
prop(elem, 'href'),
int(prop(elem, 'getcontentlength', 0)),
prop(elem, 'getlastmodified', ''),
prop(elem, 'creationdate', ''),
prop(elem, 'getcontenttype', ''),
)
class OperationFailed(WebdavException):
_OPERATIONS = dict(
HEAD = "get header",
GET = "download",
PUT = "upload",
DELETE = "delete",
MKCOL = "create directory",
PROPFIND = "list directory",
)
def __init__(self, method, path, expected_code, actual_code):
self.method = method
self.path = path
self.expected_code = expected_code
self.actual_code = actual_code
operation_name = self._OPERATIONS[method]
self.reason = 'Failed to {operation_name} "{path}"'.format(**locals())
expected_codes = (expected_code,) if isinstance(expected_code, Number) else expected_code
expected_codes_str = ", ".join('{0} {1}'.format(code, codestr(code)) for code in expected_codes)
actual_code_str = codestr(actual_code)
msg = '''\
{self.reason}.
Operation : {method} {path}
Expected code : {expected_codes_str}
Actual code : {actual_code} {actual_code_str}'''.format(**locals())
super(OperationFailed, self).__init__(msg)
class Client(object):
def __init__(self, host, port=0, auth=None, username=None, password=None,
protocol='http', verify_ssl=True, path=None, cert=None):
if not port:
port = 443 if protocol == 'https' else 80
self.baseurl = '{0}://{1}:{2}'.format(protocol, host, port)
if path:
self.baseurl = '{0}/{1}'.format(self.baseurl, path)
self.cwd = '/'
self.session = requests.session()
self.session.verify = verify_ssl
self.session.stream = True
if cert:
self.session.cert = cert
if auth:
self.session.auth = auth
elif username and password:
self.session.auth = (username, password)
def _send(self, method, path, expected_code, **kwargs):
url = self._get_url(path)
response = self.session.request(method, url, allow_redirects=False, **kwargs)
if isinstance(expected_code, Number) and response.status_code != expected_code \
or not isinstance(expected_code, Number) and response.status_code not in expected_code:
raise OperationFailed(method, path, expected_code, response.status_code)
return response
def _get_url(self, path):
path = str(path).strip()
if path.startswith('/'):
return self.baseurl + path
return "".join((self.baseurl, self.cwd, path))
def cd(self, path):
path = path.strip()
if not path:
return
stripped_path = '/'.join(part for part in path.split('/') if part) + '/'
if stripped_path == '/':
self.cwd = stripped_path
elif path.startswith('/'):
self.cwd = '/' + stripped_path
else:
self.cwd += stripped_path
def mkdir(self, path, safe=False):
expected_codes = 201 if not safe else (201, 301, 405)
self._send('MKCOL', path, expected_codes)
def mkdirs(self, path):
dirs = [d for d in path.split('/') if d]
if not dirs:
return
if path.startswith('/'):
dirs[0] = '/' + dirs[0]
old_cwd = self.cwd
try:
for dir in dirs:
try:
self.mkdir(dir, safe=True)
except Exception as e:
if e.actual_code == 409:
raise
finally:
self.cd(dir)
finally:
self.cd(old_cwd)
def rmdir(self, path, safe=False):
path = str(path).rstrip('/') + '/'
expected_codes = 204 if not safe else (204, 404)
self._send('DELETE', path, expected_codes)
def delete(self, path):
self._send('DELETE', path, 204)
def upload(self, local_path_or_fileobj, remote_path):
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'rb') as f:
self._upload(f, remote_path)
else:
self._upload(local_path_or_fileobj, remote_path)
def _upload(self, fileobj, remote_path):
self._send('PUT', remote_path, (200, 201, 204), data=fileobj)
def download(self, remote_path, local_path_or_fileobj):
response = self._send('GET', remote_path, 200, stream=True)
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'wb') as f:
self._download(f, response)
else:
self._download(local_path_or_fileobj, response)
def _download(self, fileobj, response):
for chunk in response.iter_content(DOWNLOAD_CHUNK_SIZE_BYTES):
fileobj.write(chunk)
def ls(self, remote_path='.'):
headers = {'Depth': '1'}
response = self._send('PROPFIND', remote_path, (207, 301), headers=headers)
# Redirect
if response.status_code == 301:
url = urlparse(response.headers['location'])
return self.ls(url.path)
tree = xml.fromstring(response.content)
return [elem2file(elem) for elem in tree.findall('{DAV:}response')]
def exists(self, remote_path):
response = self._send('HEAD', remote_path, (200, 301, 404))
return True if response.status_code != 404 else False
from __future__ import print_function
import mock
import shutil
import sys
import tempfile
import unittest
import urllib
from wsgi_intercept import requests_intercept, add_wsgi_intercept, remove_wsgi_intercept
try:
from StringIO import StringIO
except ImportError:
from io import BytesIO as StringIO
from ai_webdav import dav_server
from ai_webdav.test import *
from ai_webdav.test import easywebdav
from nose.tools import nottest
DAV_HOST = 'example.com'
DAV_PORT = 80
def fix_env(app):
# Oops, wsgidav is not happy if QUERY_STRING is not present at all
# in the WSGI environment, so define it to an empty string if
# necessary.
def _fix_env(environ, start_response):
if 'QUERY_STRING' not in environ:
environ['QUERY_STRING'] = ''
return app(environ, start_response)
return _fix_env
class TestDavHandler(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.socketpath = self.tmpdir + '/sock'
super(TestDavHandler, self).setUp()
# Create a test file to be accessed via WebDAV.
self.dav_root = os.path.join(self.tmpdir, 'davroot')
os.mkdir(self.dav_root)
with open(os.path.join(self.dav_root, 'test.txt'), 'w') as fd:
fd.write('testdata')
# Make getpeercred() return the right user ID.
#self.gpc_patch = mock.patch(
# 'ai_webdav.auth_server.getpeercred', return_value=(1, TEST_UID, 1))
#self.gpc_patch.start()
# Mock the get_accounts() method so that it returns a
# path that we control.
self.ga_patch = mock.patch(
'ai_webdav.client.DavAuthClient.get_accounts', return_value={
u'/dav/test': {
'dn': TEST_DN,
'ftpname': TEST_FTPNAME,
'home': self.dav_root,
},
})
self.ga_patch.start()
self.auth_patch = mock.patch(
'ai_webdav.client.DavAuthClient.authenticate')
self.auth_patch.start()
self.app = dav_server.create_app(dict(
auth_socket=self.socketpath,
verbose=0))
# easywebdav uses the Requests package.
requests_intercept.install()
add_wsgi_intercept(DAV_HOST, DAV_PORT, lambda: fix_env(self.app))
self.dav_addr = '%s:%d' % (DAV_HOST, DAV_PORT)
def tearDown(self):
remove_wsgi_intercept()
requests_intercept.uninstall()
self.ga_patch.stop()
shutil.rmtree(self.tmpdir, ignore_errors=True)
super(TestDavHandler, self).tearDown()
@mock.patch('ai_webdav.client.DavAuthClient.authenticate', return_value='ok')
def test_dav_ok(self, auth):
client = easywebdav.connect(
DAV_HOST, path='dav/test',
username=TEST_FTPNAME, password=TEST_PASS)
results = client.ls()
files_found = sorted([os.path.basename(x.name) for x in results])
self.assertEquals(['', 'test.txt'], files_found)
self.assertEquals(
'testdata',
self._download(client, 'test.txt'))
@mock.patch('ai_webdav.client.DavAuthClient.authenticate', return_value='ok')
def test_dav_fails_access_to_bad_repo(self, auth):
# Another user (even if auth is successful) should not be able
# to access the dav/test repository.
client = easywebdav.connect(
DAV_HOST, path='dav/test',
username='another very ok user', password=TEST_PASS)
self.assertRaises(easywebdav.OperationFailed, client.ls)
@nottest
@mock.patch('ai_webdav.client.DavAuthClient.authenticate', return_value='ok')
def test_dav_utf8(self, auth):
# This test is broken in py2.
print('encodings: default=%s fs=%s' % (sys.getdefaultencoding(), sys.getfilesystemencoding()))
path = u'H\xe5llo'
os.mkdir(os.path.join(self.dav_root, 'utf8'))
with open(os.path.join(self.dav_root, 'utf8', path.encode('utf-8')), 'w') as fd:
fd.write('testdata')
client = easywebdav.connect(
DAV_HOST, path='dav/test/utf8',
username=TEST_FTPNAME, password=TEST_PASS)
results = client.ls()
files_found = sorted([os.path.basename(x.name) for x in results])
self.assertEquals(['', urllib.quote(path.encode('utf-8'))], files_found)
self.assertEquals(
'testdata',
self._download(client, path.encode('utf-8')))
def _download(self, client, path):
outbuf = StringIO()
client.download(path, outbuf)
outbuf.seek(0)
return outbuf.read().decode('utf-8')
@mock.patch('ai_webdav.client.DavAuthClient.authenticate', return_value='error')
def test_dav_fails_with_bad_password(self, ldapbind):
# We use the 'right' username/password to ensure that the
# authentication fails because we've mocked the right thing.
client = easywebdav.connect(
DAV_HOST, path='dav/test',
username=TEST_FTPNAME, password=TEST_PASS)
self.assertRaises(easywebdav.OperationFailed, client.ls)
setup.py 0 → 100644
#!/usr/bin/python
from setuptools import setup, find_packages
setup(
name="ai-webdav",
version="0.3",
description="WebDAV serving stack",
author="Autistici/Inventati",
author_email="info@autistici.org",
url="https://git.autistici.org/ai3/tools/webdav-server",
install_requires=["wsgidav>=3"],
setup_requires=[],
packages=find_packages(),
package_data={},
entry_points={
'console_scripts': [
'dav.fcgi = ai_webdav.dav_server:main',
],
}
)
tox.ini 0 → 100644
[tox]
envlist = py27,py3
[testenv]
deps=
wsgi_intercept==0.9.1
requests
nose
mock
coverage
commands=
/usr/bin/env LANG=en_US.UTF-8 \
nosetests \
--with-coverage --cover-package=ai_webdav \
[]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment