Commit 2c2be876 authored by ale's avatar ale

removed old SOLR api files

parent 2603d002
import logging
import os
import solr
import threading
import time
from lens2 import utils
# Timeout passed to solr.Solr for requests (presumably seconds — TODO confirm
# against the solrpy API).
SOLR_TIMEOUT = 600
# Retry count passed to solr.Solr as max_retries.
SOLR_RETRIES = 3
# Timestamp format used in SOLR range queries on the 'timestamp' field.
SOLR_DATEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Number of documents sent per add_many() call during insert().
INSERT_BATCH_SIZE = 100
# Number of rows requested per select() call while paging search results.
SEARCH_BATCH_SIZE = 1000

log = logging.getLogger(__name__)
class CommitThread(threading.Thread):
    """Thread that periodically commits the SOLR index.

    Started (as a daemon) by LensClient.insert() when 'commit_every'
    is set, so that documents added with commit=False become visible
    without waiting for the final commit.
    """

    def __init__(self, client, interval):
        # client: the owning LensClient; interval: seconds between commits.
        self._client = client
        self._interval = interval
        threading.Thread.__init__(self)

    def run(self):
        log.info('starting background commit every %d seconds' % self._interval)
        # Grab this thread's own connection (LensClient._solr is
        # thread-local), then commit forever at the configured interval.
        conn = self._client._solr()
        while True:
            time.sleep(self._interval)
            try:
                conn.commit()
            except Exception, e:
                # Best-effort: log and keep the commit loop alive.
                log.error('exception in commit: %s' % str(e))
class LensClient(object):
    """Client for a lens2 SOLR index.

    Connections are kept in thread-local storage, so each thread talks
    to SOLR over its own persistent connection. Provides batched
    insertion, paged searching and time-based expiry of log records.
    """

    def __init__(self, url, http_user=None, http_pass=None):
        self._url = url
        self._http_user = http_user
        self._http_pass = http_pass
        self._local = threading.local()
        self._commit_thread = None

    def _solr(self):
        """Return this thread's SOLR connection, creating it on first use."""
        if not hasattr(self._local, 'solr'):
            self._local.solr = solr.Solr(
                self._url, persistent=True, timeout=SOLR_TIMEOUT,
                http_user=self._http_user, http_pass=self._http_pass,
                max_retries=SOLR_RETRIES)
        return self._local.solr

    def insert(self, line_or_lines, parser, filters=None,
               commit_every=None, batch_size=INSERT_BATCH_SIZE):
        """Parse raw log lines and add the resulting docs to the index.

        Accepts a single string or an iterable of strings. Lines that
        fail to parse (UnicodeDecodeError / ValueError) are dropped.
        When 'commit_every' is set, a background daemon thread is
        started (at most once per client) to commit periodically.
        """
        if isinstance(line_or_lines, basestring):
            line_or_lines = [line_or_lines]
        started_at = time.time()
        conn = self._solr()

        def parsed_docs(stream):
            # Lazily convert raw lines into SOLR documents, applying the
            # optional filters (which modify each doc in-place) and
            # silently skipping unparseable input.
            for raw_line in stream:
                try:
                    doc = parser.parse(raw_line)
                    if filters:
                        for filter_fn in filters:
                            filter_fn(doc)
                    yield doc
                except (UnicodeDecodeError, ValueError):
                    pass

        # Start the background commit thread only once.
        if commit_every and not self._commit_thread:
            self._commit_thread = CommitThread(self, commit_every)
            self._commit_thread.setDaemon(True)
            self._commit_thread.start()

        # Feed documents to SOLR in fixed-size batches, committing once
        # at the end (the background thread handles interim commits).
        num_docs = 0
        for batch_docs in utils.batch(parsed_docs(line_or_lines), batch_size):
            num_docs += len(batch_docs)
            conn.add_many(batch_docs, commit=False)
        conn.commit(wait_flush=False, wait_searcher=False)

        elapsed = time.time() - started_at
        log.info('inserted %d docs in %g seconds, %.5g docs/s' % (
            num_docs, elapsed, num_docs / elapsed))

    def search(self, query, time_range=None, offset=0, limit=None,
               sort='timestamp asc'):
        """Search for 'query', yield results."""
        pos = offset
        page_size = SEARCH_BATCH_SIZE
        # Absolute index one past the last result we may return, or None
        # until the total match count is known.
        end = None if limit is None else offset + limit
        # Constrain by timestamp when a (start, end) time range is given;
        # either endpoint may be None/falsy for an open-ended range.
        if time_range:
            range_start, range_end = time_range
            fmt_start = range_start.strftime(SOLR_DATEFMT) if range_start else '*'
            fmt_end = range_end.strftime(SOLR_DATEFMT) if range_end else 'NOW'
            query = '(%s) AND (timestamp:[%s TO %s])' % (
                query, fmt_start, fmt_end)
        conn = self._solr()
        # Page through the result set, page_size rows at a time.
        while True:
            if end is None:
                n_rows = page_size
            else:
                n_rows = min(end - pos, page_size)
            result = conn.select(q=query, sort=sort, fields='*',
                                 start=pos, rows=n_rows)
            for doc in result.results:
                yield doc
            if end is None:
                # No explicit limit: stop once we pass the match count.
                end = result.numFound
            pos += page_size
            if pos >= end:
                break

    def expire(self, limit):
        """Remove all records older than 'limit'."""
        conn = self._solr()
        conn.delete_query(
            'timestamp:[* TO %s]' % limit.strftime(SOLR_DATEFMT))
        conn.commit()
