From a1627ffb73ab119b3a1f6108f7b16501cf1c77d5 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 30 Dec 2023 10:21:57 +0000
Subject: [PATCH] Improve documentation, add --parquet-concurrency option

---
 README.md                  | 88 +++++++++++++++++++++++++++++++++++++-
 integrationtest/.gitignore |  1 +
 writer/source.go           |  6 ++-
 writer/writer.go           |  5 ++-
 4 files changed, 95 insertions(+), 5 deletions(-)
 create mode 100644 integrationtest/.gitignore

diff --git a/README.md b/README.md
index 523e7b6..26bf2b8 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,38 @@ The system's functionality is split into two parts:
   them to log storage;
 * *Query*, the ability to interrogate the log storage analytically.
 
+These two parts don't need to communicate with each other: they simply
+correspond to separate *read* and *write* paths to the logs database,
+which is where it all comes together. This is pretty much the default
+architecture of most log analysis systems, where all the work is
+ultimately delegated to "the database". *Pqlog* twists this
+traditional architecture a little:
+
+* there is no "database": log storage is just plain files;
+* the *database engine* is spun up on demand, on every query.
+
+These choices have the interesting consequence that *pqlog* only needs
+a storage backend (local filesystem, object storage...) as a
+dependency, rather than a fully-fledged database. They also allow for
+resource isolation between the (presumably highly reliable, constantly
+running) ingestion layer and the (operator-facing, occasionally used)
+query/analysis layer, which helps small installations keep their
+"idle" footprint low.
+
+The idea is, roughly: why pay the high memory cost of database
+indexing if you're not using those resources all the time? The *pqlog*
+ingestion layer runs with a more or less constant memory usage that
+does not depend on data throughput or size. The challenge, of course,
+is to implement this vision while maintaining acceptable performance.
+*Pqlog* attempts to achieve this by:
+
+* using a fast, columnar data layout for log storage
+  ([Parquet](https://parquet.apache.org)), optimized for the database
+  engine;
+* using a ridiculously performant embedded database engine
+  ([DuckDB](https://duckdb.org)).
+
+
 ### Ingestion
 
 Ingestion presents an interface for external clients (syslog daemons,
@@ -77,8 +109,7 @@ this limitation.
 The flattened records are then written to
 [Parquet](https://parquet.apache.org/) files, which are rotated
 periodically (and when they reach a certain size). These files can be
-stored remotely, although the current implementation only supports
-local filesystem.
+stored remotely, on S3-like backends.
 
 The ingestion API endpoint is at */ingest*, and it expects a POST
 request with a ND-JSON request body: newline-delimited JSON-encoded
@@ -127,6 +158,8 @@ advance to the caller: the SQL query should contain the placeholder
 `$table`, which will be replaced at execution time with the name of
 the temporary table to use.
 
+No UI is currently available for querying.
+
 ## Running
 
 The service ships as a single binary, *pqlogd*.
@@ -171,6 +204,26 @@ $ curl -s -XPOST -d "q=SELECT COUNT(*) AS c FROM \$table WHERE a='foo'" http://l
 [{"c": 1}]
 ```
 
+### Connecting to remote storage
+
+The *--dir* command-line flag accepts a URI-like syntax to access
+remote storage backends. The specific backend type is selected using
+the URI scheme:
+
+* *local* - Local filesystem, same as using a plain path
+* *minio* - Generic S3-like API support.
+  Use standard environment variables (MINIO_ACCESS_KEY etc.) for
+  credentials; URIs should have this form:
+> minio://*hostname*/*bucket*/*path*
+
+* *s3* - AWS S3 (not ready yet). Supports URIs of the form:
+> s3://*bucket*/*path*
+
+* *gcs* - Google Cloud Storage (not ready yet). Supports URIs of the
+  form:
+> gcs://*project_id*/*bucket*/*path*
+
+
 ### Debugging
 
 The server offers some debugging endpoints which might be useful to
@@ -179,3 +232,34 @@ understand what it is doing:
 * */schema* will return the current schema in JSON format
 * */debug/schema* will return a human-readable dump of the internal
   state of the schema guesser
+
+### Performance and Scaling
+
+The ingestion service is single-writer by design, meaning that each
+*pqlogd* instance will be, at any given time, writing its output to a
+single Parquet file. As a consequence, it will only process one
+incoming */ingest* request at a time: the idea is to apply
+backpressure upstream, and let the client (presumably a syslog server
+of some sort) deal with queueing and retries.
+
+The memory usage of the ingestion server is approximately proportional
+to the number of records that are being held in memory, waiting to be
+written to the Parquet output. There are a few parameters that
+influence this number:
+
+* the number of fields in your schema
+* the Parquet page size (controllable via the *--parquet-page-size*
+  command-line option)
+* the Parquet writer concurrency (controllable via the
+  *--parquet-concurrency* command-line option)
+* the Parquet row group size (controllable via the
+  *--parquet-row-group-size* command-line option; note that the value
+  is in MB)
+
+There is generally little point in increasing the writer concurrency
+by much, because the ingestion server is likely to hit a CPU
+bottleneck decoding the JSON input before anything else. If that
+happens, it is always possible to run multiple instances of *pqlogd*
+in parallel, pointing them at the same storage: generated filenames
+are unique, so the query layer will still present an aggregate view
+of all logs.
diff --git a/integrationtest/.gitignore b/integrationtest/.gitignore
new file mode 100644
index 0000000..c627000
--- /dev/null
+++ b/integrationtest/.gitignore
@@ -0,0 +1 @@
+*.prof
diff --git a/writer/source.go b/writer/source.go
index 85a48dc..bc601a6 100644
--- a/writer/source.go
+++ b/writer/source.go
@@ -27,7 +27,8 @@ type SourceURI struct {
 	*url.URL
 }
 
-// ParseRemoteURI splits a URI into scheme and a path.
+// ParseRemoteURI splits a URI into scheme and a path, checking that
+// the scheme corresponds to a known Source.
 func ParseRemoteURI(uri string) (*SourceURI, error) {
 	u, err := url.Parse(uri)
 	if err != nil {
@@ -52,7 +53,8 @@ func (u *SourceURI) Source() (Source, error) {
 
 // A Source is an abstraction over remote storage that extends
 // parquet-go-source with the Rename operation, and provides a file
-// creation factory function.
+// creation factory function. It also provides hooks for the query
+// layer to access those files when building its aggregate SQL view.
 type Source interface {
 	// Create a new file.
 	Create(string) (source.ParquetFile, error)
diff --git a/writer/writer.go b/writer/writer.go
index 03532bd..5783c82 100644
--- a/writer/writer.go
+++ b/writer/writer.go
@@ -1,6 +1,7 @@
 package writer
 
 import (
+	"flag"
 	"fmt"
 	"log"
 	"time"
@@ -11,6 +12,8 @@ import (
 	pqwriter "github.com/xitongsys/parquet-go/writer"
 )
 
+var parquetConcurrency = flag.Int("parquet-concurrency", 2, "concurrency of the Parquet writer")
+
 // Writer is the interface for our restartable ParquetWriter.
 type Writer interface {
 	Reopen(*schema.ParquetSchemaHandler) error
@@ -80,7 +83,7 @@ func (w *ParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error
 	}
 	w.renameTo = path
 
-	w.pw, err = newTuplesWriter(schemaHandler, w.lw, 4)
+	w.pw, err = newTuplesWriter(schemaHandler, w.lw, int64(*parquetConcurrency))
 	if err != nil {
 		return fmt.Errorf("can't create Parquet writer: %w", err)
 	}
-- 
GitLab
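
For illustration, here is a minimal sketch of how the options documented in
this patch might be combined when starting *pqlogd* against a MinIO backend.
The hostname, bucket, path and numeric values are placeholders rather than
recommendations, and MINIO_SECRET_KEY is assumed as the usual companion of
the MINIO_ACCESS_KEY variable mentioned in the README.

```
# Hypothetical invocation; host/bucket/path and numeric values are
# illustrative placeholders, not recommended settings.
$ export MINIO_ACCESS_KEY=...
$ export MINIO_SECRET_KEY=...
$ pqlogd --dir=minio://minio.example.com/logs/pqlog \
    --parquet-concurrency=2 \
    --parquet-row-group-size=64
```

As the Performance and Scaling section notes, raising *--parquet-concurrency*
is rarely the first knob worth turning, since JSON decoding is the more likely
CPU bottleneck.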