// Package darwingestftp reads gzipped, XML-encoded darwin.PushPort messages
// (a snapshot file and a directory of log files) from an FTP server and hands
// each decoded message to a caller-supplied callback.
package darwingestftp

import (
	"compress/gzip"
	"context"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"log"
	"sort"
	"time"

	"github.com/jlaffaye/ftp"

	"git.lukegb.com/lukegb/depot/go/trains/darwin"
)

// loadGzippedFile retrieves fn over sc, gunzips it and decodes it as a stream
// of consecutive XML documents, each unmarshalled into a darwin.PushPort and
// passed to cb.
//
// It returns nil once the decoder reports io.EOF. It stops early and returns
// an error if ctx is done (checked before each message), if retrieval, gzip
// setup or XML decoding fails, or if cb returns an error; those errors are
// wrapped with the filename.
func loadGzippedFile(ctx context.Context, sc *ftp.ServerConn, fn string, cb func(pp *darwin.PushPort) error) error {
	log.Printf("fetching %v", fn)
	resp, err := sc.Retr(fn)
	if err != nil {
		return fmt.Errorf("Retr(%q): %w", fn, err)
	}
	defer resp.Close()

	respGz, err := gzip.NewReader(resp)
	if err != nil {
		return fmt.Errorf("gzip.NewReader from %q: %w", fn, err)
	}
	defer respGz.Close()

	xmlDec := xml.NewDecoder(respGz)
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		// A fresh value per message so nothing leaks from one document into
		// the next.
		var pp darwin.PushPort
		if err := xmlDec.Decode(&pp); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return fmt.Errorf("xmlDec.Decode from %q: %w", fn, err)
		}

		if err := cb(&pp); err != nil {
			return fmt.Errorf("processing %q: %w", fn, err)
		}
	}
	return nil
}

// LoadSnapshot reads /snapshot/snapshot.gz from sc and calls cb for every
// message in it.
//
// On success it returns the earliest (lowWater) and latest (highWater)
// pp.Time seen across all messages; both are the zero time if the snapshot
// contained no messages. On failure both times are zero and err is non-nil.
func LoadSnapshot(ctx context.Context, sc *ftp.ServerConn, cb func(*darwin.PushPort) error) (lowWater, highWater time.Time, err error) {
	log.Println("fetching snapshot...")
	err = loadGzippedFile(ctx, sc, "/snapshot/snapshot.gz", func(pp *darwin.PushPort) error {
		if lowWater.IsZero() || lowWater.After(pp.Time) {
			lowWater = pp.Time
		}
		if highWater.IsZero() || highWater.Before(pp.Time) {
			highWater = pp.Time
		}
		return cb(pp)
	})
	if err != nil {
		return time.Time{}, time.Time{}, err
	}
	return lowWater, highWater, nil
}

// logFTPFile describes one log file found in the /pushport directory.
type logFTPFile struct {
	// Filename is the entry name as listed by the server (no directory).
	Filename string
	// EndAt is the timestamp parsed out of the filename; presumably the time
	// at which the log file was rotated (TODO confirm against the feed docs).
	EndAt time.Time
}

// LoadLogs replays log files from /pushport on sc, calling cb for every
// message whose pp.Time is strictly after startAt and, when endAt is non-zero,
// strictly before endAt.
//
// Each pass lists the directory, orders the files by the timestamp embedded
// in their names, and reads from the first file stamped after startAt up to
// (and including) the first file stamped after endAt, or the last file if
// endAt is zero or no such file exists. Progress is written to standard
// output: the filename, then "x" for each message skipped as too early and
// "_" for each message skipped as too late.
//
// If, after a pass, the latest message time seen is not after endAt, startAt
// is advanced to that time (when any message was seen) and another pass is
// made; every such retry after the first waits one minute via darwin.Sleep.
// The function returns nil once a pass has seen a message later than endAt,
// or a non-nil error if ctx is done, the listing fails, a file fails to load,
// cb fails, or darwin.Sleep returns an error.
func LoadLogs(ctx context.Context, sc *ftp.ServerConn, startAt, endAt time.Time, cb func(*darwin.PushPort) error) error {
	firstIt := true
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		log.Println("fetching logs starting at", startAt)
		entries, err := sc.List("/pushport")
		if err != nil {
			return fmt.Errorf("listing /pushport: %w", err)
		}

		var fs []logFTPFile
		for _, e := range entries {
			if e.Type != ftp.EntryTypeFile {
				continue
			}

			// The whole filename doubles as the time layout, so anything not
			// matching the expected naming scheme fails to parse and is
			// skipped.
			t, err := time.ParseInLocation("pPortData.log.2006-01-02_1504.gz", e.Name, darwin.London)
			if err != nil {
				log.Printf("failed to parse filename %q: %v", e.Name, err)
				continue
			}

			fs = append(fs, logFTPFile{
				Filename: e.Name,
				EndAt:    t,
			})
		}
		sort.Slice(fs, func(i, j int) bool { return fs[i].EndAt.Before(fs[j].EndAt) })

		// Pick the inclusive index range of files to read.
		startReadingFrom := -1
		endReadingAt := len(fs) - 1
		for n, f := range fs {
			if f.EndAt.After(startAt) && startReadingFrom == -1 {
				startReadingFrom = n
			}
			if f.EndAt.After(endAt) && !endAt.IsZero() {
				endReadingAt = n
				break
			}
		}

		var lowWater, highWater time.Time
		if startReadingFrom != -1 {
			for n := startReadingFrom; n <= endReadingAt; n++ {
				fmt.Printf("%v: ", fs[n].Filename)
				err := loadGzippedFile(ctx, sc, fmt.Sprintf("/pushport/%v", fs[n].Filename), func(pp *darwin.PushPort) error {
					// Water marks track every message read, including the
					// ones filtered out below.
					if lowWater.IsZero() || lowWater.After(pp.Time) {
						lowWater = pp.Time
					}
					if highWater.IsZero() || highWater.Before(pp.Time) {
						highWater = pp.Time
					}
					if !pp.Time.After(startAt) {
						fmt.Printf("x")
						return nil
					} else if !endAt.IsZero() && (endAt.Before(pp.Time) || endAt.Equal(pp.Time)) {
						fmt.Printf("_")
						return nil
					}
					return cb(pp)
				})
				if err != nil {
					return err
				}
				fmt.Print("\n")
			}

			if !lowWater.Before(startAt) {
				// Don't believe that we saw everything :/
				log.Printf("low water was %v; start at was %v; we might not have seen everything!", lowWater, startAt)
			}
		}

		if !highWater.After(endAt) {
			// The first retry is immediate; later ones back off for a minute.
			if !firstIt {
				log.Printf("not yet caught up: high water %v; end at %v; waiting 1 minute...", highWater, endAt)
				if err := darwin.Sleep(ctx, 1*time.Minute); err != nil {
					return err
				}
			}
			firstIt = false

			// Resume from the newest message already seen, if any.
			if !highWater.IsZero() {
				startAt = highWater
			}
			continue
		}
		return nil
	}
}