depot/go/journal2clickhouse/journal2clickhouse.go
Luke Granger-Brown 086f5fe597 journal2clickhouse: coerce things that look like strings back to strings
There's binary data sometimes, but on the whole I don't care about preserving
it properly (sorry), so let's just coerce it to a string if it is supposed to
go into a "proper" field.
2022-01-01 16:30:38 +00:00

540 lines
16 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
)
// transformFunc converts a raw field value into the value that should be stored
// for that field, or returns an error when the value cannot be converted.
type transformFunc func(interface{}) (interface{}, error)
// syslogDate converts a syslog-style timestamp string (time.Stamp layout, which
// carries no year) into Unix seconds.
//
// The year is guessed from the current time: it is the current year, except
// that a January timestamp seen in December is taken to be next year, and a
// December timestamp seen in January is taken to be last year. Non-string
// inputs and unparseable strings yield a nil value and an error.
func syslogDate(x interface{}) (interface{}, error) {
	raw, isString := x.(string)
	if !isString {
		return nil, fmt.Errorf("can't handle parsing type %T as string", x)
	}
	trimmed := strings.TrimSpace(raw)

	// First pass: parse without a year, just to learn the month.
	stamp, err := time.Parse(time.Stamp, trimmed)
	if err != nil {
		return nil, fmt.Errorf("parsing syslog %q timestamp as date: %w", raw, err)
	}

	// Guess the missing year, allowing for timestamps that straddle New Year.
	today := time.Now()
	year := today.Year()
	if today.Month() == time.December && stamp.Month() == time.January {
		year++
	} else if today.Month() == time.January && stamp.Month() == time.December {
		year--
	}

	// Second pass: parse again with the year appended rather than patching the
	// first result, so that dates which do not exist in that year (Feb 29) are
	// rejected instead of silently normalised.
	withYear, err := time.Parse(time.Stamp+" 2006", fmt.Sprintf("%s %d", trimmed, year))
	if err != nil {
		return nil, fmt.Errorf("re-parsing syslog %q timestamp as date: %w", raw, err)
	}
	return withYear.Unix(), nil
}
// transformToInt turns a field value into an integer.
//
// Strings are parsed as base-10 signed 64-bit integers (the result is an int64,
// returned together with any parse error). Values that are already an int are
// passed through unchanged. Every other type is rejected with an error.
func transformToInt(x interface{}) (interface{}, error) {
	if text, isString := x.(string); isString {
		parsed, err := strconv.ParseInt(text, 10, 64)
		return parsed, err
	}
	if number, isInt := x.(int); isInt {
		return number, nil
	}
	return nil, fmt.Errorf("can't handle converting type %T to int", x)
}
// fieldTransform maps an output field name (a value of fieldMap) to the
// conversion parseLog applies to that field's raw value. Fields without an
// entry are stored as they arrive.
var fieldTransform = map[string]transformFunc{
	// Fields stored as integers.
	"priority":          transformToInt,
	"errno":             transformToInt,
	"syslog_facility":   transformToInt,
	"syslog_pid":        transformToInt,
	"tid":               transformToInt,
	"pid":               transformToInt,
	"uid":               transformToInt,
	"gid":               transformToInt,
	"audit_session":     transformToInt,
	"audit_loginuid":    transformToInt,
	"systemd_owner_uid": transformToInt,

	// Timestamps.
	"syslog_timestamp":          syslogDate,
	"monotonic_timestamp":       transformToInt,
	"realtime_timestamp":        transformToInt, // microseconds
	"source_realtime_timestamp": transformToInt, // microseconds
}
// fieldMap maps the field names found in incoming log entries to the names
// parseLog stores them under. Every output name is the input name lower-cased
// with its leading underscores removed. Input fields that are not listed here
// are not renamed; parseLog collects them into extra_data_json instead.
var fieldMap = map[string]string{
	// Fields with no underscore prefix.
	"MESSAGE":            "message",
	"PRIORITY":           "priority",
	"CODE_FILE":          "code_file",
	"CODE_LINE":          "code_line",
	"CODE_FUNC":          "code_func",
	"ERRNO":              "errno",
	"INVOCATION_ID":      "invocation_id",
	"USER_INVOCATION_ID": "user_invocation_id",
	"SYSLOG_FACILITY":    "syslog_facility",
	"SYSLOG_IDENTIFIER":  "syslog_identifier",
	"SYSLOG_PID":         "syslog_pid",
	"SYSLOG_TIMESTAMP":   "syslog_timestamp",
	"TID":                "tid",

	// Single-underscore fields (_CAP_EFFECTIVE has no mapping here).
	"_PID":            "pid",
	"_UID":            "uid",
	"_GID":            "gid",
	"_COMM":           "comm",
	"_EXE":            "exe",
	"_CMDLINE":        "cmdline",
	"_AUDIT_SESSION":  "audit_session",
	"_AUDIT_LOGINUID": "audit_loginuid",

	// _SYSTEMD_* fields.
	"_SYSTEMD_CGROUP":        "systemd_cgroup",
	"_SYSTEMD_SLICE":         "systemd_slice",
	"_SYSTEMD_UNIT":          "systemd_unit",
	"_SYSTEMD_USER_UNIT":     "systemd_user_unit",
	"_SYSTEMD_USER_SLICE":    "systemd_user_slice",
	"_SYSTEMD_SESSION":       "systemd_session",
	"_SYSTEMD_OWNER_UID":     "systemd_owner_uid",
	"_SYSTEMD_INVOCATION_ID": "systemd_invocation_id",

	// Remaining single-underscore fields.
	"_SOURCE_REALTIME_TIMESTAMP": "source_realtime_timestamp",
	"_BOOT_ID":                   "boot_id",
	"_MACHINE_ID":                "machine_id",
	"_HOSTNAME":                  "hostname",
	"_TRANSPORT":                 "transport",
	"_STREAM_ID":                 "stream_id",
	"_LINE_BREAK":                "line_break",
	"_NAMESPACE":                 "namespace",

	// _KERNEL_* and _UDEV_* fields.
	"_KERNEL_DEVICE":    "kernel_device",
	"_KERNEL_SUBSYSTEM": "kernel_subsystem",
	"_UDEV_SYSNAME":     "udev_sysname",
	"_UDEV_DEVNODE":     "udev_devnode",
	"_UDEV_DEVLINK":     "udev_devlink",

	// Double-underscore fields.
	"__CURSOR":              "cursor",
	"__REALTIME_TIMESTAMP":  "realtime_timestamp",
	"__MONOTONIC_TIMESTAMP": "monotonic_timestamp",
}
// doInsert writes rows into table using a single
// "INSERT INTO <table> FORMAT JSONEachRow" statement, POSTed to the HTTP
// endpoint at chAddr with one JSON document per row as the request body.
//
// table is placed into the statement verbatim, so it must be a trusted
// identifier. A non-2xx response is an error; the response body is copied to
// stderr to help diagnosis. The request is bound to ctx.
func doInsert(ctx context.Context, chAddr string, table string, rows []interface{}) error {
	var reqPayload bytes.Buffer
	reqEnc := json.NewEncoder(&reqPayload)
	for _, r := range rows {
		if err := reqEnc.Encode(r); err != nil {
			return fmt.Errorf("marshalling row: %w", err)
		}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, chAddr, &reqPayload)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("INSERT INTO %v FORMAT JSONEachRow", table))
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		// Best effort: surface the server's explanation; a failure to copy it
		// must not mask the status error below.
		_, _ = io.Copy(os.Stderr, resp.Body)
		return fmt.Errorf("unexpected HTTP response %d", resp.StatusCode)
	}

	// Read the body to EOF so the transport can reuse the connection for the
	// next flush. The insert has already been acknowledged, so a read error
	// here is deliberately ignored rather than reported as a failed insert.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// insertIntoClickhouse stores rows in logger.systemd and then records, for each
