Skip to content
Snippets Groups Projects
Commit 110ab48d authored by ale's avatar ale
Browse files

minor robustness improvements; added some debug logging to notifiers

parent 829396b4
No related branches found
No related tags found
No related merge requests found
...@@ -4,37 +4,58 @@ import logging ...@@ -4,37 +4,58 @@ import logging
import optparse import optparse
import os import os
import platform import platform
import signal
import threading import threading
import time
from djrandom_client import daemonize from djrandom_client import daemonize
from djrandom_client import filescan from djrandom_client import filescan
from djrandom_client import upload from djrandom_client import upload
from djrandom_client import utils from djrandom_client import utils
# Detect platform and selectively enable inotify/fsevents watchers.
enable_watcher = True
if platform.system() == 'Darwin': if platform.system() == 'Darwin':
import djrandom_client.osx_watcher as watcher import djrandom_client.osx_watcher as watcher
elif platform.system() == 'Linux': elif platform.system() == 'Linux':
import djrandom_client.linux_watcher as watcher import djrandom_client.linux_watcher as watcher
else: else:
raise NotImplementedError('unsupported platform') enable_watcher = False
log = logging.getLogger(__name__)
class InitialScan(threading.Thread): class FullScan(threading.Thread):
"""Do a recursive directory scan when starting.""" """Do a recursive directory scan when starting."""
def __init__(self, basedir, queue): def __init__(self, basedir, queue, exit_when_done=False):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.basedir = basedir self.basedir = basedir
self.queue = queue self.queue = queue
self.exit_when_done = exit_when_done
def run(self): def run(self):
while True:
filescan.recursive_scan(self.basedir, self.queue) filescan.recursive_scan(self.basedir, self.queue)
if self.exit_when_done:
self.queue.put(None)
break
# Do a full scan every 3 days.
time.sleep(86400 * 3)
def run_client(server_url, music_dir, api_key): def run_client(server_url, music_dir, api_key, run_once):
upl = upload.Uploader(server_url.rstrip('/'), api_key) upl = upload.Uploader(server_url.rstrip('/'), api_key)
wtch = watcher.Watcher(music_dir, upl.queue)
scan = InitialScan(music_dir, upl.queue) scan = FullScan(music_dir, upl.queue, run_once)
scan.setDaemon(True) scan.setDaemon(True)
scan.start() scan.start()
if not run_once:
if enable_watcher:
wtch = watcher.Watcher(music_dir, upl.queue)
else:
log.warn('inotify/fsevents support not enabled on this platform')
upl.run() upl.run()
...@@ -58,7 +79,8 @@ def main(): ...@@ -58,7 +79,8 @@ def main():
parser.error('Too many arguments') parser.error('Too many arguments')
daemonize.daemonize(opts, run_client, daemonize.daemonize(opts, run_client,
(opts.server_url, opts.music_dir, opts.api_key)) (opts.server_url, opts.music_dir, opts.api_key,
opts.once))
if __name__ == '__main__': if __name__ == '__main__':
......
import logging
import pyinotify import pyinotify
log = logging.getLogger(__name__)
class Watcher(object): class Watcher(object):
...@@ -7,8 +10,10 @@ class Watcher(object): ...@@ -7,8 +10,10 @@ class Watcher(object):
self.wm = pyinotify.WatchManager() self.wm = pyinotify.WatchManager()
self.wm.add_watch('/tmp', pyinotify.IN_CLOSE_WRITE, rec=True) self.wm.add_watch('/tmp', pyinotify.IN_CLOSE_WRITE, rec=True)
self.notifier = pyinotify.ThreadedNotifier(self.wm, self) self.notifier = pyinotify.ThreadedNotifier(self.wm, self)
self.notifier.setDaemon(True)
self.notifier.start() self.notifier.start()
def process_IN_CLOSE(self, event): def process_IN_CLOSE(self, event):
if event.pathname.lower().endswith('.mp3'): if event.pathname.lower().endswith('.mp3'):
log.debug('event in %s: %x' % (event.pathname, event.mask))
self.queue.put(event.pathname) self.queue.put(event.pathname)
import logging
from fsevents import Observer, Stream from fsevents import Observer, Stream
from djrandom_client import filescan from djrandom_client import filescan
log = logging.getLogger(__name__)
class Watcher(object): class Watcher(object):
...@@ -8,7 +11,9 @@ class Watcher(object): ...@@ -8,7 +11,9 @@ class Watcher(object):
self.queue = queue self.queue = queue
self.observer = Observer() self.observer = Observer()
self.observer.schedule(Stream(self.callback, base)) self.observer.schedule(Stream(self.callback, base))
self.observer.setDaemon(True)
self.observer.start() self.observer.start()
def callback(self, subpath, mask): def callback(self, subpath, mask):
log.debug('event in %s: %x' % (subpath, mask))
filescan.directory_scan(subpath, self.queue) filescan.directory_scan(subpath, self.queue)
...@@ -77,6 +77,7 @@ class Uploader(object): ...@@ -77,6 +77,7 @@ class Uploader(object):
log.info('successfully uploaded %s (%s)' % (path, sha1)) log.info('successfully uploaded %s (%s)' % (path, sha1))
def run(self): def run(self):
try:
while True: while True:
stats.data.set('uploading', None) stats.data.set('uploading', None)
path = self.queue.get() path = self.queue.get()
...@@ -92,4 +93,6 @@ class Uploader(object): ...@@ -92,4 +93,6 @@ class Uploader(object):
log.error('error uploading %s: %s' % (path, str(e))) log.error('error uploading %s: %s' % (path, str(e)))
stats.data.incr('errors') stats.data.incr('errors')
stats.data.set('last_error', str(e)) stats.data.set('last_error', str(e))
finally:
log.debug('uploader thread exiting')
self.db.close() self.db.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment