journal2clickhouse: init
This commit is contained in:
9 changed files with 597 additions and 1 deletions
@ -9,4 +9,5 @@ args: {
streetworks = import ./streetworks args;
trains = import ./trains args;
nhsenglandtests = import ./nhsenglandtests args;
journal2clickhouse = import ./journal2clickhouse args;
Normal file
Normal file
@ -0,0 +1,10 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <>
# SPDX-License-Identifier: Apache-2.0
# Builds the journal2clickhouse program from the single Go source file that
# sits next to this expression; no additional Go dependencies are listed.
{ depot, ... }:

depot.third_party.buildGo.program {
  name = "journal2clickhouse";
  srcs = [ ./journal2clickhouse.go ];
  deps = with depot.third_party; [];
}
Normal file
Normal file
@ -0,0 +1,488 @@
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"time"
)
// transformFunc converts one raw field value into the value that should be
// stored for that field, or returns an error if the value cannot be converted.
type transformFunc func(interface{}) (interface{}, error)
// syslogDate converts a classic syslog timestamp in time.Stamp layout
// ("Jan _2 15:04:05") into Unix seconds, returned as an int64.
//
// The layout carries no year, so the year is taken from the current time and
// adjusted by one when the timestamp and "now" fall on opposite sides of a
// December/January boundary. The timestamp carries no zone either, so it is
// interpreted as UTC by time.Parse.
//
// It returns an error if x is not a string or cannot be parsed.
func syslogDate(x interface{}) (interface{}, error) {
	dateStr, ok := x.(string)
	if !ok {
		return nil, fmt.Errorf("can't handle parsing type %T as string", x)
	}

	stamp := strings.TrimSpace(dateStr)
	t, err := time.Parse(time.Stamp, stamp)
	if err != nil {
		return nil, fmt.Errorf("parsing syslog %q timestamp as date: %w", dateStr, err)
	}

	// The year won't be set, so try to reconstruct it...
	now := time.Now()
	tYear := now.Year()
	switch {
	case now.Month() == time.December && t.Month() == time.January:
		// If 'now' is December, but t is January, then t is next year.
		tYear++
	case now.Month() == time.January && t.Month() == time.December:
		// Similarly, if 'now' is January but t is December, t is last year.
		tYear--
	}

	// Set the year by reparsing the string rather than adjusting t, so that
	// dates such as Feb 29 are validated against the chosen year.
	t, err = time.Parse(time.Stamp+" 2006", fmt.Sprintf("%s %d", stamp, tYear))
	if err != nil {
		return nil, fmt.Errorf("re-parsing syslog %q timestamp as date: %w", dateStr, err)
	}
	return t.Unix(), nil
}
// transformToInt converts a field value to an integer.
//
// Strings are parsed as base-10 and returned as int64. Values that are
// already int or int64 are returned unchanged. float64 values (what
// encoding/json produces for JSON numbers decoded into interface{}) are
// accepted when they hold an integral value that fits in an int64, and are
// returned as int64. Anything else yields an error, in which case the
// returned value is nil.
func transformToInt(x interface{}) (interface{}, error) {
	switch x := x.(type) {
	case string:
		n, err := strconv.ParseInt(x, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("parsing %q as integer: %w", x, err)
		}
		return n, nil
	case int:
		return x, nil
	case int64:
		return x, nil
	case float64:
		// Reject out-of-range and fractional values (and NaN, which never
		// compares equal) rather than silently truncating them.
		if x < -(1<<63) || x >= 1<<63 || x != float64(int64(x)) {
			return nil, fmt.Errorf("can't represent %v as a 64-bit integer", x)
		}
		return int64(x), nil
	}
	return nil, fmt.Errorf("can't handle converting type %T to int", x)
}
var fieldTransform = map[string]transformFunc{
"priority": transformToInt,
"errno": transformToInt,
"syslog_facility": transformToInt,
"syslog_pid": transformToInt,
"tid": transformToInt,
"pid": transformToInt,
"uid": transformToInt,
"gid": transformToInt,
"audit_session": transformToInt,
"audit_loginuid": transformToInt,
"systemd_owner_uid": transformToInt,
"monotonic_timestamp": transformToInt,
"syslog_timestamp": syslogDate,
"source_realtime_timestamp": transformToInt, // microseconds
"realtime_timestamp": transformToInt, // microseconds
// fieldMap maps journal field names, as they appear in the JSON received from
// the remote, to the destination field names used in the rows that are
// inserted. Fields not listed here end up in the row's extra data instead.
var fieldMap = map[string]string{
	"MESSAGE":            "message",
	"PRIORITY":           "priority",
	"CODE_FILE":          "code_file",
	"CODE_LINE":          "code_line",
	"CODE_FUNC":          "code_func",
	"ERRNO":              "errno",
	"INVOCATION_ID":      "invocation_id",
	"USER_INVOCATION_ID": "user_invocation_id",
	"SYSLOG_FACILITY":    "syslog_facility",
	"SYSLOG_IDENTIFIER":  "syslog_identifier",
	"SYSLOG_PID":         "syslog_pid",
	"SYSLOG_TIMESTAMP":   "syslog_timestamp",
	"TID":                "tid",

	"_PID":                       "pid",
	"_UID":                       "uid",
	"_GID":                       "gid",
	"_COMM":                      "comm",
	"_EXE":                       "exe",
	"_CMDLINE":                   "cmdline",
	"_AUDIT_SESSION":             "audit_session",
	"_AUDIT_LOGINUID":            "audit_loginuid",
	"_SYSTEMD_CGROUP":            "systemd_cgroup",
	"_SYSTEMD_SLICE":             "systemd_slice",
	"_SYSTEMD_UNIT":              "systemd_unit",
	"_SYSTEMD_USER_UNIT":         "systemd_user_unit",
	"_SYSTEMD_USER_SLICE":        "systemd_user_slice",
	"_SYSTEMD_SESSION":           "systemd_session",
	"_SYSTEMD_OWNER_UID":         "systemd_owner_uid",
	"_SOURCE_REALTIME_TIMESTAMP": "source_realtime_timestamp",
	"_BOOT_ID":                   "boot_id",
	"_MACHINE_ID":                "machine_id",
	"_SYSTEMD_INVOCATION_ID":     "systemd_invocation_id",
	"_HOSTNAME":                  "hostname",
	"_TRANSPORT":                 "transport",
	"_STREAM_ID":                 "stream_id",
	"_LINE_BREAK":                "line_break",
	"_NAMESPACE":                 "namespace",

	"_KERNEL_DEVICE":    "kernel_device",
	"_KERNEL_SUBSYSTEM": "kernel_subsystem",
	"_UDEV_SYSNAME":     "udev_sysname",
	"_UDEV_DEVNODE":     "udev_devnode",
	"_UDEV_DEVLINK":     "udev_devlink",

	"__CURSOR":              "cursor",
	"__REALTIME_TIMESTAMP":  "realtime_timestamp",
	"__MONOTONIC_TIMESTAMP": "monotonic_timestamp",
}
// doInsert sends rows to the ClickHouse HTTP endpoint at chAddr as a single
// "INSERT INTO <table> FORMAT JSONEachRow" request. Each row is JSON-encoded
// on its own line of the request body.
//
// table is interpolated into the query text verbatim, so it must be a trusted
// identifier. A response status outside the 2xx range is reported as an error
// that includes (a bounded prefix of) the response body.
func doInsert(ctx context.Context, chAddr string, table string, rows []interface{}) error {
	var reqPayload bytes.Buffer
	reqEnc := json.NewEncoder(&reqPayload)
	for _, r := range rows {
		if err := reqEnc.Encode(r); err != nil {
			return fmt.Errorf("marshalling row: %w", err)
		}
	}

	req, err := http.NewRequestWithContext(ctx, "POST", chAddr, &reqPayload)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("INSERT INTO %v FORMAT JSONEachRow", table))
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		// Attach the server's explanation to the error, but cap how much is
		// read. A read failure only costs us the detail, so it is ignored.
		detail, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return fmt.Errorf("unexpected HTTP response %d: %s", resp.StatusCode, bytes.TrimSpace(detail))
	}

	// Drain the body so the transport can reuse the connection; the content
	// of a successful response is not needed, so a failure here is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}) error {