// scraped host appearing in rows, the cursor of that host's last row in
// logger.systemd_scrape.
//
// Every element of rows must be a map[string]interface{} carrying string
// "scraped_hostname" and "cursor" entries; anything else panics. The rows are
// inserted first, so if the second insert fails the error is returned even
// though the rows have already been written, and a caller that retries with
// the same rows will insert them again.
func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}) error {
	type scrapeState struct {
		Hostname   string `json:"hostname"`
		LastCursor string `json:"last_cursor"`
	}

	// Later rows overwrite earlier ones, leaving the last cursor per host.
	latest := make(map[string]string)
	for i := range rows {
		entry := rows[i].(map[string]interface{})
		host := entry["scraped_hostname"].(string)
		position := entry["cursor"].(string)
		latest[host] = position
	}

	var scrapeRows []interface{}
	for host, position := range latest {
		scrapeRows = append(scrapeRows, scrapeState{Hostname: host, LastCursor: position})
	}

	err := doInsert(ctx, chAddr, "logger.systemd", rows)
	if err != nil {
		return fmt.Errorf("inserting rows into logger.systemd: %w", err)
	}
	return doInsert(ctx, chAddr, "logger.systemd_scrape", scrapeRows)
}
// coerceToByteSlice converts a slice of numbers into the bytes they represent.
//
// It succeeds only when every element is an int, a byte, or a float64 holding
// a whole number in the range 0-255 (float64 is what encoding/json produces
// for JSON numbers). An empty or nil slice is treated as zero bytes and yields
// an empty, non-nil result. On failure it returns (nil, false).
func coerceToByteSlice(n []interface{}) ([]byte, bool) {
	// toByte converts a single element, reporting false for anything that is
	// not a whole number representable as a byte.
	toByte := func(v interface{}) (byte, bool) {
		switch v := v.(type) {
		case int:
			if v < 0 || v > 255 {
				return 0, false
			}
			return byte(v), true
		case byte:
			return v, true
		case float64:
			// The range test keeps the conversion well-defined; the round
			// trip then rejects fractional values and NaN.
			if v < 0 || v > 255 || float64(byte(v)) != v {
				return 0, false
			}
			return byte(v), true
		}
		return 0, false
	}

	if len(n) == 0 {
		return []byte{}, true
	}
	// Fast fail: do not allocate for slices that obviously hold something else.
	if _, ok := toByte(n[0]); !ok {
		return nil, false
	}

	out := make([]byte, len(n))
	for idx, v := range n {
		b, ok := toByte(v)
		if !ok {
			return nil, false
		}
		out[idx] = b
	}
	return out, true
}
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
data := make(map[string]interface{})
extraData := make(map[string]interface{})
msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"])
for origField, newField := range fieldMap {
v, ok := d[origField]
if !ok {
continue
}
// Check if this is a slice...
if vs, ok := v.([]interface{}); ok {
// This might be a []byte, though.
vbytes, ok := coerceToByteSlice(vs)
if ok {
v = string(vbytes)
} else {
// extraData gets all values after the first one, we keep the first one.
log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
extraData[origField] = vs[1:]
v = vs[0]
}
}
if vs, ok := v.([]interface{}); ok {
// It's possible we had a [][]byte, so now coerce this again.
vbytes, ok := coerceToByteSlice(vs)
if ok {
v = string(vbytes)
}
}
transformer := fieldTransform[newField]
if transformer != nil {
var err error
ov := v
v, err = transformer(v)
if err != nil {
return nil, fmt.Errorf("[%v] transforming origField %v newField %v (value %v): %w", msgID, origField, newField, ov, err)
}
}
delete(d, origField)
data[newField] = v
}
// Anything left in d gets punted into extraData.
// Note that we might overwrite things: if we fail to transform a value that was originally in a slice, then we want to overwrite the value we originally put into extraData.
for fn, v := range d {
extraData[fn] = v
}
edJSON, err := json.Marshal(extraData)
if err != nil {
return nil, fmt.Errorf("[%v] encoding extraData as JSON: %w", msgID, err)
}
data["extra_data_json"] = string(edJSON)
return data, nil
}
// aggregator buffers entries received on inputCh and writes them to ClickHouse
// at chAddr via insertIntoClickhouse.
//
// The buffer is flushed whenever it holds at least flushEntries entries and on
// every flushInterval tick. A failed flush while running is logged and the
// buffer is kept, so the same entries are retried on the next tick or the next
// incoming entry.
//
// aggregator returns ctx.Err() as soon as ctx is done (without flushing). When
// inputCh is closed it tries to flush what is left, retrying a few times, and
// returns nil on success or the last flush error if every attempt failed.
func aggregator(ctx context.Context, flushEntries int, flushInterval time.Duration, chAddr string, inputCh <-chan interface{}) error {
	log.Printf("aggregator starting, flushEntries=%d, flushInterval=%v", flushEntries, flushInterval)
	defer log.Println("aggregator terminating")

	t := time.NewTicker(flushInterval)
	defer t.Stop()

	var buf []interface{}

	// flushBuffer writes the buffer out and empties it on success. On failure
	// the buffer is left untouched and the error is returned to the caller.
	flushBuffer := func(ctx context.Context, reason string) error {
		// Push the next tick far away while the flush is in progress, then
		// restart the regular interval once it is done.
		t.Reset(20 * time.Minute)
		defer t.Reset(flushInterval)

		if len(buf) == 0 {
			// Fast return; nothing to do.
			return nil
		}

		log.Printf("aggregator flushing buffer (%d entries) because %v", len(buf), reason)
		if err := insertIntoClickhouse(ctx, chAddr, buf); err != nil {
			return err
		}
		// Clear the buffer so we can start again.
		buf = nil
		return nil
	}

	// tryFlush is used while running: a failure is only logged, because the
	// retained buffer is retried on the next tick or the next log entry.
	tryFlush := func(reason string) {
		if err := flushBuffer(ctx, reason); err != nil {
			log.Printf("aggregator failed to flush buffer: %v", err)
		}
	}

aggLoop:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			tryFlush("interval")
		case d, ok := <-inputCh:
			if !ok {
				// Input closed, we're done here.
				log.Printf("aggregator input closed, trying to shut down (%d events in buffer)", len(buf))
				break aggLoop
			}
			buf = append(buf, d)
			if len(buf) >= flushEntries {
				tryFlush("entries")
			}
		}
	}

	// Shutting down: nothing will retry for us later, so retry here.
	for n := 0; n < 5; n++ {
		err := flushBuffer(ctx, "shutting down")
		if err == nil {
			return nil
		}
		log.Printf("aggregator trying to shut down: flush failed: %v", err)
	}
	if err := flushBuffer(ctx, "shutting down - final attempt"); err != nil {
		log.Printf("aggregator giving up with %d entries unflushed: %v", len(buf), err)
		return err
	}
	return nil
}
// mostRecentCursor asks ClickHouse (HTTP endpoint clickhouseAddr) for the
// last_cursor most recently recorded for hostname in logger.systemd_scrape.
//
// It returns the empty string and a nil error when no row exists for the host
// (the response body is empty). Request failures, non-200 responses and
// undecodable responses are returned as errors.
func mostRecentCursor(ctx context.Context, clickhouseAddr, hostname string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", clickhouseAddr, nil)
	if err != nil {
		return "", fmt.Errorf("creating request: %w", err)
	}

	// hostname ends up inside a single-quoted SQL string literal, so escape the
	// two characters that could terminate or alter that literal. Hostnames
	// without quotes or backslashes produce exactly the same query as before.
	quotedHost := strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(hostname)

	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("SELECT last_cursor FROM logger.systemd_scrape WHERE hostname='%v' ORDER BY last_scrape DESC LIMIT 1 FORMAT JSONEachRow", quotedHost))
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
	}

	var lc struct {
		LastCursor string `json:"last_cursor"`
	}
	switch err := json.NewDecoder(resp.Body).Decode(&lc); {
	case err == io.EOF:
		// Empty result set: this host has never been scraped.
		return "", nil
	case err != nil:
		return "", fmt.Errorf("decoding last cursor: %w", err)
	}
	return lc.LastCursor, nil
}
// fetcher continuously reads log entries for hostname from
// http://<remoteAddr>/entries?follow and sends each parsed entry on aggCh,
// tagged with "scraped_hostname".
//
// It resumes after the cursor returned by mostRecentCursor and keeps track of
// the cursor of the last entry it forwarded. Whenever the stream ends or fails
// it reconnects, waiting between attempts with exponential backoff starting at
// retryMinDuration and capped at retryMaxDuration; the backoff is reset as soon
// as a new entry has been forwarded.
//
// fetcher only returns on an unrecoverable error (the initial cursor lookup or
// building the request) or when ctx is done, in which case it returns
// ctx.Err(). Cancellation also interrupts the backoff wait.
//
// NOTE(review): an entry that parseLog rejects is fetched again on every
// reconnect, so a single bad entry stalls ingestion for this host. That matches
// the previous behaviour; whether to skip such entries is a policy decision.
func fetcher(ctx context.Context, hostname, remoteAddr string, retryMinDuration, retryMaxDuration time.Duration, clickhouseAddr string, aggCh chan<- interface{}) (err error) {
	log.Printf("fetcher[%v] starting", hostname)
	defer func() { log.Printf("fetcher[%v] terminating: %v", hostname, err) }()

	cursor, err := mostRecentCursor(ctx, clickhouseAddr, hostname)
	if err != nil {
		return fmt.Errorf("mostRecentCursor: %w", err)
	}

	var sleepDuration time.Duration
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		if sleepDuration > 0 {
			log.Printf("fetcher[%v] waiting for %v before retry", hostname, sleepDuration)
			// Wait on a timer rather than time.Sleep so that shutdown is not
			// held up for the remainder of a long backoff.
			timer := time.NewTimer(sleepDuration)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}

		// Work out the delay to use if this attempt makes no progress.
		if sleepDuration == 0 {
			sleepDuration = retryMinDuration
		} else {
			sleepDuration *= 2
			if sleepDuration > retryMaxDuration {
				sleepDuration = retryMaxDuration
			}
		}

		next, progressed, err := streamEntries(ctx, hostname, remoteAddr, cursor, aggCh)
		cursor = next
		if progressed {
			sleepDuration = 0
		}
		if err != nil {
			return err
		}
	}
}

