diff --git a/roles/float-infra-log-collector/files/clickhouse.bootstrap.sql b/roles/float-infra-log-collector/files/clickhouse.bootstrap.sql
index c3b11224de20d4a103e848aed8dab96f3c109d22..9a4da0d00bc19627afbb36c6675bb43d9e4baa68 100644
--- a/roles/float-infra-log-collector/files/clickhouse.bootstrap.sql
+++ b/roles/float-infra-log-collector/files/clickhouse.bootstrap.sql
@@ -2,33 +2,27 @@ CREATE DATABASE IF NOT EXISTS `logs`;
 
 SET allow_experimental_object_type=1;
 
-CREATE TABLE IF NOT EXISTS logs.structured (
-  timestamp Date,
-  event JSON
-) ENGINE = MergeTree()
-  PARTITION BY toYYYYMMDD(timestamp)
-  ORDER BY (timestamp);
-
 CREATE TABLE IF NOT EXISTS logs.syslog (
-  timestamp Date,
+  timestamp DateTime,
   facility LowCardinality(String),
   severity LowCardinality(String),
   hostname LowCardinality(String),
   program String,
   tag String,
-  message String
+  message String,
+  data JSON
 ) ENGINE = MergeTree()
   PARTITION BY toYYYYMMDD(timestamp)
   ORDER BY (timestamp);
 
 CREATE TABLE IF NOT EXISTS logs.http (
-  timestamp Date,
+  timestamp DateTime,
   hostname LowCardinality(String),
   method LowCardinality(String),
   vhost String,
   uri String,
-  status Integer,
-  bytes Integer,
+  status UInt16,
+  bytes UInt64,
   referer String,
   user_agent String
 ) ENGINE = MergeTree()
