pqlog

pqlog is an indexing and retrieval service for structured and unstructured logs (e.g. syslog and more). Its primary strength is its ability to scale down for small installations, using very few resources while remaining genuinely useful, offering an alternative to heavyweight stacks like ELK in this scenario.

Overview

The system's functionality is split into two parts:

  • Ingestion (or indexing), processing incoming logs and writing them to log storage;
  • Query, the ability to interrogate the log storage analytically.

These two parts don't need to communicate with each other; they simply correspond to separate read and write paths to the logs database, which is where it all comes together. This is pretty much the default architecture of most log analysis systems, where all the work is ultimately delegated to "the database". Pqlog twists this traditional architecture a little:

  • there is no "database", log storage is just plain files;
  • the database engine is spun up on-demand, on every query.

These choices have the interesting consequence that pqlog only needs a storage backend (local filesystem, object storage...) as a dependency, rather than a fully-fledged database. They also allow for resource isolation between the (presumably highly reliable, constantly running) ingestion layer and the (operator-facing, occasionally used) query/analysis layer, which helps small installations keep their idle footprint low.

The idea is, roughly: why pay the high memory cost of database indexing if you're not using those resources all the time? The pqlog ingestion layer runs with a more or less constant memory usage that does not depend on data throughput or size. The challenge, of course, is to implement this vision while maintaining acceptable performance. Pqlog attempts to achieve this by:

  • using a fast, columnar data layout for log storage (Parquet), optimized for the database engine;
  • using a ridiculously performant embedded database engine (DuckDB).

Ingestion

Ingestion presents an interface for external clients (syslog daemons, fluentd, etc.) to send logs for indexing, in batches. Incoming log entries are converted to a unified "flattened" schema.

The flattened schema maps structured attributes to a flat, one-level structure by using their paths (dot-separated) as field names. As an example, an input record such as:

{
    "a": {
        "foo": "bar",
        "baz": "arg"
    },
    "b": 42
}

would get internally converted to a "flat" structure like the following:

{
    "a.foo": "bar",
    "a.baz": "arg",
    "b": 42
}

So, when querying the database, one should refer to the foo field as a.foo.

The flattened schema supports lists of scalar values (numbers, strings), but it does not support nesting structures within lists, as it would not know how to unroll them. For instance, the following record has a problem:

{
    "a": [
        {"foo": "bar"},
        {"baz": "arg"}
    ]
}

In this situation the ingestion stage will just ignore the problematic field (a in the above example).
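The flattening rules above can be sketched in a few lines of Python (a hypothetical illustration of the described behavior, not the actual implementation, which is written in Go):

```python
def flatten(record, prefix=""):
    # Nested attributes are mapped to dot-separated paths; lists of
    # scalars are kept as-is; lists containing structures are dropped,
    # since there is no way to unroll them.
    flat = {}
    for key, value in record.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{path}."))
        elif isinstance(value, list):
            if all(not isinstance(v, (dict, list)) for v in value):
                flat[path] = value
            # else: the problematic field is ignored entirely
        else:
            flat[path] = value
    return flat
```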

The flattening is ultimately necessary because of an implementation detail: the underlying query engine (DuckDB), although it fully supports structured schemas, can have trouble reconciling different STRUCTs across different Parquet files. Removing this limitation is a TODO.

The flattened records are then written to Parquet files, which are rotated periodically (and when they reach a certain size). These files can be stored remotely, on S3-like backends.

The ingestion API endpoint is at /ingest, and it expects a POST request with a ND-JSON request body (newline-delimited JSON-encoded records, no additional headers or footers).
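For illustration, an ND-JSON request body with two records might be built like this (the field names are invented for the example):

```python
import json

# A hypothetical batch of log records to ingest.
records = [
    {"host": "web1", "message": "request served", "status": 200},
    {"host": "web2", "message": "request served", "status": 404},
]

# ND-JSON: one JSON-encoded record per line, no wrapping array,
# no additional headers or footers.
body = "\n".join(json.dumps(r) for r in records) + "\n"
```

The resulting `body` is what a client would POST to /ingest.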

Nested JSON records

The "message" field is handled specially: in many logging systems this field may end up containing nested JSON-encoded data. If pqlog detects this to be the case, it will unmarshal the JSON string and replace it with the decoded object, e.g.:

{
    "message": "{\"foo\":\"bar\",\"baz\":\"123\"}"
}

will result in the following flattened record:

{
    "message.foo": "bar",
    "message.baz": "123"
}
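This special-casing could be sketched as follows (a hypothetical Python illustration; the real implementation's error handling may differ):

```python
import json

def expand_message(record):
    # If "message" holds a JSON-encoded object, replace the string with
    # the decoded object, so its fields get flattened like any others.
    msg = record.get("message")
    if isinstance(msg, str):
        try:
            decoded = json.loads(msg)
        except ValueError:
            return record
        if isinstance(decoded, dict):
            record = dict(record)
            record["message"] = decoded
    return record
```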

Schema unification

As it ingests records, pqlog learns the data types of the incoming attributes, determining an aggregate schema for all the records. Since in the "flat" schema all nodes are leaves, it only needs the following fundamental data types:

  • STRING
  • NUMBER
  • TIMESTAMP is an RFC3339-formatted string
  • LIST is just a repeated STRING

The mere existence of an aggregate schema, which is necessary in order to query the data, imposes some restrictions on the structure of the data. After all, it is not unreasonable to expect the data to have some level of consistency if you want to be able to query it in aggregate.

When the schema detector finds conflicting data types for a field, it will try to promote them to a common type where possible, handling common cases such as STRING / NUMBER conflicts by upgrading the values to STRING. But where such type promotions aren't possible, the problematic field will be dropped from the schema.
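A minimal sketch of such a promotion rule, covering only the STRING / NUMBER case mentioned above (other promotions, if any, are not documented here):

```python
def promote(a, b):
    # Identical types unify trivially; a STRING / NUMBER conflict is
    # upgraded to STRING; any other conflict is unresolvable, and the
    # field is dropped from the schema (signalled here by None).
    if a == b:
        return a
    if {a, b} == {"STRING", "NUMBER"}:
        return "STRING"
    return None
```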

Querying

The query engine is DuckDB, which can read the Parquet files directly, even remotely, and run fast analytical queries on them.

One thing of note is that, in the current implementation, it is only possible to query fully written Parquet files. The implication is that you won't see logs until the ingestion server decides it's time to finalize the current Parquet file. For this reason, it might be sensible to set the --rotation-interval option to a few minutes.

The query API is at /query and it takes a full SQL query as the q parameter. The response will be JSON-encoded. Since the table to query is created on-the-fly with every request, its name is not known in advance to the caller: the SQL query should contain the placeholder $table, which will be replaced at execution time with the name of the temporary table to use.
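A caller might build the request body like this (a small illustrative sketch; the $table placeholder is left for the server to substitute at execution time):

```python
import urllib.parse

# The temporary table's name is unknown to the caller, so the SQL
# refers to the $table placeholder instead.
sql = "SELECT COUNT(*) AS c FROM $table WHERE a='foo'"

# Form-encode the query as the "q" parameter for the POST body.
body = urllib.parse.urlencode({"q": sql})
```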

No UI is currently available for querying.

Running

The service ships as a single binary, pqlogd.

From an operational perspective, the two components of the service (ingestion and query) can be run independently, and are controlled by the command-line options --enable-ingest and --enable-query, respectively. In all cases, the server will listen on the HTTP address specified by --addr.

Ingestion service

The ingestion service needs to know where to store its output. This is controlled by the --dir command-line option, which currently only accepts a local path (S3 support TODO).

Log files will be written in this directory, rotated every --rotation-interval.

The schema detector works better if it can persist its internal state and recover it on startup: to make it do so, specify the --state-file option.

An example:

$ pqlogd --dir ./data --enable-ingest --addr :3000 &
$ curl -s -XPOST --data-binary '{"a": "foo", "b": 42}' http://localhost:3000/ingest
$ kill %1 # Causes the Parquet files to be finalized

Query service

The query service also needs to know where the Parquet data files are located, with the --dir option.

Continuing the example above:

$ pqlogd --dir ./data --enable-query --addr :3001 &
$ curl -s -XPOST -d "q=SELECT COUNT(*) AS c FROM \$table WHERE a='foo'" http://localhost:3001/query
[{"c": 1}]

Connecting to remote storage

The --dir command-line flag accepts a URI-like syntax to access remote storage backends. The specific backend type is selected using the URI scheme:

  • local - Local filesystem, same as using a plain path
  • minio - Generic S3-like API support. Use standard environment variables (MINIO_ACCESS_KEY etc.) for credentials; URIs should have the form minio://hostname/bucket/path
  • s3 - AWS S3 (not ready yet). Supports URIs like s3://bucket/path
  • gcs - Google Cloud Storage (not ready yet). Supports URIs of the form gcs://project_id/bucket/path

Debugging

The server offers some debugging endpoints which might be useful to understand what it is doing:

  • /schema will return the current schema in JSON format
  • /debug/schema will return a human-readable dump of the internal state of the schema guesser, where you can find a report on the errors encountered

Performance and Scaling

The ingestion service is single-writer by design, meaning that each pqlogd instance will, at any given time, be writing its output to a single Parquet file. As a consequence, it will only process one incoming /ingest request at a time: the idea is to apply backpressure upstream, and let the client (presumably a syslog server of some sort) handle queuing and retries.

The memory usage of the ingestion server is approximately proportional to the number of records that are being held in memory, waiting to be written to the Parquet output. There are a few parameters that have an influence on this number:

  • the number of fields in your schema
  • the Parquet page size (controllable via the --parquet-page-size command-line option)
  • the Parquet writer concurrency (controllable via the --parquet-concurrency command-line option)
  • the Parquet row group size (controllable via the --parquet-row-group-size command-line option, note that the value is in MB)

There is generally little point in increasing the writer concurrency, because the ingestion server is likely to hit a CPU bottleneck decoding the JSON input before anything else. If that's the case, it is certainly possible to run multiple instances of pqlogd in parallel, pointing them at the same storage: generated filenames are unique, so the query layer will maintain the aggregate view of all logs.

Note that multiple instances of the indexer will each run their own, independent schema analysis, which can potentially result in different schemas depending on the input. This is not an issue, because what matters is that the schema is consistent within each individual Parquet file: the database engine can easily merge those together at query time.