depot/go/trains/darwin/darwingest/darwingestftp/darwingestftp.go

170 lines
3.9 KiB
Go
Raw Normal View History

2021-11-18 22:24:20 +00:00
package darwingestftp
import (
"compress/gzip"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
"log"
"sort"
"time"
"github.com/jlaffaye/ftp"
"hg.lukegb.com/lukegb/depot/go/trains/darwin"
)
func loadGzippedFile(ctx context.Context, sc *ftp.ServerConn, fn string, cb func(pp *darwin.PushPort) error) error {
log.Printf("fetching %v", fn)
resp, err := sc.Retr(fn)
if err != nil {
return fmt.Errorf("Retr(%q): %w", fn, err)
}
defer resp.Close()
respGz, err := gzip.NewReader(resp)
if err != nil {
return fmt.Errorf("gzip.NewReader from %q: %w", fn, err)
}
defer respGz.Close()
xmlDec := xml.NewDecoder(respGz)
for {
if ctx.Err() != nil {
return ctx.Err()
}
var pp darwin.PushPort
if err := xmlDec.Decode(&pp); errors.Is(err, io.EOF) {
break
} else if err != nil {
return fmt.Errorf("xmlDec.Decode from %q: %w", fn, err)
}
if err := cb(&pp); err != nil {
return fmt.Errorf("processing %q: %w", fn, err)
}
}
return nil
}
func LoadSnapshot(ctx context.Context, sc *ftp.ServerConn, cb func(*darwin.PushPort) error) (lowWater, highWater time.Time, err error) {
log.Println("fetching snapshot...")
err = loadGzippedFile(ctx, sc, "/snapshot/snapshot.gz", func(pp *darwin.PushPort) error {
if lowWater.IsZero() || lowWater.After(pp.Time) {
lowWater = pp.Time
}
if highWater.IsZero() || highWater.Before(pp.Time) {
highWater = pp.Time
}
return cb(pp)
})
if err != nil {
return time.Time{}, time.Time{}, err
}
return lowWater, highWater, nil
}
type logFTPFile struct {
Filename string
EndAt time.Time
}
func LoadLogs(ctx context.Context, sc *ftp.ServerConn, startAt, endAt time.Time, cb func(*darwin.PushPort) error) error {
firstIt := true
for {
if ctx.Err() != nil {
return ctx.Err()
}
log.Println("fetching logs starting at", startAt)
entries, err := sc.List("/pushport")
if err != nil {
return fmt.Errorf("listing /pushport: %w", err)
}
var fs []logFTPFile
for _, e := range entries {
if e.Type != ftp.EntryTypeFile {
continue
}
t, err := time.ParseInLocation("pPortData.log.2006-01-02_1504.gz", e.Name, darwin.London)
if err != nil {
log.Println("failed to parse filename %q: %v", err)
continue
}
fs = append(fs, logFTPFile{
Filename: e.Name,
EndAt: t,
})
}
sort.Slice(fs, func(i, j int) bool {
return fs[i].EndAt.Before(fs[j].EndAt)
})
startReadingFrom := -1
endReadingAt := len(fs) - 1
for n, f := range fs {
if f.EndAt.After(startAt) && startReadingFrom == -1 {
startReadingFrom = n
}
if f.EndAt.After(endAt) && !endAt.IsZero() {
endReadingAt = n
break
}
}
var lowWater, highWater time.Time
if startReadingFrom != -1 {
for n := startReadingFrom; n <= endReadingAt; n++ {
fmt.Printf("%v: ", fs[n].Filename)
err := loadGzippedFile(ctx, sc, fmt.Sprintf("/pushport/%v", fs[n].Filename), func(pp *darwin.PushPort) error {
if lowWater.IsZero() || lowWater.After(pp.Time) {
lowWater = pp.Time
}
if highWater.IsZero() || highWater.Before(pp.Time) {
highWater = pp.Time
}
if !pp.Time.After(startAt) {
fmt.Printf("x")
return nil
} else if !endAt.IsZero() && (endAt.Before(pp.Time) || endAt.Equal(pp.Time)) {
fmt.Printf("_")
return nil
}
return cb(pp)
})
if err != nil {
return err
}
fmt.Print("\n")
}
if !lowWater.Before(startAt) {
// Don't believe that we saw everything :/
log.Printf("low water was %v; start at was %v; we might not have seen everything!", lowWater, startAt)
}
}
if !highWater.After(endAt) {
if !firstIt {
log.Printf("not yet caught up: high water %v; end at %v; waiting 1 minute...", highWater, endAt)
if err := darwin.Sleep(ctx, 1*time.Minute); err != nil {
return err
}
}
firstIt = false
if !highWater.IsZero() {
startAt = highWater
}
continue
}
return nil
}
}