import logging import threading try: import Queue except ImportError: import queue as Queue from cheroot import wsgi from flask import Flask, request, make_response, abort from .rebuild import rebuild queue = Queue.Queue() def _process_request(gl, project_deps, data): pipeline_status = data['object_attributes']['status'] branch_name = data['object_attributes']['ref'] project_path = data['project']['path_with_namespace'] action = 'none' if pipeline_status == 'success': deps = project_deps.get((project_path, branch_name), []) built_projects = [] for dep_path, dep_branch in deps: try: rebuild(gl, dep_path, dep_branch) built_projects.append(f'{dep_path}:{dep_branch}') except Exception as e: logging.error('error rebuilding project %s:%s: %s' % ( dep_path, dep_branch, str(e))) action = 'rebuilt %s' % (', '.join(built_projects),) logging.info('pipeline for %s@%s: %s, action=%s', project_path, branch_name, pipeline_status, action) def worker_thread(gl, project_deps): while True: data = queue.get() try: _process_request(gl, project_deps, data) except Exception as e: logging.error('error processing request: %s', str(e)) app = Flask(__name__) def run_app(gl, project_deps, bind_host, bind_port, webhook_token, num_workers=3): app.config.update({ 'WEBHOOK_AUTH_TOKEN': webhook_token, }) # Start the worker threads that will process the requests in the # background. for i in range(num_workers): wt = threading.Thread( target=worker_thread, args=(gl, project_deps), name='Worker %d' % (i+1)) wt.setDaemon(True) wt.start() # Start the HTTP server to receive webhook requests. server = wsgi.Server((bind_host, bind_port), app) server.start() @app.route('/', methods=('POST',)) def app_webhook(): # Authenticate the request, if configured to do so. if app.config['WEBHOOK_AUTH_TOKEN']: token = request.headers.get('X-Gitlab-Token') if token != app.config['WEBHOOK_AUTH_TOKEN']: abort(401) # Ignore non-pipeline events. ev_type = request.headers.get('X-Gitlab-Event') if ev_type != 'Pipeline Hook': app.logger.info('ignored non-pipeline event with type %s' % ev_type) else: # Send request to worker threads for processing. queue.put(request.json) return make_response('{}') @app.route('/healthz') def healthz(): return make_response('OK')