// streamEntries performs one connection to the remote and forwards entries
// until the stream ends or fails. It returns the cursor of the last entry it
// forwarded (or the cursor it was given if there was none), whether at least
// one entry was forwarded, and a non-nil error only for failures that retrying
// cannot fix. Recoverable failures are logged and reported as a nil error.
func streamEntries(ctx context.Context, hostname, remoteAddr, cursor string, aggCh chan<- interface{}) (string, bool, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%v/entries?follow", remoteAddr), nil)
	if err != nil {
		// ??? This seems un-recoverable.
		return cursor, false, fmt.Errorf("creating HTTP request: %w", err)
	}
	if cursor != "" {
		log.Printf("fetcher[%v] resuming from %v", hostname, cursor)
		req.Header.Set("Range", fmt.Sprintf("entries=%v", cursor))
	} else {
		log.Printf("fetcher[%v] starting from scratch", hostname)
	}
	req.Header.Set("Accept", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Printf("fetcher[%v] fetching log entries: %v", hostname, err)
		return cursor, false, nil
	}
	// Scoped to this one connection, so the body is released on every exit
	// path before the caller reconnects.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Printf("fetcher[%v] unexpected status code %d", hostname, resp.StatusCode)
		return cursor, false, nil
	}

	progressed := false
	j := json.NewDecoder(resp.Body)
	for j.More() {
		// Decode into a fresh map each time: decoding into an existing map
		// keeps its old entries, which would leak unmapped fields of earlier
		// entries into this entry's extra data.
		var d map[string]interface{}
		if err := j.Decode(&d); err != nil {
			log.Printf("fetcher[%v] JSON decode: %v", hostname, err)
			return cursor, progressed, nil
		}

		data, err := parseLog(d)
		if err != nil {
			log.Printf("fetcher[%v] parsing log: %v", hostname, err)
			return cursor, progressed, nil
		}

		entryCursor, ok := data["cursor"].(string)
		if !ok {
			// Without a cursor the entry cannot be tracked or resumed from.
			log.Printf("fetcher[%v] parsing log: entry has no string cursor (got %T)", hostname, data["cursor"])
			return cursor, progressed, nil
		}
		if entryCursor == cursor {
			// Skip over the last log entry we'd seen previously.
			continue
		}

		data["scraped_hostname"] = hostname
		cursor = entryCursor
		progressed = true
		aggCh <- data
	}

	log.Printf("fetcher[%v] JSON decoder says there's no more content", hostname)
	return cursor, progressed, nil
}
// Command-line flags. The first two are passed to the aggregator as its flush
// threshold and flush interval; -hosts lists what to scrape; the retry flags
// bound the fetchers' reconnect backoff.
var (
flushBufferSizeFlag = flag.Int("flush_buffer_size", 10000, "Number of entries the buffer can reach before attempting to flush")
flushBufferIntervalFlag = flag.Duration("flush_buffer_duration", 500*time.Millisecond, "Interval between buffer flushes")
hostsFlag = flag.String("hosts", "porcorosso=localhost:19531", "Comma-separated <hostname>=<address> pairs to scrape logs from")
clickhouseAddrFlag = flag.String("clickhouse_addr", "http://localhost:8123", "Address of Clickhouse HTTP API")
fetcherRetryMin = flag.Duration("fetcher_retry_minimum", 20*time.Millisecond, "Minimum retry interval for fetching logs")
fetcherRetryMax = flag.Duration("fetcher_retry_maximum", 5*time.Minute, "Maximum retry interval for fetching logs (after backoff)")
)
// main parses the flags, starts one aggregator and one fetcher per configured
// host, and runs until interrupted or until every fetcher has stopped.
//
// Shutdown is staged: an interrupt cancels only the fetchers' context. Once all
// fetchers have returned, the channel to the aggregator is closed, which lets
// the aggregator flush what it still holds (its own context stays alive until
// main returns) before the process exits.
func main() {
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fetchCtx, fetchCancel := signal.NotifyContext(ctx, os.Interrupt)
	defer fetchCancel()

	// Parse *hostsFlag. All invalid pairs are reported before exiting.
	fail := false
	scrapeHosts := make(map[string]string)
	for _, hostPair := range strings.Split(*hostsFlag, ",") {
		// Ideally this would use strings.Cut, but I'm still on Go 1.16...
		bits := strings.SplitN(hostPair, "=", 2)
		if len(bits) != 2 {
			log.Printf("invalid host specification %q (expected <host>=<address>)", hostPair)
			fail = true
			// Without this, bits[1] below would be out of range.
			continue
		}
		scrapeHosts[bits[0]] = bits[1]
	}
	if fail {
		os.Exit(1)
	}

	var wg sync.WaitGroup
	var fetchWG sync.WaitGroup
	aggCh := make(chan interface{})

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := aggregator(ctx, *flushBufferSizeFlag, *flushBufferIntervalFlag, *clickhouseAddrFlag, aggCh); err != nil {
			log.Printf("aggregator: %v", err)
		}
	}()

	for hostname, remoteAddr := range scrapeHosts {
		hostname, remoteAddr := hostname, remoteAddr
		wg.Add(1)
		fetchWG.Add(1)
		go func() {
			defer wg.Done()
			defer fetchWG.Done()
			// fetcher logs its own termination reason.
			_ = fetcher(fetchCtx, hostname, remoteAddr, *fetcherRetryMin, *fetcherRetryMax, *clickhouseAddrFlag, aggCh)
		}()
	}

	// Started only after every fetchWG.Add above: waiting on a WaitGroup whose
	// counter is still zero returns immediately, which would close aggCh while
	// fetchers are about to send on it.
	go func() {
		fetchWG.Wait()
		close(aggCh)
		// If the fetchers all stopped by themselves there is nothing left to
		// do, so release main instead of waiting for an interrupt forever.
		fetchCancel()
	}()

	<-fetchCtx.Done()
	log.Println("context complete; waiting for fetcher goroutines to stop")
	fetchWG.Wait()
	log.Println("context complete; waiting for aggregator goroutine to stop")
	wg.Wait()
	log.Println("buffers flushed")
}