diff --git a/README.md b/README.md
index 523e7b6b1052f4d8b755d6f70f1d7872c9b12db5..26bf2b8dd896baeafd214119ef96e511d313e28e 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,38 @@ The system's functionality is split into two parts:
   them to log storage;
 * *Query*, the ability to interrogate the log storage analytically.
 
+These two parts don't need to communicate with each other; they simply
+correspond to separate *read* and *write* paths to the logs database,
+which is where it all comes together. This is pretty much the default
+architecture of most log analysis systems, where all the work is
+ultimately delegated to "the database". *Pqlog* twists this
+traditional architecture a little:
+
+* there is no "database": log storage is just plain files;
+* the *database engine* is spun up on-demand, on every query.
+
+These choices have the interesting consequence that *pqlog* only needs
+a storage backend (local filesystem, object storage...) as a
+dependency rather than a fully-fledged database. It also allows for
+resource isolation between the (presumably highly reliable, constantly
+running) ingestion layer and the (operator-facing, occasionally used)
+query/analysis layer, which helps small installations keep their
+"idle" footprint low.
+
+The idea is, roughly: why pay the high memory cost of database
+indexing if you're not using those resources all the time? The *pqlog*
+ingestion layer runs with a more or less constant memory usage that
+does not depend on data throughput or size. The challenge, of course,
+is to implement this vision while maintaining acceptable performance.
+*Pqlog* attempts to achieve this by:
+
+* using a fast, columnar data layout for log storage
+  ([Parquet](https://parquet.apache.org)), optimized for the database
+  engine;
+* using a ridiculously performant embedded database engine
+  ([DuckDB](https://duckdb.org)).
+
+
 ### Ingestion
 
 Ingestion presents an interface for external clients (syslog daemons,
@@ -77,8 +109,7 @@ this limitation.
 The flattened records are then written to
 [Parquet](https://parquet.apache.org/) files, which are rotated
 periodically (and when they reach a certain size). These files can be
-stored remotely, although the current implementation only supports
-local filesystem.
+stored remotely, on S3-like backends.
 
 The ingestion API endpoint is at */ingest*, and it expects a POST
 request with an ND-JSON request body: newline-delimited JSON-encoded
@@ -127,6 +158,8 @@ advance to the caller: the SQL query should contain the placeholder
 `$table`, which will be replaced at execution time with the name of
 the temporary table to use.
 
+No UI is currently available for querying.
+
 ## Running
 
 The service ships as a single binary, *pqlogd*.
@@ -171,6 +204,26 @@ $ curl -s -XPOST -d "q=SELECT COUNT(*) AS c FROM \$table WHERE a='foo'" http://l
 [{"c": 1}]
 ```
 
+### Connecting to remote storage
+
+The *--dir* command-line flag accepts a URI-like syntax to access
+remote storage backends. The specific backend type is selected using
+the URI scheme:
+
+* *local* - Local filesystem, same as using a plain path
+* *minio* - Generic S3-like API support. Use standard environment
+  variables (MINIO_ACCESS_KEY etc.) for credentials; URIs should have
+  this form:
+> minio://*hostname*/*bucket*/*path*
+
+* *s3* - AWS S3 (not ready yet). Supports URIs of the form:
+> s3://*bucket*/*path*
+
+* *gcs* - Google Cloud Storage (not ready yet). Supports URIs of the
+  form:
+> gcs://*project_id*/*bucket*/*path*
+
+
 ### Debugging
 
 The server offers some debugging endpoints which might be useful to
@@ -179,3 +232,34 @@ understand what it is doing:
 * */schema* will return the current schema in JSON format
 * */debug/schema* will return a human-readable dump of the internal
   state of the schema guesser
+
+### Performance and Scaling
+
+The ingestion service is single-writer by design, meaning that each
+*pqlogd* instance will be, at any given time, writing its output to a
+single Parquet file. As a consequence, it will only process one
+incoming */ingest* request at a time: the idea is to apply
+backpressure upstream, and let the client (presumably a syslog server
+of some sort) deal with queues and retries.
+
+The memory usage of the ingestion server is approximately proportional
+to the number of records that are being held in memory, waiting to be
+written to the Parquet output. There are a few parameters that
+influence this number:
+
+* the number of fields in your schema
+* the Parquet page size (controllable via the *--parquet-page-size*
+  command-line option)
+* the Parquet writer concurrency (controllable via the
+  *--parquet-concurrency* command-line option)
+* the Parquet row group size (controllable via the
+  *--parquet-row-group-size* command-line option; note that the value
+  is in MB)
+
+There's generally not much point in increasing the writer concurrency,
+because the ingestion server is likely to hit a CPU bottleneck in
+decoding the JSON input before anything else. If that's the case, it
+is certainly possible to run multiple instances of *pqlogd* in
+parallel, pointing them at the same storage: generated filenames are
+unique, so the query layer will maintain the aggregate view of all
+logs.
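
To make the architecture notes above a little more concrete, the sketch below shows what "spin up the database engine on demand and query the Parquet files directly" can look like in Go. It is not pqlog's implementation; the `github.com/marcboeker/go-duckdb` driver, the `logs/*.parquet` glob and the ad-hoc query are all assumptions made for the example.

```go
// Illustrative sketch only, not pqlog code: open an in-memory DuckDB
// instance on demand and run an analytical query straight against the
// Parquet files written by the ingestion layer.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/marcboeker/go-duckdb" // assumed DuckDB driver
)

func main() {
	// An empty DSN gives an in-memory database: nothing to install,
	// nothing running between queries.
	db, err := sql.Open("duckdb", "")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// DuckDB scans Parquet in place, so "the database" is really just
	// the files produced by the ingestion layer.
	var n int64
	err = db.QueryRow(`SELECT COUNT(*) FROM read_parquet('logs/*.parquet')`).Scan(&n)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("records:", n)
}
```

The */query* endpoint described in the README does essentially this on the caller's behalf, substituting the `$table` placeholder with the temporary table it builds over the stored files.
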
diff --git a/integrationtest/.gitignore b/integrationtest/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..c627000badbf880f7eb806f8e4888e1a89c0fc80
--- /dev/null
+++ b/integrationtest/.gitignore
@@ -0,0 +1 @@
+*.prof
diff --git a/writer/source.go b/writer/source.go
index 85a48dc9ba6b74658070418cb27b65fb1df4b10f..bc601a635a0c930501310567d13cfc88bb5efe38 100644
--- a/writer/source.go
+++ b/writer/source.go
@@ -27,7 +27,8 @@ type SourceURI struct {
 	*url.URL
 }
 
-// ParseRemoteURI splits a URI into scheme and a path.
+// ParseRemoteURI splits a URI into scheme and a path, checking that
+// the scheme corresponds to a known Source.
 func ParseRemoteURI(uri string) (*SourceURI, error) {
 	u, err := url.Parse(uri)
 	if err != nil {
@@ -52,7 +53,8 @@ func (u *SourceURI) Source() (Source, error) {
 
 // A Source is an abstraction over remote storage that extends
 // parquet-go-source with the Rename operation, and provides a file
-// creation factory function.
+// creation factory function. It also provides hooks for the query
+// layer to access those files when building its aggregate SQL view.
 type Source interface {
 	// Create a new file.
 	Create(string) (source.ParquetFile, error)
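
As background for the `writer/source.go` change above: the URI handling it documents amounts to parsing the *--dir* value and dispatching on its scheme. The snippet below is a stand-alone illustration of that pattern using only the standard library; `describeBackend` and the sample URIs are invented for the example and are not pqlog's actual code.

```go
// Illustrative only: parse a --dir style URI and pick a storage
// backend based on its scheme, roughly what ParseRemoteURI/Source()
// are described as doing.
package main

import (
	"fmt"
	"log"
	"net/url"
)

func describeBackend(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	switch u.Scheme {
	case "", "local":
		return "local filesystem at " + u.Path, nil
	case "minio":
		// minio://hostname/bucket/path: the host is the endpoint,
		// the path carries the bucket and object prefix.
		return fmt.Sprintf("S3-like endpoint %s, object prefix %s", u.Host, u.Path), nil
	case "s3", "gcs":
		return u.Scheme + " backend (not ready yet)", nil
	default:
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	for _, uri := range []string{"/var/lib/pqlog", "minio://minio.local/logs/pqlog"} {
		desc, err := describeBackend(uri)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s -> %s\n", uri, desc)
	}
}
```

A nice property of this scheme-based dispatch is that a plain path keeps working unchanged, since `url.Parse` leaves the scheme empty for it.
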
diff --git a/writer/writer.go b/writer/writer.go
index 03532bda1e84a3921b5849b1196484704642442f..5783c824302c0872585e078c917863b75e6ca15a 100644
--- a/writer/writer.go
+++ b/writer/writer.go
@@ -1,6 +1,7 @@
 package writer
 
 import (
+	"flag"
 	"fmt"
 	"log"
 	"time"
@@ -11,6 +12,8 @@ import (
 	pqwriter "github.com/xitongsys/parquet-go/writer"
 )
 
+var parquetConcurrency = flag.Int("parquet-concurrency", 2, "concurrency of the Parquet writer")
+
 // Writer is the interface for our restartable ParquetWriter.
 type Writer interface {
 	Reopen(*schema.ParquetSchemaHandler) error
@@ -80,7 +83,7 @@ func (w *ParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error
 	}
 	w.renameTo = path
 
-	w.pw, err = newTuplesWriter(schemaHandler, w.lw, 4)
+	w.pw, err = newTuplesWriter(schemaHandler, w.lw, int64(*parquetConcurrency))
 	if err != nil {
 		return fmt.Errorf("can't create Parquet writer: %w", err)
 	}
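
The `writer.go` hunks above use a common Go idiom: a library package registers its own flag at package level, and whichever binary links it in (presumably *pqlogd*) exposes it once its `main` calls `flag.Parse()`. Below is a minimal single-file illustration, with the declaration moved into `main` only so the example compiles on its own.

```go
package main

import (
	"flag"
	"fmt"
)

// In the patch above this declaration lives in package writer; it is
// shown here in package main only to keep the example self-contained.
var parquetConcurrency = flag.Int("parquet-concurrency", 2, "concurrency of the Parquet writer")

func main() {
	// flag.Parse() in the binary's main picks up flags registered by
	// any imported package, including the writer's parquet-concurrency.
	flag.Parse()

	// The writer converts the value to int64 when constructing the
	// Parquet writer, as in int64(*parquetConcurrency) above.
	fmt.Println("Parquet writer concurrency:", int64(*parquetConcurrency))
}
```
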