from jinja2 import Environment, FileSystemLoader
from werkzeug import Request, Response, SharedDataMiddleware
from werkzeug.exceptions import HTTPException, NotFound, BadRequest, MethodNotAllowed
from werkzeug import run_simple
from datetime import datetime
from lens2 import utils
import json
import logging
import os
import re
import time
import traceback
log = logging.getLogger(__name__)

# Jinja2 environment loading templates from the package-local
# 'templates' directory.
_jinja = Environment(loader=FileSystemLoader(
    os.path.join(os.path.dirname(__file__), 'templates')))
# Root of the static assets served under /static (see WSGIServer).
_static_root = os.path.join(os.path.dirname(__file__), 'static')
# Add a JSON serializator for datetime objects.
def _json_default_handler(obj):
if isinstance(obj, datetime):
return int(time.mktime(obj.timetuple()))
else:
raise TypeError('Object of type %s with value of %s is not JSON serializable' % (type(Obj), repr(Obj)))
class Page(object):
    """Base class for a URL handler."""

    def __init__(self, request, api):
        self.request = request
        self.api = api

    def __call__(self):
        # Subclasses override; an unhandled page is a 404.
        raise NotFound()

    def response(self, data, content_type):
        """Create a Response object with some custom headers."""
        # Disable caching so clients always see fresh search results.
        headers = {
            'Content-Type': content_type,
            'Cache-control': 'no-cache',
            'Expires': '-1',
        }
        return Response(data, status=200, headers=headers)

    def render(self, template_name, **args):
        """Render an HTML template."""
        html = _jinja.get_template(template_name).render(**args)
        return self.response(html, content_type='text/html; charset=utf-8')

    def to_json(self, data):
        """Return JSON-encoded data."""
        return self.response(
            json.dumps(data, default=_json_default_handler),
            content_type='application/json')
class IndexPage(Page):
    """Front page: render the search form, pre-filling the query box."""

    def __call__(self):
        return self.render('search.html', query=self.request.args.get('q'))
class SearchPage(Page):
    """Execute a search and return results as JSON or plain text."""

    def __call__(self):
        args = self.request.args
        query = args.get('q')
        # Locals renamed so they no longer shadow the builtins
        # 'format'/'range' or the module-level 'log'.
        out_fmt = args.get('fmt', 'json')
        offset = int(args.get('offset', '0'))
        sort_order = args.get('sort', 'desc')
        limit = args.get('limit')
        if limit:
            limit = int(limit)
        time_range = args.get('range')
        if time_range:
            time_range = utils.parse_time_range(time_range)
        if query:
            results = self.api.search(
                query, time_range=time_range, offset=offset, limit=limit,
                sort='timestamp %s' % sort_order)
        else:
            results = []
        if out_fmt == 'json':
            return self.to_json({'results': list(results)})
        elif out_fmt == 'raw':
            def iter_plain():
                # Stream results one formatted log line at a time.
                for entry in results:
                    yield utils.format_log(entry)
            return self.response(iter_plain(), content_type='text/plain')
        else:
            raise BadRequest()
class WSGIServer(object):
    """Minimal regex-dispatching WSGI application.

    Subclasses populate URLS with (path_regex, Page_subclass) pairs;
    patterns are anchored (^...$) and tried in order.
    """

    # Overridden by subclasses with the actual dispatch table.
    URLS = []

    def __init__(self, api):
        self._api = api
        self._urls = [(re.compile('^%s$' % u[0]), u[1]) for u in self.URLS]
        # NOTE: this assignment deliberately shadows the _dispatch method
        # below with an instance attribute: the middleware wraps the bound
        # method, serving /static itself and delegating everything else to
        # the original _dispatch.
        self._dispatch = SharedDataMiddleware(self._dispatch, {
            '/static': _static_root})

    def _dispatch(self, environ, start_response):
        # Match the request path against the URL table; the first match
        # wins, and its regex groups become positional view arguments.
        try:
            request = Request(environ)
            for pattern, view_class in self._urls:
                match = pattern.match(request.path)
                if match:
                    view = view_class(request, self._api)
                    args = match.groups()
                    response = view(*args)
                    break
            else:
                # for/else: no pattern matched at all.
                raise NotFound()
        except HTTPException, e:
            # werkzeug HTTP exceptions are themselves WSGI applications,
            # so they can serve as the response directly.
            response = e
        result = response(environ, start_response)
        return result

    def __call__(self, environ, start_response):
        return self._dispatch(environ, start_response)

    def serve_forever(self, port=8081):
        # Development server; binds all interfaces.
        run_simple('0.0.0.0', int(port), self)
class LensWSGIServer(WSGIServer):
    """WSGI application exposing the lens2 search UI and API."""

    # Dispatch table: (path regex, handler class), tried in order.
    URLS = [
        ('/search', SearchPage),
        ('/', IndexPage),
    ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment