diff --git a/go/journal2clickhouse/journal2clickhouse.go b/go/journal2clickhouse/journal2clickhouse.go index 6e5606babc..47069b5fff 100644 --- a/go/journal2clickhouse/journal2clickhouse.go +++ b/go/journal2clickhouse/journal2clickhouse.go @@ -185,11 +185,49 @@ func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{} return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows) } +func coerceToByteSlice(n []interface{}) ([]byte, bool) { + // Fast fail. + nInt, isInt := n[0].(int) + nFloat, isFloat := n[0].(float64) + _, isByte := n[0].(byte) + if !isInt && !isByte && !isFloat { + return nil, false + } else if isInt && (nInt >= 256 || nInt < 0) { + return nil, false + } else if isFloat && float64(byte(nFloat)) != nFloat { + return nil, false + } + + out := make([]byte, len(n)) + for idx, vInterface := range n { + vInt, okInt := vInterface.(int) + vByte, okByte := vInterface.(byte) + vFloat, okFloat := vInterface.(float64) + switch { + case okInt: + if vInt >= 256 || vInt < 0 { + return nil, false + } + out[idx] = byte(vInt) + case okByte: + out[idx] = vByte + case okFloat: + out[idx] = byte(vFloat) + if float64(out[idx]) != vFloat { + return nil, false + } + default: + return nil, false + } + } + return out, true +} + func parseLog(d map[string]interface{}) (map[string]interface{}, error) { data := make(map[string]interface{}) extraData := make(map[string]interface{}) - msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; MESSAGE: %v", data["_HOSTNAME"], data["__REALTIME_TIMESTAMP"], data["MESSAGE"]) + msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"]) for origField, newField := range fieldMap { v, ok := d[origField] @@ -199,10 +237,23 @@ func parseLog(d map[string]interface{}) (map[string]interface{}, error) { // Check if this is a slice... if vs, ok := v.([]interface{}); ok { - // extraData gets all values after the first one, we keep the first one. - log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField) - extraData[origField] = vs[1:] - v = vs[0] + // This might be a []byte, though. + vbytes, ok := coerceToByteSlice(vs) + if ok { + v = string(vbytes) + } else { + // extraData gets all values after the first one, we keep the first one. + log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField) + extraData[origField] = vs[1:] + v = vs[0] + } + } + if vs, ok := v.([]interface{}); ok { + // It's possible we had a [][]byte, so now coerce this again. + vbytes, ok := coerceToByteSlice(vs) + if ok { + v = string(vbytes) + } } transformer := fieldTransform[newField]