diff --git a/README.md b/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..3953f95490ae62b32b689ab549e51baeab5fac4e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,179 @@
+# pqlog
+
+*pqlog* is an indexing and retrieval service for structured and
+unstructured logs (e.g. *syslog*). Its primary goal is the ability to
+*scale down* for small installations, using very few resources while
+remaining useful, offering an alternative to heavyweight stacks like
+ELK in that scenario.
+
+## Overview
+
+The system's functionality is split into two parts:
+
+* *Ingestion* (or *indexing*), processing incoming logs and writing
+  them to log storage;
+* *Query*, the ability to interrogate the log storage analytically.
+
+### Ingestion
+
+Ingestion presents an interface for external clients (syslog daemons,
+fluentd, *etc.*) to send logs for indexing in batches. Incoming log
+entries are converted to a unified "flattened" schema.
+
+The flattened schema maps structured attributes to a flat, one-level
+structure by using their paths (dot-separated) as field names. As an
+example, an input record such as:
+
+```json
+{
+    "a": {
+        "foo": "bar",
+        "baz": "arg"
+    },
+    "b": 42
+}
+```
+
+would get internally converted to a "flat" structure like the following:
+
+```json
+{
+    "a.foo": "bar",
+    "a.baz": "arg",
+    "b": 42
+}
+```
+
+So, when querying the database, one should refer to the *foo* field as
+*a.foo*.
+
+The flattened schema supports lists of scalar values (numbers,
+strings), but it does not support nested structures within lists, as
+it would not know how to unroll them. For instance, the following
+record is problematic:
+
+```json
+{
+    "a": [
+        {
+            "foo": "bar"
+        },
+        {
+            "baz": "arg"
+        }
+    ]
+}
+```
+
+In this situation the ingestion stage will just ignore the problematic
+field (*a* in the above example).
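+
+A minimal sketch of this flattening logic in Go (an illustration of
+the rules above, not the actual ingestion code; the *flatten* helper
+is hypothetical):
+
+```go
+package main
+
+import "fmt"
+
+// flatten maps a nested record onto the flat, one-level structure
+// described above, joining attribute paths with dots. Lists of scalar
+// values are kept as-is; lists containing nested structures are dropped.
+func flatten(prefix string, in map[string]any, out map[string]any) {
+    for key, value := range in {
+        name := key
+        if prefix != "" {
+            name = prefix + "." + key
+        }
+        switch v := value.(type) {
+        case map[string]any:
+            // Recurse into nested objects.
+            flatten(name, v, out)
+        case []any:
+            // Keep only lists of scalar values; drop lists that
+            // contain nested structures.
+            scalarsOnly := true
+            for _, item := range v {
+                if _, isMap := item.(map[string]any); isMap {
+                    scalarsOnly = false
+                    break
+                }
+            }
+            if scalarsOnly {
+                out[name] = v
+            }
+        default:
+            out[name] = v
+        }
+    }
+}
+
+func main() {
+    record := map[string]any{
+        "a": map[string]any{"foo": "bar", "baz": "arg"},
+        "b": 42,
+    }
+    flat := make(map[string]any)
+    flatten("", record, flat)
+    fmt.Println(flat) // map[a.baz:arg a.foo:bar b:42]
+}
+```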
+
+The flattening is ultimately necessary because of an implementation
+detail: the underlying query engine (DuckDB), although it fully
+supports structured schemas, can have trouble reconciling different
+STRUCTs across different Parquet files. Removing this limitation is a
+TODO.
+
+The flattened records are then written to
+[Parquet](https://parquet.apache.org/) files, which are rotated
+periodically (and when they reach a certain size). These files can be
+stored remotely, although the current implementation only supports the
+local filesystem.
+
+The ingestion API endpoint is at */ingest*, and it expects a POST
+request with an ND-JSON request body: newline-delimited JSON-encoded
+records, with no additional headers or footers.
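+
+For instance, a batch of two records would be sent as a request body
+like the following (one JSON object per line):
+
+```
+{"a": {"foo": "bar", "baz": "arg"}, "b": 42}
+{"a": {"foo": "quux"}, "b": 43}
+```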
+
+### Schema unification
+
+As it ingests records, *pqlog* learns the data types of the incoming
+attributes, determining an aggregate schema for all the records. Since
+in the "flat" schema all nodes are leafs, it knows about the following
+fundamental data types:
+
+* `STRING`
+* `NUMBER`
+* `TIMESTAMP` (an RFC3339-formatted string)
+* `LIST` (a repeated `STRING`)
+
+The mere existence of an aggregate schema, which is necessary in order
+to query the data, imposes some restrictions on the structure of the
+data. After all, it is not unreasonable to expect the data to have
+some level of consistency if you want to query it in aggregate.
+
+When the schema detector finds conflicting data types for a field, it
+will try to promote them to a common type where possible, handling
+common cases such as STRING / NUMBER conflicts by upgrading the values
+to STRING. But where such type promotions aren't possible, the
+problematic field will be dropped from the schema.
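+
+For example, if records like the following are ingested, the detector
+sees both NUMBER and STRING values for *b*; since this particular
+conflict can be promoted, *b* will appear in the aggregate schema as a
+STRING column:
+
+```
+{"b": 42}
+{"b": "not a number"}
+```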
+
+### Querying
+
+The query engine is [DuckDB](https://duckdb.org), which can read the
+Parquet files directly (even remotely) and run fast analytical queries
+on them.
+
+Note that, in the current implementation, only fully written Parquet
+files can be queried. This means you won't see logs until the
+ingestion server decides it's time to finalize the current Parquet
+file. For this reason, it might be sensible to set the
+*--rotation-interval* option to a few minutes.
+
+The query API is at */query* and it takes a full SQL query as the *q*
+parameter. The response will be JSON-encoded. Since the table to query
+is created on-the-fly with every request, its name is not known in
+advance to the caller: the SQL query should contain the placeholder
+`$table`, which will be replaced at execution time with the name of
+the temporary table to use.
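+
+For example, continuing the flattened record shown in the ingestion
+section, a query counting matching records could look like the
+following (note that, since flattened field names contain dots, they
+generally need to be double-quoted in SQL):
+
+```sql
+SELECT COUNT(*) AS c FROM $table WHERE "a.foo" = 'bar'
+```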
+
+## Running
+
+The service ships as a single binary, *pqlogd*.
+
+From an operational perspective, the two components of the service
+(ingestion and query) can be run independently, and are controlled by
+the command-line options *--enable-ingest* and *--enable-query*,
+respectively. In all cases, the server will listen on the HTTP address
+specified by *--addr*.
+
+### Ingestion service
+
+The ingestion service needs to know where to store its output. This is
+controlled by the *--dir* command-line option, which currently only
+accepts a local path (S3 support TODO).
+
+Log files will be written to this directory, rotated every
+*--rotation-interval*.
+
+The schema detector works better if it can persist its internal state
+and recover it on startup: to make it do so, specify the
+*--state-file* option.
+
+An example:
+
+```
+$ pqlogd --dir ./data --enable-ingest --addr :3000 &
+$ curl -s -XPOST --data-binary '{"a": "foo", "b": 42}' http://localhost:3000/ingest
+$ kill %1 # Causes the Parquet files to be finalized
+```
+
+### Query service
+
+The query service also needs to know where the Parquet data files are
+located, with the *--dir* option.
+
+Continuing the example above:
+
+```
+$ pqlogd --dir ./data --enable-query --addr :3001 &
+$ curl -s -XPOST -d "q=SELECT COUNT(*) AS c FROM \$table WHERE a='foo'" http://localhost:3001/query
+[{"c": 1}]
+```
+
+### Debugging
+
+The server offers some debugging endpoints which can be useful for
+understanding what it is doing:
+
+* */schema* will return the current schema in JSON format
+* */debug/schema* will return a human-readable dump of the internal
+  state of the schema guesser
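+
+For example, while an ingestion server is running on *:3000*:
+
+```
+$ curl -s http://localhost:3000/schema
+```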
diff --git a/cmd/pqlogd/main.go b/cmd/pqlogd/main.go
index 98fa7f99d13443c30ce407814e46bd21442710c4..3d0efdefb9d823743cb0539c0931fb4efa7da319 100644
--- a/cmd/pqlogd/main.go
+++ b/cmd/pqlogd/main.go
@@ -49,7 +49,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		w := writer.NewRotating(pathGen, *rotationInterval)
+		w := writer.NewRotating(writer.NewLocalSource(), pathGen, *rotationInterval)
 		defer w.Close()
 
 		isrv := pqhttp.NewIngest(w, *stateFile)
diff --git a/http/ingest.go b/http/ingest.go
index 5e874d86669d3f65294b3511fef94bc613a8aff3..98821661581532d2b194a103b053c2d89d99ea5c 100644
--- a/http/ingest.go
+++ b/http/ingest.go
@@ -6,7 +6,6 @@ import (
 	"io"
 	"log"
 	"net/http"
-	"strings"
 	"sync"
 
 	"git.autistici.org/ai3/attic/pqlog/schema"
@@ -105,7 +104,7 @@ func (s *IngestServer) ingestBatch(r io.Reader) (int, error) {
 		s.numSchemaChanges++
 		s.schema = s.guesser.NewSchema()
 		s.pschema = s.schema.ParquetSchemaHandler()
-		if err := s.writer.Reopen(s.pschema.SchemaHandler); err != nil {
+		if err := s.writer.Reopen(s.pschema); err != nil {
 			return 0, err
 		}
 		if s.stateFile != "" {
@@ -130,10 +129,6 @@ func (s *IngestServer) IngestHandler(w http.ResponseWriter, req *http.Request) {
 		http.Error(w, "Bad method", http.StatusMethodNotAllowed)
 		return
 	}
-	if !strings.HasPrefix(req.Header.Get("Content-Type"), "application/json") {
-		http.Error(w, "Bad Content-Type", http.StatusBadRequest)
-		return
-	}
 
 	n, err := s.ingestBatch(req.Body)
 	if err != nil {
@@ -160,12 +155,9 @@ func (s *IngestServer) DebugHandler(w http.ResponseWriter, req *http.Request) {
 
 func (s *IngestServer) SchemaHandler(w http.ResponseWriter, req *http.Request) {
 	s.mx.Lock()
-	out := struct {
-		Version int64             `json:"version"`
-		Fields  map[string]string `json:"fields"`
-	}{
-		Version: s.schema.Version,
-		Fields:  s.schema.Debug(),
+	var out *schema.SchemaDebugInfo
+	if s.schema != nil {
+		out = s.schema.Debug()
 	}
 	s.mx.Unlock()
 
diff --git a/http/query.go b/http/query.go
index b614ed0d2da35dcf4d743e986f8f7def186f8fde..555133615e64ec9ee92abc80ca19ca750766ad79 100644
--- a/http/query.go
+++ b/http/query.go
@@ -1,12 +1,14 @@
 package pqhttp
 
 import (
+	"bufio"
 	"context"
 	"database/sql"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"log"
 	"math/rand"
 	"net/http"
 	"path/filepath"
@@ -45,18 +47,18 @@ func (q *QueryServer) Close() {
 }
 
 func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) error {
-	table := fmt.Sprintf("logs-%08x", rand.Int63())
+	table := fmt.Sprintf("logs%08x", rand.Int63())
 	query = strings.Replace(query, "$table", table, -1)
 
 	tx, err := q.db.BeginTx(ctx, nil)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to create transaction: %w", err)
 	}
 	defer tx.Rollback()
 
 	_, err = tx.Exec(fmt.Sprintf("CREATE VIEW %s AS SELECT * FROM read_parquet('%s', union_by_name=true)", table, q.glob))
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to create DuckDB view: %w", err)
 	}
 	defer func() {
 		// Well, ok, we have the Rollback() but.
@@ -67,30 +69,41 @@ func (q *QueryServer) runQuery(ctx context.Context, w io.Writer, query string) e
 	rows, err := tx.Query(query)
 	if err != nil {
 		// Report this as a client-originated error.
-		return newQueryError(err)
+		return newQueryError(fmt.Errorf("query execution error: %w", err))
 	}
 	defer rows.Close()
 
-	io.WriteString(w, "{\"status\":\"ok\",\"results\":[")
+	bw := bufio.NewWriter(w)
+
+	_, _ = io.WriteString(bw, "{\"status\":\"ok\",\"results\":[")
 	cols, _ := rows.ColumnTypes()
+	first := true
 	for rows.Next() {
+		if first {
+			first = false
+		} else {
+			_, _ = io.WriteString(bw, ",")
+		}
+
 		values := make([]any, len(cols))
 		for i := 0; i < len(cols); i++ {
-			values[i] = reflect.Zero(cols[i].ScanType()).Interface()
+			value := reflect.Zero(cols[i].ScanType()).Interface()
+			values[i] = &value
 		}
 		if err := rows.Scan(values...); err != nil {
-			return err
+			return fmt.Errorf("Scan error: %w", err)
 		}
 		row := make(map[string]any, len(cols))
 		for i := 0; i < len(cols); i++ {
 			row[cols[i].Name()] = values[i]
 		}
-		if err := json.NewEncoder(w).Encode(row); err != nil {
-			return err
+		if err := json.NewEncoder(bw).Encode(row); err != nil {
+			return fmt.Errorf("JSON encode error: %w", err)
 		}
-		io.WriteString(w, ",\n")
 	}
-	io.WriteString(w, "]}\n")
+	_, _ = io.WriteString(bw, "]}\n")
+
+	bw.Flush()
 
 	return rows.Err()
 }
@@ -115,6 +128,7 @@ func (q *QueryServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 		if errors.Is(err, &queryError{}) {
 			status = http.StatusBadRequest
 		}
+		log.Printf("query error: %v", err)
 		http.Error(w, err.Error(), status)
 	}
 }
diff --git a/schema/parquet.go b/schema/parquet.go
index 4ed5189c526fae66532246afaef8ac94ffdf37d7..cb2149f1555656510eeb8e036818f657300719ec 100644
--- a/schema/parquet.go
+++ b/schema/parquet.go
@@ -92,7 +92,7 @@ func (s *Schema) ParquetSchemaHandler() *ParquetSchemaHandler {
 
 func (s *ParquetSchemaHandler) TuplesToRow(tuples []Tuple) []any {
 	row := make([]any, s.numFields)
-	copy(row, s.zeros)
+	//copy(row, s.zeros)
 	for _, t := range tuples {
 		idx := s.fieldIndexMap[t.key]
 		value := t.value
@@ -103,3 +103,7 @@ func (s *ParquetSchemaHandler) TuplesToRow(tuples []Tuple) []any {
 	}
 	return row
 }
+
+func (s *ParquetSchemaHandler) Zero(i int) any {
+	return s.zeros[i]
+}
diff --git a/schema/schema.go b/schema/schema.go
index 031f2b56db700cc52cb144e85d213522ecb11068..64d0da042dd2fb16dc92bf7c4b4950485d052587 100644
--- a/schema/schema.go
+++ b/schema/schema.go
@@ -135,26 +135,6 @@ func (g typeGuesser) datatype() (Type, upcastFunc, bool) {
 	return t, nil, true
 }
 
-func (g typeGuesser) debugState() ColumnDebugState {
-	var n int
-	var ds ColumnDebugState
-	for i := 0; i < maxTypeBits; i++ {
-		j := Type(1 << i)
-		if g.isSet(j) {
-			n++
-			ds.Types = append(ds.Types, j.String())
-		}
-	}
-	if n > 1 {
-		s := "type conflict"
-		if up, _, ok := g.supportedUpcast(); ok {
-			s = fmt.Sprintf("upcast to %s", up.String())
-		}
-		ds.Notes = s
-	}
-	return ds
-}
-
 // The Guesser keeps track of the columns we see (after
 // flattening the input) and their associated data types, and infers a
 // schema over time, to the extent where it is possible to do so.
@@ -220,32 +200,6 @@ func (g *Guesser) Save(path string) error {
 	return json.NewEncoder(f).Encode(&sg)
 }
 
-// ColumnDebugState holds information about a specific column.
-type ColumnDebugState struct {
-	Types []string `json:"types"`
-	Notes string   `json:"notes,omitempty"`
-}
-
-// GuesserDebugState represents the state of the Guesser in human-readable form.
-type GuesserDebugState struct {
-	Columns map[string]ColumnDebugState `json:"fields"`
-	Errors  map[string]string           `json:"errors,omitempty"`
-}
-
-func (g *Guesser) Debug() *GuesserDebugState {
-	state := GuesserDebugState{
-		Columns: make(map[string]ColumnDebugState),
-		Errors:  make(map[string]string),
-	}
-	for name, col := range g.columns {
-		state.Columns[name] = col.debugState()
-	}
-	for name, err := range g.errors {
-		state.Errors[name] = err
-	}
-	return &state
-}
-
 // This structure holds the inferred schema of the datastream at a
 // specific point in time.
 type Schema struct {
@@ -254,15 +208,6 @@ type Schema struct {
 	upcast  map[string]upcastFunc
 }
 
-// Debug returns a map representation of the schema.
-func (s *Schema) Debug() map[string]string {
-	deb := make(map[string]string)
-	for name, col := range s.columns {
-		deb[name] = col.String()
-	}
-	return deb
-}
-
 // GetSchema returns the best possible version of a schema given our
 // current data type guesses.
 func (g *Guesser) NewSchema() *Schema {
diff --git a/schema/schema_debug.go b/schema/schema_debug.go
new file mode 100644
index 0000000000000000000000000000000000000000..d12d452f26d5d2537cc1a5fe0f6ebefcb6d22821
--- /dev/null
+++ b/schema/schema_debug.go
@@ -0,0 +1,73 @@
+package schema
+
+import "fmt"
+
+func (g typeGuesser) debugState() ColumnDebugState {
+	var n int
+	var ds ColumnDebugState
+	for i := 0; i < maxTypeBits; i++ {
+		j := Type(1 << i)
+		if g.isSet(j) {
+			n++
+			ds.Types = append(ds.Types, j.String())
+		}
+	}
+	if n > 1 {
+		s := "type conflict"
+		if up, _, ok := g.supportedUpcast(); ok {
+			s = fmt.Sprintf("upcast to %s", up.String())
+		}
+		ds.Notes = s
+	}
+	return ds
+}
+
+// ColumnDebugState holds information about a specific column.
+type ColumnDebugState struct {
+	Types []string `json:"types"`
+	Notes string   `json:"notes,omitempty"`
+}
+
+// GuesserDebugState represents the state of the Guesser in human-readable form.
+type GuesserDebugState struct {
+	Columns map[string]ColumnDebugState `json:"fields"`
+	Errors  map[string]string           `json:"errors,omitempty"`
+}
+
+// Debug returns information on the Guesser internals.
+func (g *Guesser) Debug() *GuesserDebugState {
+	state := GuesserDebugState{
+		Columns: make(map[string]ColumnDebugState),
+		Errors:  make(map[string]string),
+	}
+	for name, col := range g.columns {
+		state.Columns[name] = col.debugState()
+	}
+	for name, err := range g.errors {
+		state.Errors[name] = err
+	}
+	return &state
+}
+
+type SchemaFieldDebugInfo struct {
+	Name string `json:"name"`
+	Type string `json:"type"`
+}
+
+type SchemaDebugInfo struct {
+	Version int64                  `json:"version"`
+	Fields  []SchemaFieldDebugInfo `json:"fields"`
+}
+
+// Debug returns a human-friendly representation of the schema.
+func (s *Schema) Debug() *SchemaDebugInfo {
+	var info SchemaDebugInfo
+	info.Version = s.Version
+	for name, col := range s.columns {
+		info.Fields = append(info.Fields, SchemaFieldDebugInfo{
+			Name: name,
+			Type: col.String(),
+		})
+	}
+	return &info
+}
diff --git a/writer/marshal.go b/writer/marshal.go
index 726b8046c23f63cb4a9cbca1205fd70f867b482d..13b765e541f17c480641a205a94adf6505eadc8a 100644
--- a/writer/marshal.go
+++ b/writer/marshal.go
@@ -1,6 +1,7 @@
 package writer
 
 import (
+	"git.autistici.org/ai3/attic/pqlog/schema"
 	"github.com/xitongsys/parquet-go/common"
 	"github.com/xitongsys/parquet-go/layout"
 	"github.com/xitongsys/parquet-go/parquet"
@@ -31,13 +32,13 @@ import (
 // reason NewSchemaHandlerFromSchemaHandler (which gets called in that
 // case) is not 'transparent' and returns something different than the
 // original schema...
-func newTuplesWriter(schemaHandler *pqschema.SchemaHandler, pfile source.ParquetFile, np int64) (*pqwriter.ParquetWriter, error) {
+func newTuplesWriter(schemaHandler *schema.ParquetSchemaHandler, pfile source.ParquetFile, np int64) (*pqwriter.ParquetWriter, error) {
 	res := new(pqwriter.ParquetWriter)
-	res.SchemaHandler = schemaHandler
+	res.SchemaHandler = schemaHandler.SchemaHandler
 	res.PFile = pfile
-	res.PageSize = 8 * 1024              //8K
-	res.RowGroupSize = 128 * 1024 * 1024 //128M
-	res.CompressionType = parquet.CompressionCodec_SNAPPY
+	res.PageSize = 8 * 1024             //8K
+	res.RowGroupSize = 32 * 1024 * 1024 //32M
+	res.CompressionType = parquet.CompressionCodec_ZSTD
 	res.PagesMapBuf = make(map[string][]*layout.Page)
 	res.DictRecs = make(map[string]*layout.DictRecType)
 	res.NP = np
@@ -46,75 +47,88 @@ func newTuplesWriter(schemaHandler *pqschema.SchemaHandler, pfile source.Parquet
 	res.Footer.Schema = append(res.Footer.Schema, res.SchemaHandler.SchemaElements...)
 	res.Offset = 4
 	_, err := res.PFile.Write([]byte("PAR1"))
-	res.MarshalFunc = marshalTuples
+	res.MarshalFunc = genMarshalTuples(schemaHandler)
 	return res, err
 }
 
 // marshalTuples is the Parquet marshaler for the tuplesToRow() format.
-func marshalTuples(records []interface{}, schemaHandler *pqschema.SchemaHandler) (*map[string]*layout.Table, error) {
-	res := make(map[string]*layout.Table)
-	if ln := len(records); ln <= 0 {
-		return &res, nil
-	}
+func genMarshalTuples(pschema *schema.ParquetSchemaHandler) func([]interface{}, *pqschema.SchemaHandler) (*map[string]*layout.Table, error) {
+	return func(records []interface{}, schemaHandler *pqschema.SchemaHandler) (*map[string]*layout.Table, error) {
+		res := make(map[string]*layout.Table)
+		if ln := len(records); ln <= 0 {
+			return &res, nil
+		}
 
-	// Iterate over columns.
-	for i := 0; i < len(records[0].([]interface{})); i++ {
-		pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName
-		table := layout.NewEmptyTable()
-		res[pathStr] = table
-		table.Path = common.StrToPath(pathStr)
+		// Iterate over columns.
+		for i := 0; i < len(records[0].([]interface{})); i++ {
+			pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName
+			table := layout.NewEmptyTable()
+			res[pathStr] = table
+			table.Path = common.StrToPath(pathStr)
 
-		schema := schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
-		isOptional := true
-		if *schema.RepetitionType != parquet.FieldRepetitionType_OPTIONAL {
-			isOptional = false
-		}
+			schema := schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
 
-		if isOptional {
-			table.MaxDefinitionLevel = 1
-		} else {
-			table.MaxDefinitionLevel = 0
-		}
+			table.MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(table.Path)
+			table.MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(table.Path)
+			table.RepetitionType = schema.GetRepetitionType()
+			table.Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
+			table.Info = schemaHandler.Infos[i+1]
+			// Pre-allocate these arrays for efficiency
+			table.Values = make([]interface{}, 0, len(records))
+			table.RepetitionLevels = make([]int32, 0, len(records))
+			table.DefinitionLevels = make([]int32, 0, len(records))
 
-		table.MaxRepetitionLevel = 0
-		table.RepetitionType = parquet.FieldRepetitionType_OPTIONAL
-		table.Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
-		table.Info = schemaHandler.Infos[i+1]
-		// Pre-allocate these arrays for efficiency
-		table.Values = make([]interface{}, 0, len(records))
-		table.RepetitionLevels = make([]int32, 0, len(records))
-		table.DefinitionLevels = make([]int32, 0, len(records))
+			//log.Printf("table %s: max_dl=%d, max_rl=%d", pathStr, table.MaxDefinitionLevel, table.MaxRepetitionLevel)
 
-		// Iterate over records.
-		for j := 0; j < len(records); j++ {
-			var val any
-			if rec := records[j].([]interface{})[i]; rec != nil {
-				//val = types.InterfaceToParquetType(rec, table.Schema.Type)
-				val = rec
-			}
-			switch tval := val.(type) {
-			case []any:
-				// "Unroll" lists into repeated string values.
-				var rl int32
-				if len(tval) < 1 {
-					tval = []any{""}
-				}
-				for _, lval := range tval {
-					table.Values = append(table.Values, lval)
-					table.DefinitionLevels = append(table.DefinitionLevels, 1)
-					table.RepetitionLevels = append(table.RepetitionLevels, rl)
-					rl++
-				}
-			default:
-				table.Values = append(table.Values, val)
-				table.RepetitionLevels = append(table.RepetitionLevels, 0)
-				if val == nil || !isOptional {
-					table.DefinitionLevels = append(table.DefinitionLevels, 0)
+			// Iterate over records.
+			var dl int32 = 1
+			var rl int32 = 0
+			for j := 0; j < len(records); j++ {
+				val := records[j].([]interface{})[i]
+
+				if val == nil {
+					encodeNil(table, pschema.Zero(i), dl, rl)
 				} else {
-					table.DefinitionLevels = append(table.DefinitionLevels, 1)
+					encode(table, val, dl, rl)
 				}
 			}
 		}
+		return &res, nil
+	}
+}
+
+func encode(table *layout.Table, value any, dl, rl int32) {
+	switch tvalue := value.(type) {
+	case []any:
+		encodeSlice(table, tvalue, dl, rl)
+	default:
+		encodeValue(table, value, dl, rl)
+	}
+}
+
+func encodeValue(table *layout.Table, value any, dl, rl int32) {
+	//value = types.InterfaceToParquetType(value, table.Schema.Type)
+	table.Values = append(table.Values, value)
+	table.DefinitionLevels = append(table.DefinitionLevels, dl)
+	table.RepetitionLevels = append(table.RepetitionLevels, rl)
+}
+
+func encodeNil(table *layout.Table, value any, dl, rl int32) {
+	table.Values = append(table.Values, value)
+	table.DefinitionLevels = append(table.DefinitionLevels, dl-1)
+	table.RepetitionLevels = append(table.RepetitionLevels, rl)
+}
+
+func encodeSlice(table *layout.Table, l []any, dl, rl int32) {
+	if len(l) == 0 {
+		encode(table, nil, dl, rl)
+		return
+	}
+	for i := 0; i < len(l); i++ {
+		var nodeRL = rl + 1
+		if i == 0 {
+			nodeRL = rl
+		}
+		encodeValue(table, l[i], dl, nodeRL)
 	}
-	return &res, nil
 }
diff --git a/writer/source.go b/writer/source.go
new file mode 100644
index 0000000000000000000000000000000000000000..b449dfcfd2d308319d66b889593a3e804d36f826
--- /dev/null
+++ b/writer/source.go
@@ -0,0 +1,27 @@
+package writer
+
+import (
+	"os"
+
+	"github.com/xitongsys/parquet-go-source/local"
+	"github.com/xitongsys/parquet-go/source"
+)
+
+type Source interface {
+	Create(string) (source.ParquetFile, error)
+	Rename(string, string) error
+}
+
+type localSource struct{}
+
+func NewLocalSource() Source {
+	return new(localSource)
+}
+
+func (l *localSource) Create(path string) (source.ParquetFile, error) {
+	return local.NewLocalFileWriter(path)
+}
+
+func (l *localSource) Rename(old, new string) error {
+	return os.Rename(old, new)
+}
diff --git a/writer/writer.go b/writer/writer.go
index 223338a71b1e31355a83dccfd3c88563e4bc89c3..fdc8173efab55bb30863e35350af64792174407a 100644
--- a/writer/writer.go
+++ b/writer/writer.go
@@ -5,16 +5,15 @@ import (
 	"log"
 	"time"
 
+	"git.autistici.org/ai3/attic/pqlog/schema"
 	"git.autistici.org/ai3/attic/pqlog/writer/pathgen"
-	"github.com/xitongsys/parquet-go-source/local"
-	pqschema "github.com/xitongsys/parquet-go/schema"
 	"github.com/xitongsys/parquet-go/source"
 	pqwriter "github.com/xitongsys/parquet-go/writer"
 )
 
 // Writer is the interface for our restartable ParquetWriter.
 type Writer interface {
-	Reopen(*pqschema.SchemaHandler) error
+	Reopen(*schema.ParquetSchemaHandler) error
 	Write(any) error
 	Close()
 }
@@ -23,15 +22,18 @@ type Writer interface {
 // Parquet files, switching to a new one whenever the schema changes.
 // It guarantees the creation of consistent Parquet files.
 type ParquetWriter struct {
-	pg *pathgen.Generator
+	pg  *pathgen.Generator
+	src Source
 
-	lw source.ParquetFile
-	pw *pqwriter.ParquetWriter
+	lw       source.ParquetFile
+	pw       *pqwriter.ParquetWriter
+	renameTo string
 }
 
-func New(pg *pathgen.Generator) *ParquetWriter {
+func New(src Source, pg *pathgen.Generator) *ParquetWriter {
 	return &ParquetWriter{
-		pg: pg,
+		src: src,
+		pg:  pg,
 	}
 }
 
@@ -42,14 +44,22 @@ func (w *ParquetWriter) Write(obj any) error {
 func (w *ParquetWriter) Close() {
 	if w.pw != nil {
 		w.pw.WriteStop()
+		w.pw = nil
 	}
 	if w.lw != nil {
 		w.lw.Close()
+		w.lw = nil
+	}
+	if w.renameTo != "" {
+		if err := w.src.Rename(w.renameTo+".tmp", w.renameTo); err != nil {
+			log.Printf("warning: could not finalize parquet file %s: %v", w.renameTo, err)
+		}
+		w.renameTo = ""
 	}
 }
 
 // (Re)open the output with a new schema.
-func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error {
+func (w *ParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error {
 	w.Close()
 
 	path, err := w.pg.Next()
@@ -59,10 +69,11 @@ func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error {
 
 	log.Printf("creating parquet file %s (%d fields)", path, len(schemaHandler.Infos)-1)
 
-	w.lw, err = local.NewLocalFileWriter(path)
+	w.lw, err = w.src.Create(path + ".tmp")
 	if err != nil {
 		return fmt.Errorf("can't create Parquet file: %w", err)
 	}
+	w.renameTo = path
 
 	w.pw, err = newTuplesWriter(schemaHandler, w.lw, 4)
 	if err != nil {
@@ -71,40 +82,47 @@ func (w *ParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error {
 	return nil
 }
 
+const maxRecordsPerFile = 1000000
+
 // A RotatingParquetWriter will rotate the output file on a specific period.
 type RotatingParquetWriter struct {
 	*ParquetWriter
+
+	counter       int64
 	period        time.Duration
 	deadline      time.Time
-	schemaHandler *pqschema.SchemaHandler
+	schemaHandler *schema.ParquetSchemaHandler
 }
 
-func NewRotating(pg *pathgen.Generator, period time.Duration) *RotatingParquetWriter {
+func NewRotating(src Source, pg *pathgen.Generator, period time.Duration) *RotatingParquetWriter {
 	return &RotatingParquetWriter{
-		ParquetWriter: New(pg),
+		ParquetWriter: New(src, pg),
 		period:        period,
 	}
 }
 
 func (w *RotatingParquetWriter) Write(obj any) error {
 	now := time.Now()
+	w.counter++
 
-	if now.After(w.deadline) {
+	if now.After(w.deadline) || w.counter > maxRecordsPerFile {
 		if err := w.Reopen(w.schemaHandler); err != nil {
 			return err
 		}
 
-		// Add one minute to avoid hitting the same value.
-		w.deadline = now.Add(1 * time.Minute).Round(w.period)
+		// Round to period for more accurate alignment.
+		w.deadline = now.Add(w.period).Round(w.period)
+		log.Printf("rotating output, new deadline=%v", w.deadline)
 	}
 
 	return w.ParquetWriter.Write(obj)
 }
 
-func (w *RotatingParquetWriter) Reopen(schemaHandler *pqschema.SchemaHandler) error {
+func (w *RotatingParquetWriter) Reopen(schemaHandler *schema.ParquetSchemaHandler) error {
 	if err := w.ParquetWriter.Reopen(schemaHandler); err != nil {
 		return err
 	}
 	w.schemaHandler = schemaHandler
+	w.counter = 0
 	return nil
 }