mostRecent := map[string]string{}
for _, row := range rows {
row := row.(map[string]interface{})
hostname := row["scraped_hostname"].(string)
cursor := row["cursor"].(string)
mostRecent[hostname] = cursor
type mostRecentRow struct {
Hostname string `json:"hostname"`
LastCursor string `json:"last_cursor"`
var mostRecentRows []interface{}
for hostname, lastCursor := range mostRecent {
mostRecentRows = append(mostRecentRows, mostRecentRow{hostname, lastCursor})
if err := doInsert(ctx, chAddr, "logger.systemd", rows); err != nil {
return fmt.Errorf("inserting rows into logger.systemd: %w", err)
return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows)
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
data := make(map[string]interface{})
extraData := make(map[string]interface{})
msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; MESSAGE: %v", data["_HOSTNAME"], data["__REALTIME_TIMESTAMP"], data["MESSAGE"])
for origField, newField := range fieldMap {
v, ok := d[origField]
if !ok {
// Check if this is a slice...
if vs, ok := v.([]interface{}); ok {
// extraData gets all values after the first one, we keep the first one.
log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
extraData[origField] = vs[1:]
v = vs[0]
transformer := fieldTransform[newField]
if transformer != nil {
var err error
ov := v
v, err = transformer(v)
if err != nil {
return nil, fmt.Errorf("[%v] transforming origField %v newField %v (value %v): %w", msgID, origField, newField, ov, err)
delete(d, origField)
data[newField] = v
// Anything left in d gets punted into extraData.
// Note that we might overwrite things: if we fail to transform a value that was originally in a slice, then we want to overwrite the value we originally put into extraData.
for fn, v := range d {
extraData[fn] = v
edJSON, err := json.Marshal(extraData)
if err != nil {
return nil, fmt.Errorf("[%v] encoding extraData as JSON: %w", msgID, err)
data["extra_data_json"] = string(edJSON)
return data, nil
func aggregator(ctx context.Context, flushEntries int, flushInterval time.Duration, chAddr string, inputCh <-chan interface{}) error {
log.Printf("aggregator starting, flushEntries=%d, flushInterval=%v", flushEntries, flushInterval)
defer log.Println("aggregator terminating")
t := time.NewTicker(flushInterval)
defer t.Stop()
var buf []interface{}
flushBuffer := func(ctx context.Context, reason string) error {
t.Reset(20 * time.Minute)
defer t.Reset(flushInterval)
if len(buf) == 0 {
// Fast return; nothing to do.
return nil
log.Printf("aggregator flushing buffer (%d entries) because %v", len(buf), reason)
if err := insertIntoClickhouse(ctx, chAddr, buf); err != nil {
log.Printf("aggregator failed to flush buffer: %v", err)
// For now, never return an error; we want to retry on either the next tick or the next log entry.
return nil
// Clear the buffer so we can start again.
buf = nil
return nil
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
if err := flushBuffer(ctx, "interval"); err != nil {
return err
case d, ok := <-inputCh:
if !ok {
// Input closed, we're done here.
log.Printf("aggregator input closed, trying to shut down (%d events in buffer)", len(buf))
break aggLoop
buf = append(buf, d)
if len(buf) >= flushEntries {
if err := flushBuffer(ctx, "entries"); err != nil {
return err
for n := 0; n < 5; n++ {
if err := flushBuffer(ctx, "shutting down"); err == nil {
return nil
} else {
log.Printf("aggregator trying to shut down: flush failed: %v", err)
return flushBuffer(ctx, "shutting down - final attempt")
// mostRecentCursor asks the ClickHouse HTTP endpoint at clickhouseAddr for
// the most recently recorded journal cursor of hostname in
// logger.systemd_scrape.
//
// It returns the empty string, and no error, when no cursor has been recorded
// (the response body is empty). The hostname is passed as a server-side query
// parameter rather than being spliced into the SQL text, so it does not need
// to be quoted or escaped here.
func mostRecentCursor(ctx context.Context, clickhouseAddr, hostname string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", clickhouseAddr, nil)
	if err != nil {
		return "", fmt.Errorf("creating request: %w", err)
	}
	q := req.URL.Query()
	q.Set("query", "SELECT last_cursor FROM logger.systemd_scrape WHERE hostname={hostname:String} ORDER BY last_scrape DESC LIMIT 1 FORMAT JSONEachRow")
	q.Set("param_hostname", hostname)
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
	}

	type lastCursor struct {
		LastCursor string `json:"last_cursor"`
	}
	var lc lastCursor
	err = json.NewDecoder(resp.Body).Decode(&lc)
	switch {
	case err == io.EOF:
		// Empty result set: nothing has been scraped for this host yet.
		return "", nil
	case err != nil:
		return "", fmt.Errorf("decoding response: %w", err)
	}
	return lc.LastCursor, nil
}
func fetcher(ctx context.Context, hostname, remoteAddr string, retryMinDuration, retryMaxDuration time.Duration, clickhouseAddr string, aggCh chan<- interface{}) (err error) {
log.Printf("fetcher[%v] starting", hostname)
defer func() { log.Printf("fetcher[%v] terminating: %v", hostname, err) }()
var sleepDuration time.Duration
cursor, err := mostRecentCursor(ctx, clickhouseAddr, hostname)
if err != nil {
return fmt.Errorf("mostRecentCursor: %w", err)
// Continuously read JSON messages from the remote.
for {
if ctx.Err() != nil {
return ctx.Err()
if sleepDuration > 0 {
log.Printf("fetcher[%v] waiting for %v before retry", hostname, sleepDuration)
if ctx.Err() != nil {
return ctx.Err()
if sleepDuration == 0 {
sleepDuration = retryMinDuration
} else {
sleepDuration = 2 * sleepDuration
if sleepDuration > retryMaxDuration {
sleepDuration = retryMaxDuration
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%v/entries?follow", remoteAddr), nil)
if err != nil {
// ??? This seems un-recoverable.
return fmt.Errorf("creating HTTP request: %w", err)
if cursor != "" {
log.Printf("fetcher[%v] resuming from %v", hostname, cursor)
req.Header.Set("Range", fmt.Sprintf("entries=%v", cursor))
} else {
log.Printf("fetcher[%v] starting from scratch", hostname)
req.Header.Set("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("fetcher[%v] fetching log entries: %v", hostname, err)
continue retryLoop
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("fetcher[%v] unexpected status code %d", hostname, resp.StatusCode)
continue retryLoop
d := make(map[string]interface{})
j := json.NewDecoder(resp.Body)
for j.More() {
if err := j.Decode(&d); err != nil {
log.Printf("fetcher[%v] JSON decode: %v", hostname, err)
continue retryLoop
data, err := parseLog(d)
if err != nil {
log.Printf("fetcher[%v] parsing log: %v", hostname, err)
continue retryLoop
if cursor == data["cursor"].(string) {
// Skip over the last log entry we'd seen previously.
sleepDuration = 0
data["scraped_hostname"] = hostname
cursor = data["cursor"].(string)
aggCh <- data
log.Printf("fetcher[%v] JSON decoder says there's no more content", hostname)
// Command-line flags.
var (
	flushBufferSizeFlag     = flag.Int("flush_buffer_size", 10000, "Number of entries the buffer can reach before attempting to flush")
	flushBufferIntervalFlag = flag.Duration("flush_buffer_duration", 500*time.Millisecond, "Interval between buffer flushes")
	hostsFlag               = flag.String("hosts", "porcorosso=localhost:19531", "Comma-separated <hostname>=<address> pairs to scrape logs from")
	clickhouseAddrFlag      = flag.String("clickhouse_addr", "http://localhost:8123", "Address of Clickhouse HTTP API")
	fetcherRetryMin         = flag.Duration("fetcher_retry_minimum", 20*time.Millisecond, "Minimum retry interval for fetching logs")
	fetcherRetryMax         = flag.Duration("fetcher_retry_maximum", 5*time.Minute, "Maximum retry interval for fetching logs (after backoff)")
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetchCtx, fetchCancel := signal.NotifyContext(ctx, os.Interrupt)
defer fetchCancel()
// Parse *hostsFlag.
fail := false
scrapeHosts := make(map[string]string)
for _, hostPair := range strings.Split(*hostsFlag, ",") {
// Ideally this would use strings.Cut, but I'm still on Go 1.16...
bits := strings.SplitN(hostPair, "=", 2)
if len(bits) != 2 {
log.Println("invalid host specification %q (expected <host>=<address>)", hostPair)
fail = true
scrapeHosts[bits[0]] = bits[1]
if fail {
var wg sync.WaitGroup
var fetchWG sync.WaitGroup
aggCh := make(chan interface{})
go func() {
go func() {
defer wg.Done()
aggregator(ctx, *flushBufferSizeFlag, *flushBufferIntervalFlag, *clickhouseAddrFlag, aggCh)
for hostname, remoteAddr := range scrapeHosts {
go func() {
defer wg.Done()
defer fetchWG.Done()
fetcher(fetchCtx, hostname, remoteAddr, *fetcherRetryMin, *fetcherRetryMax, *clickhouseAddrFlag, aggCh)
log.Println("context complete; waiting for fetcher goroutines to stop")
log.Println("context complete; waiting for aggregator goroutine to stop")
log.Println("buffers flushed")
Normal file
Normal file
@ -0,0 +1,67 @@
-- One row per scrape checkpoint: the last journal cursor stored for a host.
-- ReplacingMergeTree keeps, per hostname, the row with the greatest
-- last_scrape once parts are merged; until then several rows may coexist.
CREATE TABLE systemd_scrape
(
    `hostname` String,
    `last_scrape` DateTime DEFAULT now(),
    `last_cursor` String
) ENGINE = ReplacingMergeTree(last_scrape)
ORDER BY hostname;
-- One row per scraped journal entry. Column names match the destination
-- field names produced by the scraper; fields it does not know about are
-- stored JSON-encoded in extra_data_json. Rows expire 90 days after their
-- realtime_timestamp.
CREATE TABLE systemd
(
    `uuid` UUID DEFAULT generateUUIDv4(),
    `scraped_hostname` String DEFAULT '',
    `cursor` String DEFAULT '',
    `message` String DEFAULT '',
    `priority` Int8 DEFAULT -1,
    `code_file` String DEFAULT '',
    `code_line` String DEFAULT '',
    `code_func` String DEFAULT '',
    -- Int16 rather than Int8: Linux errno values go above 127.
    `errno` Int16 DEFAULT -1,
    `invocation_id` String DEFAULT '',
    `user_invocation_id` String DEFAULT '',
    `syslog_facility` Int8 DEFAULT -1,
    `syslog_identifier` LowCardinality(String) DEFAULT '',
    `syslog_pid` Int64 DEFAULT -1,
    `syslog_timestamp` DateTime('UTC'),
    `tid` Int64 DEFAULT -1,
    `pid` Int64 DEFAULT -1,
    `uid` Int64 DEFAULT -1,
    `gid` Int64 DEFAULT -1,
    `comm` LowCardinality(String) DEFAULT '',
    `exe` LowCardinality(String) DEFAULT '',
    `cmdline` LowCardinality(String) DEFAULT '',
    `audit_session` UInt32 DEFAULT -1,
    `audit_loginuid` Int64 DEFAULT -1,
    `systemd_cgroup` LowCardinality(String) DEFAULT '',
    `systemd_slice` LowCardinality(String) DEFAULT '',
    `systemd_unit` LowCardinality(String) DEFAULT '',
    `systemd_user_slice` LowCardinality(String) DEFAULT '',
    `systemd_user_unit` LowCardinality(String) DEFAULT '',
    `systemd_session` LowCardinality(String) DEFAULT '',
    `systemd_owner_uid` Int64 DEFAULT -1,
    `source_realtime_timestamp` DateTime64(6, 'UTC'),
    `boot_id` LowCardinality(String) DEFAULT '',
    `machine_id` LowCardinality(String) DEFAULT '',
    `systemd_invocation_id` LowCardinality(String) DEFAULT '',
    `hostname` LowCardinality(String) DEFAULT '',
    `transport` LowCardinality(String) DEFAULT '',
    `stream_id` LowCardinality(String) DEFAULT '',
    `line_break` LowCardinality(String) DEFAULT '',
    `namespace` LowCardinality(String) DEFAULT '',
    `kernel_device` LowCardinality(String) DEFAULT '',
    `kernel_subsystem` LowCardinality(String) DEFAULT '',
    `udev_sysname` LowCardinality(String) DEFAULT '',
    `udev_devnode` LowCardinality(String) DEFAULT '',
    `udev_devlink` LowCardinality(String) DEFAULT '',
    `realtime_timestamp` DateTime64(6, 'UTC'),
    `monotonic_timestamp` UInt64,
    `extra_data_json` String DEFAULT ''
) ENGINE = MergeTree()
ORDER BY (scraped_hostname, realtime_timestamp, cursor)
PARTITION BY toYYYYMM(realtime_timestamp)
TTL toDate(realtime_timestamp) + INTERVAL 90 DAY;
@ -31,5 +31,17 @@ in {
services.clickhouse.enable = true;
time.timeZone = "Etc/UTC";
|||| = {
enable = true;
after = [ "" ];
serviceConfig = {
ExecStart = let
hosts = lib.concatStringsSep "," (lib.mapAttrsToList (k: v: "${k}=${v.addr}") depot.ops.nixos.scrapeJournalHosts);
"${depot.go.journal2clickhouse}/bin/journal2clickhouse -clickhouse_addr http://localhost:8123 -hosts '${hosts}'";
User = "journal2clickhouse";
system.stateVersion = "21.11";
@ -69,6 +69,8 @@ let
stockExporters ++ customExporters;
builtins.listToAttrs (builtins.concatLists (lib.mapAttrsToList exportersForSystem evaledSystems));
scrapeJournalHosts =
lib.filterAttrs (n: v: v.enable) (lib.mapAttrs (n: v: evaledSystems);
netbootSystem = systemFor "netboot" (import ./netboot);
installcdSystem = systemFor "installcd" (import ./installcd);
@ -77,6 +79,7 @@ in systemDrvs // {
systemConfigs = evaledSystems;
systemExporters = systemExporters;
tailscaleIPs = systemTailscaleIPs;
scrapeJournalHosts = scrapeJournalHosts;
netboot =;
installcd =;
@ -204,6 +204,8 @@ in {
mandatoryFeatures = [ ];
my.scrapeJournal.enable = false; # Laptop, don't pull too much.
# This value determines the NixOS release with which your system is to be
# compatible, in order to avoid breaking some software such as database
# servers. You should change this only after NixOS release notes say you
@ -56,6 +56,15 @@ in
default = "";
|||| = lib.mkOption {
type = lib.types.bool;
default = != null;
|||| = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = "${}:19531";
config = {
hardware.enableRedistributableFirmware = true;
@ -251,9 +260,11 @@ in
services.fwupd.enable = true;
# This is enabled independently of my.scrapeJournal.enable.
services.journald.enableHttpGateway = != null;
systemd.sockets.systemd-journal-gatewayd.socketConfig = lib.optionalAttrs ( != null) {
ListenStream = [ "" "${}:19531" "" "[::1]:19531" ];
ListenStream = [ "" "${}:19531" ];
FreeBind = true;
@ -328,6 +328,8 @@ in {
my.scrapeJournal.enable = false; # Laptop, don't pull too much.
# This value determines the NixOS release with which your system is to be
# compatible, in order to avoid breaking some software such as database
# servers. You should change this only after NixOS release notes say you
Reference in a new issue