diff --git a/roles/float-infra-log-collector/templates/rsyslog-collector.conf.j2 b/roles/float-infra-log-collector/templates/rsyslog-collector.conf.j2
index 374dbd7f69abab479d56cfc8fac041f6e1901e5c..68cf5318d403fc259ea865f0bb46089bdda0fe83 100644
--- a/roles/float-infra-log-collector/templates/rsyslog-collector.conf.j2
+++ b/roles/float-infra-log-collector/templates/rsyslog-collector.conf.j2
@@ -45,7 +45,7 @@ module(
   load="omclickhouse"
 )
 template(name="clickhouseSyslog" type="list" option.stdsql="on") {
-  constant(value="INSERT INTO logs.syslog (timestamp, hostname, facility, severity, program, tag, message) VALUES ('")
+  constant(value="INSERT INTO logs.syslog (timestamp, hostname, facility, severity, program, tag, message, data) VALUES ('")
   property(name="timereported" dateFormat="pgsql" date.inUTC="on")
   constant(value="','")
   property(name="hostname")
@@ -59,6 +59,8 @@ template(name="clickhouseSyslog" type="list" option.stdsql="on") {
   property(name="syslogtag")
   constant(value="','")
   property(name="msg")
+  constant(value="','")
+  property(name="$!")
   constant(value="')")
 }
 
@@ -85,6 +87,10 @@ template(name="clickhouseHTTP" type="list" option.stdsql="on") {
 }
 {% endif %}
 
+module(
+  load="mmrm1stspace"
+)
+
 module(
   load="mmjsonparse"
 )
@@ -109,6 +115,8 @@ include(
 # - autodetect Lumberjack structured logs and parse them
 # - forward everything to Elasticsearch
 ruleset(name="incoming"){
+  action(type="mmrm1stspace")
+
   # Anonymize logs here.
   # TODO: whitelist the log sources that need anonymization (mail services).
   action(type="mmanon"
@@ -129,26 +137,10 @@ ruleset(name="incoming"){
     stop
   }
 
-  if (substring($msg, 1, 5) == "@cee:") then {
-    action(type="mmjsonparse")
-    if ($syslogfacility-text == "auth" and $programname == "audit") then {
-      # Structured audit logs go to a dedicated Elasticsearch index.
-      stop
-    } else {
-      # Extension point for rules applying to structured logs.
-      include(
-        file="/etc/rsyslog-collector/rules-structured.d/*.conf"
-        mode="optional"
-      )
-
-      # Normal structured log present in the default syslog flow. Send
-      # straight to Elasticsearch, skipping the log normalization step.
-      stop
-    }
-  } else if ($syslogfacility-text == "local3") then {
-    # HTTP logs from the front-end. Run it through mmnormalize to
-    # convert the standard CommonLog format into JSON, then send it to
-    # Elasticsearch.
+  # HTTP logs from the front-end. Run it through mmnormalize to
+  # convert the standard CommonLog format into JSON, then send it to
+  # indexing.
+  if ($syslogfacility-text == "local3") then {
     action(type="mmnormalize"
            rulebase="/etc/rsyslog-collector-lognorm/http.rb")
     # Anonymize sso_login requests by dropping the query string.
@@ -171,11 +163,24 @@ ruleset(name="incoming"){
            queue.mindequeuebatchsize.timeout="3000"
            queue.filename="clickhouse-http"
            action.resumeretrycount="-1")
+    stop
+  }
+
+  # Structured logs and unstructured logs end up in the same indexed table,
+  # they differ just in the way the attributes are generated.
+  if (substring($msg, 0, 5) == "@cee:") then {
+    action(type="mmjsonparse")
+    unset $!msg;
+
+    # Extension point for rules applying to structured logs.
+    include(
+      file="/etc/rsyslog-collector/rules-structured.d/*.conf"
+      mode="optional"
+    )
   } else {
     # Traditional syslog message. Run it through mmnormalize to
     # extract interesting bits of metadata according to user-defined
-    # patterns (a bit like logstash), then send the result as JSON to
-    # Elasticsearch.
+    # patterns (a bit like logstash).
 
     # Apply any blacklists first.
 {% for expr in log_collector_filter_exprs|default([]) %}
@@ -196,30 +201,34 @@ ruleset(name="incoming"){
            rulebase="/etc/rsyslog-collector-lognorm/auth.rb")
     action(type="mmnormalize"
            rulebase="/etc/rsyslog-collector-lognorm/postfix.rb")
-    # Drop these fields as they're just duplicating the original message.
-    unset $!originalmsg;
-    unset $!unparsed-data;
-    # Slightly silly: we have to set a variable anyway in the
-    # resulting JSON otherwise the esTemplate won't be syntactially
-    # valid and ES will refuse it.
-    # set $!ignore = "1";
-    action(type="omclickhouse"
-           server="127.0.0.1"
-           port="9780"
-           usehttps="off"
-           timeout="10000"
-           template="clickhouseSyslog"
-           user="{{ clickhouse_username }}"
-           pwd="{{ clickhouse_password }}"
-           maxBytes="20M"
-           queue.type="linkedlist"
-           queue.size="1000"
-           queue.dequeuebatchsize="1000"
-           queue.mindequeuebatchsize="100"
-           queue.mindequeuebatchsize.timeout="3000"
-           queue.filename="clickhouse-syslog"
-           action.resumeretrycount="-1")
   }
+
+  # Drop these fields as they're just duplicating the original message.
+  unset $!originalmsg;
+  unset $!unparsed-data;
+
+  # Slightly silly: we have to set a variable anyway in the
+  # resulting JSON otherwise the esTemplate won't be syntactically
+  # valid and ES will refuse it.
+  # set $!ignore = "1";
+
+  # Send the log to the index.
+  action(type="omclickhouse"
+         server="127.0.0.1"
+         port="9780"
+         usehttps="off"
+         timeout="10000"
+         template="clickhouseSyslog"
+         user="{{ clickhouse_username }}"
+         pwd="{{ clickhouse_password }}"
+         maxBytes="20M"
+         queue.type="linkedlist"
+         queue.size="1000"
+         queue.dequeuebatchsize="1000"
+         queue.mindequeuebatchsize="100"
+         queue.mindequeuebatchsize.timeout="3000"
+         queue.filename="clickhouse-syslog"
+         action.resumeretrycount="-1")
 {% endif %}
 }
 
diff --git a/services.yml.default b/services.yml.default
index e5d69027e8277b544a4f15db88ca70767bcad717..e4403cdae6d40dc8ff5c68e0e08b9289338ec924 100644
--- a/services.yml.default
+++ b/services.yml.default
@@ -88,10 +88,14 @@ log-collector-e2e:
   scheduling_group: all
   containers:
     - name: prober
-      image: registry.git.autistici.org/ai3/tools/dye-injector:master
+      image: registry.git.autistici.org/ai3/tools/dye-injector:clickhouse
       port: 7094
       env:
         ADDR: ":7094"
+        DRIVER: "clickhouse"
+        CLICKHOUSE_ADDR: "log-collector.{{ domain }}:9700"
+        CLICKHOUSE_USER: "clickhouse"
+        CLICKHOUSE_PASSWORD: "{{ clickhouse_password }}"
   monitoring_endpoints:
     - name: log-collector-e2e-prober
       port: 7094