journal2clickhouse: coerce things that look like strings back to strings
There's binary data sometimes, but on the whole I don't care about preserving it properly (sorry), so let's just coerce it to a string if it is supposed to go into a "proper" field.
This commit is contained in:
parent
7e848a2622
commit
086f5fe597
1 changed file with 56 additions and 5 deletions
|
@ -185,11 +185,49 @@ func insertIntoClickhouse(ctx context.Context, chAddr string, rows []interface{}
|
||||||
return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows)
|
return doInsert(ctx, chAddr, "logger.systemd_scrape", mostRecentRows)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// coerceToByteSlice attempts to reinterpret a decoded []interface{} as a
// []byte. Generic decoders (e.g. encoding/json) turn what was originally
// binary data into a slice of numbers; if every element is an int, byte,
// or integral float64 representable in [0, 255], the slice is converted
// back to bytes. The boolean result reports whether the coercion succeeded.
func coerceToByteSlice(n []interface{}) ([]byte, bool) {
	// An empty slice gives no evidence it was ever a byte slice; treat it
	// as not coercible. This also guards the n[0] peek below, which would
	// otherwise panic with an index-out-of-range.
	if len(n) == 0 {
		return nil, false
	}

	// Fast fail: peek at the first element so obviously non-byte slices
	// (strings, maps, nested slices, ...) bail out before allocating.
	nInt, isInt := n[0].(int)
	nFloat, isFloat := n[0].(float64)
	_, isByte := n[0].(byte)
	switch {
	case !isInt && !isByte && !isFloat:
		return nil, false
	case isInt && (nInt >= 256 || nInt < 0):
		return nil, false
	case isFloat && float64(byte(nFloat)) != nFloat:
		// Non-integral or out-of-range floats cannot round-trip through
		// byte, so the slice was not originally binary data.
		return nil, false
	}

	out := make([]byte, len(n))
	for idx, vInterface := range n {
		vInt, okInt := vInterface.(int)
		vByte, okByte := vInterface.(byte)
		vFloat, okFloat := vInterface.(float64)
		switch {
		case okInt:
			if vInt >= 256 || vInt < 0 {
				return nil, false
			}
			out[idx] = byte(vInt)
		case okByte:
			out[idx] = vByte
		case okFloat:
			// Round-trip check: only integral floats in [0, 255]
			// survive byte conversion unchanged.
			out[idx] = byte(vFloat)
			if float64(out[idx]) != vFloat {
				return nil, false
			}
		default:
			// Mixed-in value of some other type: not a byte slice.
			return nil, false
		}
	}
	return out, true
}
|
||||||
|
|
||||||
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
|
func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
|
||||||
data := make(map[string]interface{})
|
data := make(map[string]interface{})
|
||||||
extraData := make(map[string]interface{})
|
extraData := make(map[string]interface{})
|
||||||
|
|
||||||
msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; MESSAGE: %v", data["_HOSTNAME"], data["__REALTIME_TIMESTAMP"], data["MESSAGE"])
|
msgID := fmt.Sprintf("_HOSTNAME: %v; __REALTIME_TIMESTAMP: %v; __CURSOR: %v; MESSAGE: %v", d["_HOSTNAME"], d["__REALTIME_TIMESTAMP"], d["__CURSOR"], d["MESSAGE"])
|
||||||
|
|
||||||
for origField, newField := range fieldMap {
|
for origField, newField := range fieldMap {
|
||||||
v, ok := d[origField]
|
v, ok := d[origField]
|
||||||
|
@ -199,11 +237,24 @@ func parseLog(d map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
|
||||||
// Check if this is a slice...
|
// Check if this is a slice...
|
||||||
if vs, ok := v.([]interface{}); ok {
|
if vs, ok := v.([]interface{}); ok {
|
||||||
|
// This might be a []byte, though.
|
||||||
|
vbytes, ok := coerceToByteSlice(vs)
|
||||||
|
if ok {
|
||||||
|
v = string(vbytes)
|
||||||
|
} else {
|
||||||
// extraData gets all values after the first one, we keep the first one.
|
// extraData gets all values after the first one, we keep the first one.
|
||||||
log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
|
log.Printf("[%v] origField %v newField %v got a repeated message; keeping only the first element", msgID, origField, newField)
|
||||||
extraData[origField] = vs[1:]
|
extraData[origField] = vs[1:]
|
||||||
v = vs[0]
|
v = vs[0]
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if vs, ok := v.([]interface{}); ok {
|
||||||
|
// It's possible we had a [][]byte, so now coerce this again.
|
||||||
|
vbytes, ok := coerceToByteSlice(vs)
|
||||||
|
if ok {
|
||||||
|
v = string(vbytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
transformer := fieldTransform[newField]
|
transformer := fieldTransform[newField]
|
||||||
if transformer != nil {
|
if transformer != nil {
|
||||||
|
|
Loading…
Reference in a new issue