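// This program tails journal entries from remote hosts (via the
// /entries?follow API exposed by systemd-journal-gatewayd) and batch-inserts
// them into ClickHouse over its HTTP interface, recording the last-seen
// cursor per host so scrapes can resume where they left off.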
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"time"
)
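// transformFunc converts a raw journal field value into the representation we
// want to store in ClickHouse.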
type transformFunc func(interface{}) (interface{}, error)
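// syslogDate parses a syslog-style timestamp (time.Stamp layout, which
// carries no year) and returns it as Unix seconds, guessing the year from the
// current date and handling the December/January boundary.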
func syslogDate(x interface{}) (interface{}, error) {
	dateStr, ok := x.(string)
	if !ok {
		return nil, fmt.Errorf("can't handle parsing type %T as string", x)
	}
	t, err := time.Parse(time.Stamp, strings.TrimSpace(dateStr))
	if err != nil {
		return nil, fmt.Errorf("parsing syslog %q timestamp as date: %w", dateStr, err)
	}
	// The year won't be set, so try to reconstruct it...
	now := time.Now()
	tYear := now.Year()
	switch {
	case now.Month() == time.December && t.Month() == time.January:
		// If 'now' is December, but t is January, then t is next year.
		tYear++
	case now.Month() == time.January && t.Month() == time.December:
		// Similarly, if 'now' is January but t is December, t is last year.
		tYear--
	}
	// Set the year. We do this by reparsing the string to avoid... unexpected events around e.g. leap years.
	t, err = time.Parse(time.Stamp+" 2006", fmt.Sprintf("%s %d", strings.TrimSpace(dateStr), tYear))
	if err != nil {
		return nil, fmt.Errorf("re-parsing syslog %q timestamp as date: %w", dateStr, err)
	}
	return t.Unix(), nil
}
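// transformToInt converts a journal field to an int64, accepting strings,
// ints, and the integral float64s that encoding/json produces for numbers.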
func transformToInt(x interface{}) (interface{}, error) {
	switch x := x.(type) {
	case string:
		return strconv.ParseInt(x, 10, 64)
	case float64:
		// encoding/json decodes JSON numbers as float64, so accept
		// integral floats here too.
		if x != float64(int64(x)) {
			return nil, fmt.Errorf("can't handle converting non-integral float %v to int", x)
		}
		return int64(x), nil
	case int:
		return x, nil
	}
	return nil, fmt.Errorf("can't handle converting type %T to int", x)
}
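// fieldTransform maps output column names to the transform applied to their
// values before insertion.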
var fieldTransform = map[string]transformFunc{
	"priority": transformToInt,
	"errno": transformToInt,
	"syslog_facility": transformToInt,
	"syslog_pid": transformToInt,
	"tid": transformToInt,
	"pid": transformToInt,
	"uid": transformToInt,
	"gid": transformToInt,
	"audit_session": transformToInt,
	"audit_loginuid": transformToInt,
	"systemd_owner_uid": transformToInt,
	"monotonic_timestamp": transformToInt,
	"syslog_timestamp": syslogDate,
	"source_realtime_timestamp": transformToInt, // microseconds
	"realtime_timestamp": transformToInt, // microseconds
}
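// fieldMap maps journal field names (as emitted by journald's JSON export) to
// the ClickHouse column names we store them under.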
var fieldMap = map[string]string{
	"MESSAGE": "message",
	"PRIORITY": "priority",
	"CODE_FILE": "code_file",
	"CODE_LINE": "code_line",
	"CODE_FUNC": "code_func",
	"ERRNO": "errno",
	"INVOCATION_ID": "invocation_id",
	"USER_INVOCATION_ID": "user_invocation_id",
	"SYSLOG_FACILITY": "syslog_facility",
	"SYSLOG_IDENTIFIER": "syslog_identifier",
	"SYSLOG_PID": "syslog_pid",
	"SYSLOG_TIMESTAMP": "syslog_timestamp",
	"TID": "tid",
	"_PID": "pid",
	"_UID": "uid",
	"_GID": "gid",
	"_COMM": "comm",
	"_EXE": "exe",
	"_CMDLINE": "cmdline",
	// _CAP_EFFECTIVE
	"_AUDIT_SESSION": "audit_session",
	"_AUDIT_LOGINUID": "audit_loginuid",
	"_SYSTEMD_CGROUP": "systemd_cgroup",
	"_SYSTEMD_SLICE": "systemd_slice",
	"_SYSTEMD_UNIT": "systemd_unit",
	"_SYSTEMD_USER_UNIT": "systemd_user_unit",
	"_SYSTEMD_USER_SLICE": "systemd_user_slice",
	"_SYSTEMD_SESSION": "systemd_session",
	"_SYSTEMD_OWNER_UID": "systemd_owner_uid",
	"_SOURCE_REALTIME_TIMESTAMP": "source_realtime_timestamp",
	"_BOOT_ID": "boot_id",
	"_MACHINE_ID": "machine_id",
	"_SYSTEMD_INVOCATION_ID": "systemd_invocation_id",
	"_HOSTNAME": "hostname",
	"_TRANSPORT": "transport",
	"_STREAM_ID": "stream_id",
	"_LINE_BREAK": "line_break",
	"_NAMESPACE": "namespace",
	"_KERNEL_DEVICE": "kernel_device",
	"_KERNEL_SUBSYSTEM": "kernel_subsystem",
	"_UDEV_SYSNAME": "udev_sysname",
	"_UDEV_DEVNODE": "udev_devnode",
	"_UDEV_DEVLINK": "udev_devlink",
	"__CURSOR": "cursor",
	"__REALTIME_TIMESTAMP": "realtime_timestamp",
	"__MONOTONIC_TIMESTAMP": "monotonic_timestamp",
}
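// doInsert POSTs rows to ClickHouse's HTTP interface as a single
// INSERT ... FORMAT JSONEachRow statement, one JSON object per line.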
func doInsert(ctx context.Context, chAddr string, table string, rows []interface{}) error {
	var reqPayload bytes.Buffer
	reqEnc := json.NewEncoder(&reqPayload)
	for _, r := range rows {
		if err := reqEnc.Encode(r); err != nil {
			return fmt.Errorf("marshalling row: %w", err)
		}
	}
	req, err := http.NewRequestWithContext(ctx, "POST", chAddr, &reqPayload)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("INSERT INTO %v FORMAT JSONEachRow", table))
	req.URL.RawQuery = q.Encode()
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		io.Copy(os.Stderr, resp.Body)
		return fmt.Errorf("unexpected HTTP response %d", resp.StatusCode)
	}
	return nil
}
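// insertIntoClickhouse writes the log rows to logger.systemd and records the
// most recent cursor seen for each scraped hostname in logger.systemd_scrape,
// so a restarted fetcher can resume where it left off.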
func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}) error {
	mostRecent := map[string]string{}
	for _, row := range rows {
		row := row.(map[string]interface{})
		hostname := row["scraped_hostname"].(string)
		cursor := row["cursor"].(string)
		mostRecent[hostname] = cursor
	}
	type mostRecentRow struct {
		Hostname   string `json:"hostname"`
		LastCursor string `json:"last_cursor"`
	}
	var mostRecentRows []interface{}
	for hostname, lastCursor := range mostRecent {
		mostRecentRows = append(mostRecentRows, mostRecentRow{hostname, lastCursor})
	}
	if err := doInsert(ctx, chAddr, "logger.systemd", rows); err != nil {
		return fmt.Errorf("inserting rows into logger.systemd: %w", err)
	}
	return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows)
}
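// coerceToByteSlice reports whether every element of n fits in a byte, and if
// so returns the equivalent []byte. journald's JSON export serializes
// non-UTF-8 field values as arrays of numbers, which arrive here as
// []interface{}.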
func coerceToByteSlice(n []interface{}) ([]byte, bool) {
	if len(n) == 0 {
		return nil, false
	}
	// Fast fail: check the first element before scanning the whole slice.
	nInt, isInt := n[0].(int)
	nFloat, isFloat := n[0].(float64)
	_, isByte := n[0].(byte)
	if !isInt && !isByte && !isFloat {
		return nil, false
	} else if isInt && (nInt >= 256 || nInt < 0) {
		return nil, false
	} else if isFloat && float64(byte(nFloat)) != nFloat {
		return nil, false
	}
	out := make([]byte, len(n))
	for idx, vInterface := range n {
		vInt, okInt := vInterface.(int)
		vByte, okByte := vInterface.(byte)
		vFloat, okFloat := vInterface.(float64)
		switch {
		case okInt:
			if vInt >= 256 || vInt < 0 {
				return nil, false
			}
			out[idx] = byte(vInt)
		case okByte:
			out[idx] = vByte
		case okFloat:
			out[idx] = byte(vFloat)
			if float64(out[idx]) != vFloat {
				return nil, false
			}
		default:
			return nil, false
		}
	}
	return out, true
}
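// parseLog converts one decoded journal entry into a row for insertion:
// known fields are renamed via fieldMap and transformed via fieldTransform;
// everything else is preserved in the extra_data_json column.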
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
	data := make(map[string]interface{})
	extraData := make(map[string]interface{})
	msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"])
	for origField, newField := range fieldMap {
		v, ok := d[origField]
		if !ok {
			continue
		}
		// Check if this is a slice...
		if vs, ok := v.([]interface{}); ok {
			// This might be a []byte, though.
			vbytes, ok := coerceToByteSlice(vs)
			if ok {
				v = string(vbytes)
			} else {
				// extraData gets all values after the first one; we keep the first one.
				log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
				extraData[origField] = vs[1:]
				v = vs[0]
			}
		}
		if vs, ok := v.([]interface{}); ok {
			// It's possible we had a [][]byte, so now coerce this again.
			vbytes, ok := coerceToByteSlice(vs)
			if ok {
				v = string(vbytes)
			}
		}
		transformer := fieldTransform[newField]
		if transformer != nil {
			var err error
			ov := v
			v, err = transformer(v)
			if err != nil {
				return nil, fmt.Errorf("[%v] transforming origField %v newField %v (value %v): %w", msgID, origField, newField, ov, err)
			}
		}
		delete(d, origField)
		data[newField] = v
	}
	// Anything left in d gets punted into extraData.
	// Note that we might overwrite things: if we fail to transform a value that was
	// originally in a slice, then we want to overwrite the value we originally put
	// into extraData.
	for fn, v := range d {
		extraData[fn] = v
	}
	edJSON, err := json.Marshal(extraData)
	if err != nil {
		return nil, fmt.Errorf("[%v] encoding extraData as JSON: %w", msgID, err)
	}
	data["extra_data_json"] = string(edJSON)
	return data, nil
}
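// aggregator buffers incoming rows and flushes them to ClickHouse whenever
// the buffer reaches flushEntries or flushInterval elapses, whichever comes
// first.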
func aggregator(ctx context.Context, flushEntries int, flushInterval time.Duration, chAddr string, inputCh <-chan interface{}) error {
	log.Printf("aggregator starting, flushEntries=%d, flushInterval=%v", flushEntries, flushInterval)
	defer log.Println("aggregator terminating")
	t := time.NewTicker(flushInterval)
	defer t.Stop()
	var buf []interface{}
	flushBuffer := func(ctx context.Context, reason string) error {
		// Park the ticker while we flush so a slow insert doesn't
		// immediately trigger another flush; restore it afterwards.
		t.Reset(20 * time.Minute)
		defer t.Reset(flushInterval)
		if len(buf) == 0 {
			// Fast return; nothing to do.
			return nil
		}
		log.Printf("aggregator flushing buffer (%d entries) because %v", len(buf), reason)
		if err := insertIntoClickhouse(ctx, chAddr, buf); err != nil {
			log.Printf("aggregator failed to flush buffer: %v", err)
			// Report the failure. The main loop below ignores it (we keep the
			// buffer and retry on the next tick or log entry), but the
			// shutdown path uses it to drive its retries.
			return err
		}
		// Clear the buffer so we can start again.
		buf = nil
		return nil
	}
aggLoop:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			// Ignore flush errors here; we'll retry on the next tick.
			flushBuffer(ctx, "interval")
		case d, ok := <-inputCh:
			if !ok {
				// Input closed, we're done here.
				log.Printf("aggregator input closed, trying to shut down (%d events in buffer)", len(buf))
				break aggLoop
			}
			buf = append(buf, d)
			if len(buf) >= flushEntries {
				// Likewise, retry on the next entry rather than bailing out.
				flushBuffer(ctx, "entries")
			}
		}
	}
	for n := 0; n < 5; n++ {
		err := flushBuffer(ctx, "shutting down")
		if err == nil {
			return nil
		}
		log.Printf("aggregator trying to shut down: flush failed: %v", err)
	}
	return flushBuffer(ctx, "shutting down - final attempt")
}
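// mostRecentCursor fetches the last journal cursor recorded for hostname,
// or "" if this host has never been scraped (the query returns no rows and
// decoding yields io.EOF).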
func mostRecentCursor(ctx context.Context, clickhouseAddr, hostname string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", clickhouseAddr, nil)
	if err != nil {
		return "", fmt.Errorf("creating request: %w", err)
	}
	q := req.URL.Query()
	q.Set("query", fmt.Sprintf("SELECT last_cursor FROM logger.systemd_scrape WHERE hostname='%v' ORDER BY last_scrape DESC LIMIT 1 FORMAT JSONEachRow", hostname))
	req.URL.RawQuery = q.Encode()
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("do-ing request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
	}
	type lastCursor struct {
		LastCursor string `json:"last_cursor"`
	}
	var lc lastCursor
	err = json.NewDecoder(resp.Body).Decode(&lc)
	switch {
	case err == io.EOF:
		return "", nil
	case err != nil:
		return "", err
	default:
		return lc.LastCursor, nil
	}
}
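// fetcher streams journal entries for one host from its /entries?follow
// endpoint, resuming from the most recent cursor stored in ClickHouse and
// reconnecting with exponential backoff on failure.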
func fetcher(ctx context.Context, hostname, remoteAddr string, retryMinDuration, retryMaxDuration time.Duration, clickhouseAddr string, aggCh chan<- interface{}) (err error) {
	log.Printf("fetcher[%v] starting", hostname)
	defer func() { log.Printf("fetcher[%v] terminating: %v", hostname, err) }()
	var sleepDuration time.Duration
	cursor, err := mostRecentCursor(ctx, clickhouseAddr, hostname)
	if err != nil {
		return fmt.Errorf("mostRecentCursor: %w", err)
	}
	// Continuously read JSON messages from the remote.
retryLoop:
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if sleepDuration > 0 {
			log.Printf("fetcher[%v] waiting for %v before retry", hostname, sleepDuration)
			time.Sleep(sleepDuration)
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if sleepDuration == 0 {
			sleepDuration = retryMinDuration
		} else {
			sleepDuration = 2 * sleepDuration
			if sleepDuration > retryMaxDuration {
				sleepDuration = retryMaxDuration
			}
		}
		req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%v/entries?follow", remoteAddr), nil)
		if err != nil {
			// ??? This seems un-recoverable.
			return fmt.Errorf("creating HTTP request: %w", err)
		}
		if cursor != "" {
			log.Printf("fetcher[%v] resuming from %v", hostname, cursor)
			req.Header.Set("Range", fmt.Sprintf("entries=%v", cursor))
		} else {
			log.Printf("fetcher[%v] starting from scratch", hostname)
		}
		req.Header.Set("Accept", "application/json")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Printf("fetcher[%v] fetching log entries: %v", hostname, err)
			continue retryLoop
		}
		// Don't defer resp.Body.Close() here: we're inside an infinite loop,
		// so deferred closes would pile up until the function returns.
		// Close explicitly on every path instead.
		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			log.Printf("fetcher[%v] unexpected status code %d", hostname, resp.StatusCode)
			continue retryLoop
		}
		j := json.NewDecoder(resp.Body)
		for j.More() {
			// Decode into a fresh map each time; decoding into a reused map
			// would merge in keys left over from previous entries.
			d := make(map[string]interface{})
			if err := j.Decode(&d); err != nil {
				resp.Body.Close()
				log.Printf("fetcher[%v] JSON decode: %v", hostname, err)
				continue retryLoop
			}
			data, err := parseLog(d)
			if err != nil {
				resp.Body.Close()
				log.Printf("fetcher[%v] parsing log: %v", hostname, err)
				continue retryLoop
			}
			if cursor == data["cursor"].(string) {
				// Skip over the last log entry we'd seen previously.
				continue
			}
			sleepDuration = 0
			data["scraped_hostname"] = hostname
			cursor = data["cursor"].(string)
			aggCh <- data
		}
		resp.Body.Close()
		log.Printf("fetcher[%v] JSON decoder says there's no more content", hostname)
	}
}
var (
	flushBufferSizeFlag     = flag.Int("flush_buffer_size", 10000, "Number of entries the buffer can reach before attempting to flush")
	flushBufferIntervalFlag = flag.Duration("flush_buffer_duration", 500*time.Millisecond, "Interval between buffer flushes")
	hostsFlag               = flag.String("hosts", "porcorosso=localhost:19531", "Comma-separated <hostname>=<address> pairs to scrape logs from")
	clickhouseAddrFlag      = flag.String("clickhouse_addr", "http://localhost:8123", "Address of Clickhouse HTTP API")
	fetcherRetryMin         = flag.Duration("fetcher_retry_minimum", 20*time.Millisecond, "Minimum retry interval for fetching logs")
	fetcherRetryMax         = flag.Duration("fetcher_retry_maximum", 5*time.Minute, "Maximum retry interval for fetching logs (after backoff)")
)
func main() {
	flag.Parse()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fetchCtx, fetchCancel := signal.NotifyContext(ctx, os.Interrupt)
	defer fetchCancel()
	// Parse *hostsFlag.
	fail := false
	scrapeHosts := make(map[string]string)
	for _, hostPair := range strings.Split(*hostsFlag, ",") {
		// Ideally this would use strings.Cut, but I'm still on Go 1.16...
		bits := strings.SplitN(hostPair, "=", 2)
		if len(bits) != 2 {
			log.Printf("invalid host specification %q (expected <host>=<address>)", hostPair)
			fail = true
			continue
		}
		scrapeHosts[bits[0]] = bits[1]
	}
	if fail {
		os.Exit(1)
	}
	var wg sync.WaitGroup
	var fetchWG sync.WaitGroup
	aggCh := make(chan interface{})
	go func() {
		// Close the aggregator's input once every fetcher has stopped.
		fetchWG.Wait()
		close(aggCh)
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		aggregator(ctx, *flushBufferSizeFlag, *flushBufferIntervalFlag, *clickhouseAddrFlag, aggCh)
	}()
	for hostname, remoteAddr := range scrapeHosts {
		// Capture the loop variables; before Go 1.22, goroutines launched in
		// a loop would otherwise all observe the final iteration's values.
		hostname, remoteAddr := hostname, remoteAddr
		wg.Add(1)
		fetchWG.Add(1)
		go func() {
			defer wg.Done()
			defer fetchWG.Done()
			fetcher(fetchCtx, hostname, remoteAddr, *fetcherRetryMin, *fetcherRetryMax, *clickhouseAddrFlag, aggCh)
		}()
	}
	<-fetchCtx.Done()
	log.Println("context complete; waiting for fetcher goroutines to stop")
	fetchWG.Wait()
	log.Println("context complete; waiting for aggregator goroutine to stop")
	wg.Wait()
	log.Println("buffers flushed")
}