package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "io" "log" "net/http" "os" "os/signal" "strconv" "strings" "sync" "time" ) type transformFunc func(interface{}) (interface{}, error) func syslogDate(x interface{}) (interface{}, error) { dateStr, ok := x.(string) if !ok { return nil, fmt.Errorf("can't handle parsing type %T as string", x) } t, err := time.Parse(time.Stamp, strings.TrimSpace(dateStr)) if err != nil { return nil, fmt.Errorf("parsing syslog %q timestamp as date: %w", dateStr, err) } // The year won't be set, so try to reconstruct it... now := time.Now() tYear := now.Year() switch { case now.Month() == time.December && t.Month() == time.January: // If 'now' is December, but t is January, then t is next year. tYear++ case now.Month() == time.January && t.Month() == time.December: // Similarly, if 'now' is January but t is December, t is last year. tYear-- } // Set the year. We do this by reparsing the string to avoid... unexpected events around e.g. leap years. t, err = time.Parse(time.Stamp+" 2006", fmt.Sprintf("%s %d", strings.TrimSpace(dateStr), tYear)) if err != nil { return nil, fmt.Errorf("re-parsing syslog %q timestamp as date: %w", dateStr, err) } return t.Unix(), nil } func transformToInt(x interface{}) (interface{}, error) { switch x := x.(type) { case string: return strconv.ParseInt(x, 10, 64) case int: return x, nil } return nil, fmt.Errorf("can't handle converting type %T to int", x) } var fieldTransform = map[string]transformFunc{ "priority": transformToInt, "errno": transformToInt, "syslog_facility": transformToInt, "syslog_pid": transformToInt, "tid": transformToInt, "pid": transformToInt, "uid": transformToInt, "gid": transformToInt, "audit_session": transformToInt, "audit_loginuid": transformToInt, "systemd_owner_uid": transformToInt, "monotonic_timestamp": transformToInt, "syslog_timestamp": syslogDate, "source_realtime_timestamp": transformToInt, // microseconds "realtime_timestamp": transformToInt, // microseconds } var fieldMap = map[string]string{ "MESSAGE": "message", "PRIORITY": "priority", "CODE_FILE": "code_file", "CODE_LINE": "code_line", "CODE_FUNC": "code_func", "ERRNO": "errno", "INVOCATION_ID": "invocation_id", "USER_INVOCATION_ID": "user_invocation_id", "SYSLOG_FACILITY": "syslog_facility", "SYSLOG_IDENTIFIER": "syslog_identifier", "SYSLOG_PID": "syslog_pid", "SYSLOG_TIMESTAMP": "syslog_timestamp", "TID": "tid", "_PID": "pid", "_UID": "uid", "_GID": "gid", "_COMM": "comm", "_EXE": "exe", "_CMDLINE": "cmdline", // _CAP_EFFECTIVE "_AUDIT_SESSION": "audit_session", "_AUDIT_LOGINUID": "audit_loginuid", "_SYSTEMD_CGROUP": "systemd_cgroup", "_SYSTEMD_SLICE": "systemd_slice", "_SYSTEMD_UNIT": "systemd_unit", "_SYSTEMD_USER_UNIT": "systemd_user_unit", "_SYSTEMD_USER_SLICE": "systemd_user_slice", "_SYSTEMD_SESSION": "systemd_session", "_SYSTEMD_OWNER_UID": "systemd_owner_uid", "_SOURCE_REALTIME_TIMESTAMP": "source_realtime_timestamp", "_BOOT_ID": "boot_id", "_MACHINE_ID": "machine_id", "_SYSTEMD_INVOCATION_ID": "systemd_invocation_id", "_HOSTNAME": "hostname", "_TRANSPORT": "transport", "_STREAM_ID": "stream_id", "_LINE_BREAK": "line_break", "_NAMESPACE": "namespace", "_KERNEL_DEVICE": "kernel_device", "_KERNEL_SUBSYSTEM": "kernel_subsystem", "_UDEV_SYSNAME": "udev_sysname", "_UDEV_DEVNODE": "udev_devnode", "_UDEV_DEVLINK": "udev_devlink", "__CURSOR": "cursor", "__REALTIME_TIMESTAMP": "realtime_timestamp", "__MONOTONIC_TIMESTAMP": "monotonic_timestamp", } func doInsert(ctx context.Context, chAddr string, table string, rows []interface{}) error { var reqPayload bytes.Buffer reqEnc := json.NewEncoder(&reqPayload) for _, r := range rows { if err := reqEnc.Encode(r); err != nil { return fmt.Errorf("marshalling row: %w", err) } } req, err := http.NewRequestWithContext(ctx, "POST", chAddr, &reqPayload) if err != nil { return fmt.Errorf("creating request: %w", err) } q := req.URL.Query() q.Set("query", fmt.Sprintf("INSERT INTO %v FORMAT JSONEachRow", table)) req.URL.RawQuery = q.Encode() resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("do-ing request: %w", err) } defer resp.Body.Close() if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { io.Copy(os.Stderr, resp.Body) return fmt.Errorf("unexpected HTTP response %d", resp.StatusCode) } return nil } func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}) error { mostRecent := map[string]string{} for _, row := range rows { row := row.(map[string]interface{}) hostname := row["scraped_hostname"].(string) cursor := row["cursor"].(string) mostRecent[hostname] = cursor } type mostRecentRow struct { Hostname string `json:"hostname"` LastCursor string `json:"last_cursor"` } var mostRecentRows []interface{} for hostname, lastCursor := range mostRecent { mostRecentRows = append(mostRecentRows, mostRecentRow{hostname, lastCursor}) } if err := doInsert(ctx, chAddr, "logger.systemd", rows); err != nil { return fmt.Errorf("inserting rows into logger.systemd: %w", err) } return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows) } func coerceToByteSlice(n []interface{}) ([]byte, bool) { // Fast fail. nInt, isInt := n[0].(int) nFloat, isFloat := n[0].(float64) _, isByte := n[0].(byte) if !isInt && !isByte && !isFloat { return nil, false } else if isInt && (nInt >= 256 || nInt < 0) { return nil, false } else if isFloat && float64(byte(nFloat)) != nFloat { return nil, false } out := make([]byte, len(n)) for idx, vInterface := range n { vInt, okInt := vInterface.(int) vByte, okByte := vInterface.(byte) vFloat, okFloat := vInterface.(float64) switch { case okInt: if vInt >= 256 || vInt < 0 { return nil, false } out[idx] = byte(vInt) case okByte: out[idx] = vByte case okFloat: out[idx] = byte(vFloat) if float64(out[idx]) != vFloat { return nil, false } default: return nil, false } } return out, true } func parseLog(d map[string]interface{}) (map[string]interface{}, error) { data := make(map[string]interface{}) extraData := make(map[string]interface{}) msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"]) for origField, newField := range fieldMap { v, ok := d[origField] if !ok { continue } // Check if this is a slice... if vs, ok := v.([]interface{}); ok { // This might be a []byte, though. vbytes, ok := coerceToByteSlice(vs) if ok { v = string(vbytes) } else { // extraData gets all values after the first one, we keep the first one. log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField) extraData[origField] = vs[1:] v = vs[0] } } if vs, ok := v.([]interface{}); ok { // It's possible we had a [][]byte, so now coerce this again. vbytes, ok := coerceToByteSlice(vs) if ok { v = string(vbytes) } } transformer := fieldTransform[newField] if transformer != nil { var err error ov := v v, err = transformer(v) if err != nil { return nil, fmt.Errorf("[%v] transforming origField %v newField %v (value %v): %w", msgID, origField, newField, ov, err) } } delete(d, origField) data[newField] = v } // Anything left in d gets punted into extraData. // Note that we might overwrite things: if we fail to transform a value that was originally in a slice, then we want to overwrite the value we originally put into extraData. for fn, v := range d { extraData[fn] = v } edJSON, err := json.Marshal(extraData) if err != nil { return nil, fmt.Errorf("[%v] encoding extraData as JSON: %w", msgID, err) } data["extra_data_json"] = string(edJSON) return data, nil } func aggregator(ctx context.Context, flushEntries int, flushInterval time.Duration, chAddr string, inputCh <-chan interface{}) error { log.Printf("aggregator starting, flushEntries=%d, flushInterval=%v", flushEntries, flushInterval) defer log.Println("aggregator terminating") t := time.NewTicker(flushInterval) defer t.Stop() var buf []interface{} flushBuffer := func(ctx context.Context, reason string) error { t.Reset(20 * time.Minute) defer t.Reset(flushInterval) if len(buf) == 0 { // Fast return; nothing to do. return nil } log.Printf("aggregator flushing buffer (%d entries) because %v", len(buf), reason) if err := insertIntoClickhouse(ctx, chAddr, buf); err != nil { log.Printf("aggregator failed to flush buffer: %v", err) // For now, never return an error; we want to retry on either the next tick or the next log entry. return nil } // Clear the buffer so we can start again. buf = nil return nil } aggLoop: for { select { case <-ctx.Done(): return ctx.Err() case <-t.C: if err := flushBuffer(ctx, "interval"); err != nil { return err } case d, ok := <-inputCh: if !ok { // Input closed, we're done here. log.Printf("aggregator input closed, trying to shut down (%d events in buffer)", len(buf)) break aggLoop } buf = append(buf, d) if len(buf) >= flushEntries { if err := flushBuffer(ctx, "entries"); err != nil { return err } } } } for n := 0; n < 5; n++ { if err := flushBuffer(ctx, "shutting down"); err == nil { return nil } else { log.Printf("aggregator trying to shut down: flush failed: %v", err) } } return flushBuffer(ctx, "shutting down - final attempt") } func mostRecentCursor(ctx context.Context, clickhouseAddr, hostname string) (string, error) { req, err := http.NewRequestWithContext(ctx, "GET", clickhouseAddr, nil) if err != nil { return "", fmt.Errorf("creating request: %w", err) } q := req.URL.Query() q.Set("query", fmt.Sprintf("SELECT last_cursor FROM logger.systemd_scrape WHERE hostname='%v' ORDER BY last_scrape DESC LIMIT 1 FORMAT JSONEachRow", hostname)) req.URL.RawQuery = q.Encode() resp, err := http.DefaultClient.Do(req) if err != nil { return "", fmt.Errorf("do-ing request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("unexpected HTTP status %d", resp.StatusCode) } type lastCursor struct { LastCursor string `json:"last_cursor"` } var lc lastCursor err = json.NewDecoder(resp.Body).Decode(&lc) switch { case err == io.EOF: return "", nil case err != nil: return "", err default: return lc.LastCursor, nil } } func fetcher(ctx context.Context, hostname, remoteAddr string, retryMinDuration, retryMaxDuration time.Duration, clickhouseAddr string, aggCh chan<- interface{}) (err error) { log.Printf("fetcher[%v] starting", hostname) defer func() { log.Printf("fetcher[%v] terminating: %v", hostname, err) }() var sleepDuration time.Duration cursor, err := mostRecentCursor(ctx, clickhouseAddr, hostname) if err != nil { return fmt.Errorf("mostRecentCursor: %w", err) } // Continuously read JSON messages from the remote. retryLoop: for { if ctx.Err() != nil { return ctx.Err() } if sleepDuration > 0 { log.Printf("fetcher[%v] waiting for %v before retry", hostname, sleepDuration) time.Sleep(sleepDuration) } if ctx.Err() != nil { return ctx.Err() } if sleepDuration == 0 { sleepDuration = retryMinDuration } else { sleepDuration = 2 * sleepDuration if sleepDuration > retryMaxDuration { sleepDuration = retryMaxDuration } } req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%v/entries?follow", remoteAddr), nil) if err != nil { // ??? This seems un-recoverable. return fmt.Errorf("creating HTTP request: %w", err) } if cursor != "" { log.Printf("fetcher[%v] resuming from %v", hostname, cursor) req.Header.Set("Range", fmt.Sprintf("entries=%v", cursor)) } else { log.Printf("fetcher[%v] starting from scratch", hostname) } req.Header.Set("Accept", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { log.Printf("fetcher[%v] fetching log entries: %v", hostname, err) continue retryLoop } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Printf("fetcher[%v] unexpected status code %d", hostname, resp.StatusCode) continue retryLoop } d := make(map[string]interface{}) j := json.NewDecoder(resp.Body) for j.More() { if err := j.Decode(&d); err != nil { resp.Body.Close() log.Printf("fetcher[%v] JSON decode: %v", hostname, err) continue retryLoop } data, err := parseLog(d) if err != nil { resp.Body.Close() log.Printf("fetcher[%v] parsing log: %v", hostname, err) continue retryLoop } if cursor == data["cursor"].(string) { // Skip over the last log entry we'd seen previously. continue } sleepDuration = 0 data["scraped_hostname"] = hostname cursor = data["cursor"].(string) aggCh <- data } resp.Body.Close() log.Printf("fetcher[%v] JSON decoder says there's no more content", hostname) } } var ( flushBufferSizeFlag = flag.Int("flush_buffer_size", 10000, "Number of entries the buffer can reach before attempting to flush") flushBufferIntervalFlag = flag.Duration("flush_buffer_duration", 500*time.Millisecond, "Interval between buffer flushes") hostsFlag = flag.String("hosts", "porcorosso=localhost:19531", "Comma-separated =
pairs to scrape logs from") clickhouseAddrFlag = flag.String("clickhouse_addr", "http://localhost:8123", "Address of Clickhouse HTTP API") fetcherRetryMin = flag.Duration("fetcher_retry_minimum", 20*time.Millisecond, "Minimum retry interval for fetching logs") fetcherRetryMax = flag.Duration("fetcher_retry_maximum", 5*time.Minute, "Maximum retry interval for fetching logs (after backoff)") ) func main() { flag.Parse() ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetchCtx, fetchCancel := signal.NotifyContext(ctx, os.Interrupt) defer fetchCancel() // Parse *hostsFlag. fail := false scrapeHosts := make(map[string]string) for _, hostPair := range strings.Split(*hostsFlag, ",") { // Ideally this would use strings.Cut, but I'm still on Go 1.16... bits := strings.SplitN(hostPair, "=", 2) if len(bits) != 2 { log.Println("invalid host specification %q (expected =
)", hostPair) fail = true } scrapeHosts[bits[0]] = bits[1] } if fail { os.Exit(1) } var wg sync.WaitGroup var fetchWG sync.WaitGroup aggCh := make(chan interface{}) go func() { fetchWG.Wait() close(aggCh) }() wg.Add(1) go func() { defer wg.Done() aggregator(ctx, *flushBufferSizeFlag, *flushBufferIntervalFlag, *clickhouseAddrFlag, aggCh) }() for hostname, remoteAddr := range scrapeHosts { hostname, remoteAddr := hostname, remoteAddr wg.Add(1) fetchWG.Add(1) go func() { defer wg.Done() defer fetchWG.Done() fetcher(fetchCtx, hostname, remoteAddr, *fetcherRetryMin, *fetcherRetryMax, *clickhouseAddrFlag, aggCh) }() } <-fetchCtx.Done() log.Println("context complete; waiting for fetcher goroutines to stop") fetchWG.Wait() log.Println("context complete; waiting for aggregator goroutine to stop") wg.Wait() log.Println("buffers flushed") }