From 110ab48ddedbf32e59a60fb748c9d8515c861018 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Tue, 26 Jul 2011 10:27:36 +0100 Subject: [PATCH] minor robustness improvements; added some debug logging to notifiers --- client/djrandom_client/client.py | 38 +++++++++++++++++++------ client/djrandom_client/linux_watcher.py | 5 ++++ client/djrandom_client/osx_watcher.py | 5 ++++ client/djrandom_client/upload.py | 35 ++++++++++++----------- 4 files changed, 59 insertions(+), 24 deletions(-) diff --git a/client/djrandom_client/client.py b/client/djrandom_client/client.py index 5337fa3..a3c4255 100644 --- a/client/djrandom_client/client.py +++ b/client/djrandom_client/client.py @@ -4,37 +4,58 @@ import logging import optparse import os import platform +import signal import threading +import time from djrandom_client import daemonize from djrandom_client import filescan from djrandom_client import upload from djrandom_client import utils + +# Detect platform and selectively enable inotify/fsevents watchers. +enable_watcher = True if platform.system() == 'Darwin': import djrandom_client.osx_watcher as watcher elif platform.system() == 'Linux': import djrandom_client.linux_watcher as watcher else: - raise NotImplementedError('unsupported platform') + enable_watcher = False + +log = logging.getLogger(__name__) -class InitialScan(threading.Thread): +class FullScan(threading.Thread): """Do a recursive directory scan when starting.""" - def __init__(self, basedir, queue): + def __init__(self, basedir, queue, exit_when_done=False): threading.Thread.__init__(self) self.basedir = basedir self.queue = queue + self.exit_when_done = exit_when_done def run(self): - filescan.recursive_scan(self.basedir, self.queue) + while True: + filescan.recursive_scan(self.basedir, self.queue) + if self.exit_when_done: + self.queue.put(None) + break + # Do a full scan every 3 days. + time.sleep(86400 * 3) -def run_client(server_url, music_dir, api_key): +def run_client(server_url, music_dir, api_key, run_once): upl = upload.Uploader(server_url.rstrip('/'), api_key) - wtch = watcher.Watcher(music_dir, upl.queue) - scan = InitialScan(music_dir, upl.queue) + + scan = FullScan(music_dir, upl.queue, run_once) scan.setDaemon(True) scan.start() + + if not run_once: + if enable_watcher: + wtch = watcher.Watcher(music_dir, upl.queue) + else: + log.warn('inotify/fsevents support not enabled on this platform') + upl.run() @@ -58,7 +79,8 @@ def main(): parser.error('Too many arguments') daemonize.daemonize(opts, run_client, - (opts.server_url, opts.music_dir, opts.api_key)) + (opts.server_url, opts.music_dir, opts.api_key, + opts.once)) if __name__ == '__main__': diff --git a/client/djrandom_client/linux_watcher.py b/client/djrandom_client/linux_watcher.py index 83c7603..b3f59d4 100644 --- a/client/djrandom_client/linux_watcher.py +++ b/client/djrandom_client/linux_watcher.py @@ -1,5 +1,8 @@ +import logging import pyinotify +log = logging.getLogger(__name__) + class Watcher(object): @@ -7,8 +10,10 @@ class Watcher(object): self.wm = pyinotify.WatchManager() self.wm.add_watch('/tmp', pyinotify.IN_CLOSE_WRITE, rec=True) self.notifier = pyinotify.ThreadedNotifier(self.wm, self) + self.notifier.setDaemon(True) self.notifier.start() def process_IN_CLOSE(self, event): if event.pathname.lower().endswith('.mp3'): + log.debug('event in %s: %x' % (event.pathname, event.mask)) self.queue.put(event.pathname) diff --git a/client/djrandom_client/osx_watcher.py b/client/djrandom_client/osx_watcher.py index beb54ce..580f05a 100644 --- a/client/djrandom_client/osx_watcher.py +++ b/client/djrandom_client/osx_watcher.py @@ -1,6 +1,9 @@ +import logging from fsevents import Observer, Stream from djrandom_client import filescan +log = logging.getLogger(__name__) + class Watcher(object): @@ -8,7 +11,9 @@ class Watcher(object): self.queue = queue self.observer = Observer() self.observer.schedule(Stream(self.callback, base)) + self.observer.setDaemon(True) self.observer.start() def callback(self, subpath, mask): + log.debug('event in %s: %x' % (subpath, mask)) filescan.directory_scan(subpath, self.queue) diff --git a/client/djrandom_client/upload.py b/client/djrandom_client/upload.py index edcf2b9..27880d3 100644 --- a/client/djrandom_client/upload.py +++ b/client/djrandom_client/upload.py @@ -77,19 +77,22 @@ class Uploader(object): log.info('successfully uploaded %s (%s)' % (path, sha1)) def run(self): - while True: - stats.data.set('uploading', None) - path = self.queue.get() - if path is None: - break - if self.db.has(path): - continue - stats.data.set('uploading', path) - try: - self.upload(path) - self.db.add(path) - except Exception, e: - log.error('error uploading %s: %s' % (path, str(e))) - stats.data.incr('errors') - stats.data.set('last_error', str(e)) - self.db.close() + try: + while True: + stats.data.set('uploading', None) + path = self.queue.get() + if path is None: + break + if self.db.has(path): + continue + stats.data.set('uploading', path) + try: + self.upload(path) + self.db.add(path) + except Exception, e: + log.error('error uploading %s: %s' % (path, str(e))) + stats.data.incr('errors') + stats.data.set('last_error', str(e)) + finally: + log.debug('uploader thread exiting') + self.db.close() -- GitLab