Luke Granger-Brown
086f5fe597
There's binary data sometimes, but on the whole I don't care about preserving it properly (sorry), so let's just coerce it to a string if it is supposed to go into a "proper" field.
540 lines
16 KiB
Go
540 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// transformFunc converts a raw field value into the representation that is
// stored for that field, or returns an error if the value cannot be converted.
type transformFunc func(interface{}) (interface{}, error)
|
|
|
|
// syslogDate converts a syslog-style timestamp string (time.Stamp layout, e.g.
// "Jan _2 15:04:05") into Unix seconds (int64).
//
// The layout carries no year, so the current year is assumed, adjusted by one
// when the timestamp and the current date sit on opposite sides of a
// December/January boundary. No zone is present in the layout, so time.Parse
// interprets the value as UTC.
//
// An error is returned if x is not a string or cannot be parsed.
func syslogDate(x interface{}) (interface{}, error) {
	raw, isString := x.(string)
	if !isString {
		return nil, fmt.Errorf("can't handle parsing type %T as string", x)
	}
	trimmed := strings.TrimSpace(raw)

	// First pass: parse without a year, purely to learn the month.
	yearless, err := time.Parse(time.Stamp, trimmed)
	if err != nil {
		return nil, fmt.Errorf("parsing syslog %q timestamp as date: %w", raw, err)
	}

	// Guess the year from the wall clock, allowing for a year rollover.
	current := time.Now()
	year := current.Year()
	if current.Month() == time.December && yearless.Month() == time.January {
		// We are in December but the entry says January: it belongs to next year.
		year++
	} else if current.Month() == time.January && yearless.Month() == time.December {
		// We are in January but the entry says December: it belongs to last year.
		year--
	}

	// Second pass: parse again with the year appended rather than adjusting the
	// first result, so that dates such as Feb 29 are validated against the
	// actual year.
	dated, err := time.Parse(time.Stamp+" 2006", fmt.Sprintf("%s %d", trimmed, year))
	if err != nil {
		return nil, fmt.Errorf("re-parsing syslog %q timestamp as date: %w", raw, err)
	}
	return dated.Unix(), nil
}
|
|
|
|
// transformToInt converts a field value to an integer.
//
// Accepted inputs:
//   - string: parsed as a base-10 signed 64-bit integer (result is int64);
//   - int, int64: returned unchanged;
//   - float64: accepted only when it holds an integral value that fits in an
//     int64 (result is int64). encoding/json decodes every JSON number into a
//     float64 when the target is interface{}, so numeric fields arrive in
//     this form.
//
// Any other type, an unparseable string, or a non-integral/out-of-range float
// yields a nil value and an error.
func transformToInt(x interface{}) (interface{}, error) {
	switch x := x.(type) {
	case string:
		n, err := strconv.ParseInt(x, 10, 64)
		if err != nil {
			return nil, err
		}
		return n, nil
	case int:
		return x, nil
	case int64:
		return x, nil
	case float64:
		// 2^63: the first value above the int64 range. The range is checked
		// explicitly because converting an out-of-range float to an integer
		// is implementation-defined in Go. NaN fails both comparisons.
		const limit = 9223372036854775808.0
		if x >= -limit && x < limit {
			if n := int64(x); float64(n) == x {
				return n, nil
			}
		}
		return nil, fmt.Errorf("can't convert non-integral or out-of-range number %v to int", x)
	}
	return nil, fmt.Errorf("can't handle converting type %T to int", x)
}
|
|
|
|
var fieldTransform = map[string]transformFunc{
|
|
"priority": transformToInt,
|
|
"errno": transformToInt,
|
|
"syslog_facility": transformToInt,
|
|
"syslog_pid": transformToInt,
|
|
"tid": transformToInt,
|
|
"pid": transformToInt,
|
|
"uid": transformToInt,
|
|
"gid": transformToInt,
|
|
"audit_session": transformToInt,
|
|
"audit_loginuid": transformToInt,
|
|
"systemd_owner_uid": transformToInt,
|
|
"monotonic_timestamp": transformToInt,
|
|
|
|
"syslog_timestamp": syslogDate,
|
|
"source_realtime_timestamp": transformToInt, // microseconds
|
|
"realtime_timestamp": transformToInt, // microseconds
|
|
}
|
|
|
|
// fieldMap maps journal field names, as they appear in the incoming JSON, to
// the destination field names used for stored rows. Incoming fields without an
// entry here are not given a field of their own; parseLog places them in the
// "extra_data_json" blob instead.
var fieldMap = map[string]string{
	// Fields without a leading underscore.
	"CODE_FILE":          "code_file",
	"CODE_FUNC":          "code_func",
	"CODE_LINE":          "code_line",
	"ERRNO":              "errno",
	"INVOCATION_ID":      "invocation_id",
	"MESSAGE":            "message",
	"PRIORITY":           "priority",
	"SYSLOG_FACILITY":    "syslog_facility",
	"SYSLOG_IDENTIFIER":  "syslog_identifier",
	"SYSLOG_PID":         "syslog_pid",
	"SYSLOG_TIMESTAMP":   "syslog_timestamp",
	"TID":                "tid",
	"USER_INVOCATION_ID": "user_invocation_id",

	// Fields with a single leading underscore: process identity.
	// (_CAP_EFFECTIVE is not mapped.)
	"_AUDIT_LOGINUID": "audit_loginuid",
	"_AUDIT_SESSION":  "audit_session",
	"_CMDLINE":        "cmdline",
	"_COMM":           "comm",
	"_EXE":            "exe",
	"_GID":            "gid",
	"_PID":            "pid",
	"_UID":            "uid",

	// Fields with a single leading underscore: systemd context.
	"_SYSTEMD_CGROUP":        "systemd_cgroup",
	"_SYSTEMD_INVOCATION_ID": "systemd_invocation_id",
	"_SYSTEMD_OWNER_UID":     "systemd_owner_uid",
	"_SYSTEMD_SESSION":       "systemd_session",
	"_SYSTEMD_SLICE":         "systemd_slice",
	"_SYSTEMD_UNIT":          "systemd_unit",
	"_SYSTEMD_USER_SLICE":    "systemd_user_slice",
	"_SYSTEMD_USER_UNIT":     "systemd_user_unit",

	// Fields with a single leading underscore: host and transport.
	"_BOOT_ID":                   "boot_id",
	"_HOSTNAME":                  "hostname",
	"_LINE_BREAK":                "line_break",
	"_MACHINE_ID":                "machine_id",
	"_NAMESPACE":                 "namespace",
	"_SOURCE_REALTIME_TIMESTAMP": "source_realtime_timestamp",
	"_STREAM_ID":                 "stream_id",
	"_TRANSPORT":                 "transport",

	// Fields with a single leading underscore: kernel and udev.
	"_KERNEL_DEVICE":    "kernel_device",
	"_KERNEL_SUBSYSTEM": "kernel_subsystem",
	"_UDEV_DEVLINK":     "udev_devlink",
	"_UDEV_DEVNODE":     "udev_devnode",
	"_UDEV_SYSNAME":     "udev_sysname",

	// Fields with a double leading underscore.
	"__CURSOR":              "cursor",
	"__MONOTONIC_TIMESTAMP": "monotonic_timestamp",
	"__REALTIME_TIMESTAMP":  "realtime_timestamp",
}
|
|
|
|
// doInsert sends rows to the ClickHouse HTTP endpoint at chAddr as a single
// "INSERT INTO <table> FORMAT JSONEachRow" statement.
//
// Each row is JSON-encoded on its own line to form the request body; the
// statement itself travels in the "query" URL parameter. Any response status
// outside the 2xx range is an error, in which case the response body is copied
// to stderr to help with diagnosis.
func doInsert(ctx context.Context, chAddr string, table string, rows []interface{}) error {
	// Build the newline-delimited JSON payload.
	payload := new(bytes.Buffer)
	enc := json.NewEncoder(payload)
	for i := range rows {
		if err := enc.Encode(rows[i]); err != nil {
			return fmt.Errorf("marshalling row: %w", err)
		}
	}

	req, err := http.NewRequestWithContext(ctx, "POST", chAddr, payload)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	// Preserve any parameters already present in chAddr and add the statement.
	params := req.URL.Query()
	params.Set("query", fmt.Sprintf("INSERT INTO %v FORMAT JSONEachRow", table))
	req.URL.RawQuery = params.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= http.StatusOK && code < http.StatusMultipleChoices {
		return nil
	}
	// Best effort: surface whatever the server said about the failure.
	io.Copy(os.Stderr, resp.Body)
	return fmt.Errorf("unexpected HTTP response %d", resp.StatusCode)
}
|
|
|
|
func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}) error {
|
|
mostRecent := map[string]string{}
|
|
for _, row := range rows {
|
|
row := row.(map[string]interface{})
|
|
hostname := row["scraped_hostname"].(string)
|
|
cursor := row["cursor"].(string)
|
|
mostRecent[hostname] = cursor
|
|
}
|
|
type mostRecentRow struct {
|
|
Hostname string `json:"hostname"`
|
|
LastCursor string `json:"last_cursor"`
|
|
}
|
|
var mostRecentRows []interface{}
|
|
for hostname, lastCursor := range mostRecent {
|
|
mostRecentRows = append(mostRecentRows, mostRecentRow{hostname, lastCursor})
|
|
}
|
|
|
|
if err := doInsert(ctx, chAddr, "logger.systemd", rows); err != nil {
|
|
return fmt.Errorf("inserting rows into logger.systemd: %w", err)
|
|
}
|
|
return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows)
|
|
}
|
|
|
|
// coerceToByteSlice attempts to reinterpret a decoded JSON array as raw bytes.
//
// It succeeds only when every element is a byte, an int in [0, 255], or a
// float64 holding an integral value in [0, 255] (the form encoding/json
// produces for numbers decoded into interface{}). On success it returns the
// bytes and true; otherwise it returns nil and false.
//
// An empty (or nil) input is a valid, empty byte sequence and yields a
// zero-length slice and true.
func coerceToByteSlice(n []interface{}) ([]byte, bool) {
	out := make([]byte, len(n))
	for idx, elem := range n {
		switch v := elem.(type) {
		case byte:
			out[idx] = v
		case int:
			if v < 0 || v > 255 {
				return nil, false
			}
			out[idx] = byte(v)
		case float64:
			// Check the range before converting: converting an out-of-range
			// float to an integer type is implementation-defined. The negated
			// form also rejects NaN.
			if !(v >= 0 && v <= 255) {
				return nil, false
			}
			b := byte(v)
			if float64(b) != v {
				// Not a whole number.
				return nil, false
			}
			out[idx] = b
		default:
			return nil, false
		}
	}
	return out, true
}
|
|
|
|
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
|
|
data := make(map[string]interface{})
|
|
extraData := make(map[string]interface{})
|
|
|
|
msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"])
|
|
|
|
for origField, newField := range fieldMap {
|
|
v, ok := d[origField]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Check if this is a slice...
|
|
if vs, ok := v.([]interface{}); ok {
|
|
// This might be a []byte, though.
|
|
vbytes, ok := coerceToByteSlice(vs)
|
|
if ok {
|
|
v = string(vbytes)
|
|
} else {
|
|
// extraData gets all values after the first one, we keep the first one.
|
|
log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
|
|
extraData[origField] = vs[1:]
|
|
v = vs[0]
|
|
}
|
|
}
|
|
if vs, ok := v.([]interface{}); ok {
|
|
// It's possible we had a [][]byte, so now coerce this again.
|
|
vbytes, ok := coerceToByteSlice(vs)
|
|
if ok {
|
|
v = string(vbytes)
|
|
}
|
|
}
|
|
|
|
transformer := fieldTransform[newField]
|
|
if transformer != nil {
|
|
var err error
|
|
ov := v
|
|
v, err = transformer(v)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("[%v] transforming origField %v newField %v (value %v): %w", msgID, origField, newField, ov, err)
|
|
}
|
|
}
|
|
|
|
delete(d, origField)
|
|
data[newField] = v
|
|
}
|
|
|
|
// Anything left in d gets punted into extraData.
|
|
// Note that we might overwrite things: if we fail to transform a value that was originally in a slice, then we want to overwrite the value we originally put into extraData.
|
|
for fn, v := range d {
|
|
extraData[fn] = v
|
|
}
|
|
|
|
edJSON, err := json.Marshal(extraData)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("[%v] encoding extraData as JSON: %w", msgID, err)
|
|
}
|
|
data["extra_data_json"] = string(edJSON)
|
|
return data, nil
|
|
}
|
|
|
|
func aggregator(ctx context.Context, flushEntries int, flushInterval time.Duration, chAddr string, inputCh <-chan interface{}) error {
|
|
log.Printf("aggregator starting, flushEntries=%d, flushInterval=%v", flushEntries, flushInterval)
|
|
defer log.Println("aggregator terminating")
|
|
|
|
t := time.NewTicker(flushInterval)
|
|
defer t.Stop()
|
|
|
|
var buf []interface{}
|
|
flushBuffer := func(ctx context.Context, reason string) error {
|
|
t.Reset(20 * time.Minute)
|
|
defer t.Reset(flushInterval)
|
|
|
|
if len(buf) == 0 {
|
|
// Fast return; nothing to do.
|
|
return nil
|
|
}
|
|
|
|
log.Printf("aggregator flushing buffer (%d entries) because %v", len(buf), reason)
|
|
|
|
if err := insertIntoClickhouse(ctx, chAddr, buf); err != nil {
|
|
log.Printf("aggregator failed to flush buffer: %v", err)
|
|
// For now, never return an error; we want to retry on either the next tick or the next log entry.
|
|
return nil
|
|
}
|
|
|
|
// Clear the buffer so we can start again.
|
|
buf = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
aggLoop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-t.C:
|
|
if err := flushBuffer(ctx, "interval"); err != nil {
|
|
return err
|
|
}
|
|
case d, ok := <-inputCh:
|
|
if !ok {
|
|
// Input closed, we're done here.
|
|
log.Printf("aggregator input closed, trying to shut down (%d events in buffer)", len(buf))
|
|
break aggLoop
|
|
}
|
|
|
|
buf = append(buf, d)
|
|
if len(buf) >= flushEntries {
|
|
if err := flushBuffer(ctx, "entries"); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for n := 0; n < 5; n++ {
|
|
if err := flushBuffer(ctx, "shutting down"); err == nil {
|
|
return nil
|
|
} else {
|
|
log.Printf("aggregator trying to shut down: flush failed: %v", err)
|
|
}
|
|
}
|
|
return flushBuffer(ctx, "shutting down - final attempt")
|
|
}
|
|
|
|
// mostRecentCursor asks the ClickHouse HTTP endpoint at clickhouseAddr for the
// most recently recorded journal cursor for hostname, read from
// logger.systemd_scrape.
//
// It returns the empty string (and no error) when no row exists for the host.
// Request failures, a non-200 status, or an undecodable response produce an
// error.
func mostRecentCursor(ctx context.Context, clickhouseAddr, hostname string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", clickhouseAddr, nil)
	if err != nil {
		return "", fmt.Errorf("creating request: %w", err)
	}

	// The hostname is embedded in a single-quoted SQL string literal, so escape
	// the two characters that could terminate or alter it. Ordinary hostnames
	// pass through unchanged.
	quotedHost := strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(hostname)

	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("SELECT last_cursor FROM logger.systemd_scrape WHERE hostname='%v' ORDER BY last_scrape DESC LIMIT 1 FORMAT JSONEachRow", quotedHost))
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
	}

	type lastCursor struct {
		LastCursor string `json:"last_cursor"`
	}
	var lc lastCursor
	if err := json.NewDecoder(resp.Body).Decode(&lc); err != nil {
		if err == io.EOF {
			// Empty response: nothing has been scraped for this host yet.
			return "", nil
		}
		return "", fmt.Errorf("decoding response: %w", err)
	}
	return lc.LastCursor, nil
}
|
|
|
|
func fetcher(ctx context.Context, hostname, remoteAddr string, retryMinDuration, retryMaxDuration time.Duration, clickhouseAddr string, aggCh chan<- interface{}) (err error) {
|
|
log.Printf("fetcher[%v] starting", hostname)
|
|
defer func() { log.Printf("fetcher[%v] terminating: %v", hostname, err) }()
|
|
|
|
var sleepDuration time.Duration
|
|
|
|
cursor, err := mostRecentCursor(ctx, clickhouseAddr, hostname)
|
|
if err != nil {
|
|
return fmt.Errorf("mostRecentCursor: %w", err)
|
|
}
|
|
|
|
// Continuously read JSON messages from the remote.
|
|
retryLoop:
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if sleepDuration > 0 {
|
|
log.Printf("fetcher[%v] waiting for %v before retry", hostname, sleepDuration)
|
|
time.Sleep(sleepDuration)
|
|
}
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if sleepDuration == 0 {
|
|
sleepDuration = retryMinDuration
|
|
} else {
|
|
sleepDuration = 2 * sleepDuration
|
|
if sleepDuration > retryMaxDuration {
|
|
sleepDuration = retryMaxDuration
|
|
}
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%v/entries?follow", remoteAddr), nil)
|
|
if err != nil {
|
|
// ??? This seems un-recoverable.
|
|
return fmt.Errorf("creating HTTP request: %w", err)
|
|
}
|
|
if cursor != "" {
|
|
log.Printf("fetcher[%v] resuming from %v", hostname, cursor)
|
|
req.Header.Set("Range", fmt.Sprintf("entries=%v", cursor))
|
|
} else {
|
|
log.Printf("fetcher[%v] starting from scratch", hostname)
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
log.Printf("fetcher[%v] fetching log entries: %v", hostname, err)
|
|
continue retryLoop
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
log.Printf("fetcher[%v] unexpected status code %d", hostname, resp.StatusCode)
|
|
continue retryLoop
|
|
}
|
|
|
|
d := make(map[string]interface{})
|
|
j := json.NewDecoder(resp.Body)
|
|
for j.More() {
|
|
if err := j.Decode(&d); err != nil {
|
|
resp.Body.Close()
|
|
log.Printf("fetcher[%v] JSON decode: %v", hostname, err)
|
|
continue retryLoop
|
|
}
|
|
|
|
data, err := parseLog(d)
|
|
if err != nil {
|
|
resp.Body.Close()
|
|
log.Printf("fetcher[%v] parsing log: %v", hostname, err)
|
|
continue retryLoop
|
|
}
|
|
|
|
if cursor == data["cursor"].(string) {
|
|
// Skip over the last log entry we'd seen previously.
|
|
continue
|
|
}
|
|
|
|
sleepDuration = 0
|
|
data["scraped_hostname"] = hostname
|
|
cursor = data["cursor"].(string)
|
|
aggCh <- data
|
|
}
|
|
resp.Body.Close()
|
|
log.Printf("fetcher[%v] JSON decoder says there's no more content", hostname)
|
|
}
|
|
}
|
|
|
|
// Command-line flags.
var (
	// Where logs are read from and written to.
	hostsFlag          = flag.String("hosts", "porcorosso=localhost:19531", "Comma-separated <hostname>=<address> pairs to scrape logs from")
	clickhouseAddrFlag = flag.String("clickhouse_addr", "http://localhost:8123", "Address of Clickhouse HTTP API")

	// When the aggregator flushes its buffer.
	flushBufferSizeFlag     = flag.Int("flush_buffer_size", 10000, "Number of entries the buffer can reach before attempting to flush")
	flushBufferIntervalFlag = flag.Duration("flush_buffer_duration", 500*time.Millisecond, "Interval between buffer flushes")

	// Bounds of the fetchers' retry backoff.
	fetcherRetryMin = flag.Duration("fetcher_retry_minimum", 20*time.Millisecond, "Minimum retry interval for fetching logs")
	fetcherRetryMax = flag.Duration("fetcher_retry_maximum", 5*time.Minute, "Maximum retry interval for fetching logs (after backoff)")
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
fetchCtx, fetchCancel := signal.NotifyContext(ctx, os.Interrupt)
|
|
defer fetchCancel()
|
|
|
|
// Parse *hostsFlag.
|
|
fail := false
|
|
scrapeHosts := make(map[string]string)
|
|
for _, hostPair := range strings.Split(*hostsFlag, ",") {
|
|
// Ideally this would use strings.Cut, but I'm still on Go 1.16...
|
|
bits := strings.SplitN(hostPair, "=", 2)
|
|
if len(bits) != 2 {
|
|
log.Println("invalid host specification %q (expected <host>=<address>)", hostPair)
|
|
fail = true
|
|
}
|
|
scrapeHosts[bits[0]] = bits[1]
|
|
}
|
|
if fail {
|
|
os.Exit(1)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
var fetchWG sync.WaitGroup
|
|
aggCh := make(chan interface{})
|
|
go func() {
|
|
fetchWG.Wait()
|
|
close(aggCh)
|
|
}()
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
aggregator(ctx, *flushBufferSizeFlag, *flushBufferIntervalFlag, *clickhouseAddrFlag, aggCh)
|
|
}()
|
|
|
|
for hostname, remoteAddr := range scrapeHosts {
|
|
hostname, remoteAddr := hostname, remoteAddr
|
|
wg.Add(1)
|
|
fetchWG.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
defer fetchWG.Done()
|
|
fetcher(fetchCtx, hostname, remoteAddr, *fetcherRetryMin, *fetcherRetryMax, *clickhouseAddrFlag, aggCh)
|
|
}()
|
|
}
|
|
|
|
<-fetchCtx.Done()
|
|
log.Println("context complete; waiting for fetcher goroutines to stop")
|
|
fetchWG.Wait()
|
|
log.Println("context complete; waiting for aggregator goroutine to stop")
|
|
wg.Wait()
|
|
log.Println("buffers flushed")
|
|
}
|