170 lines
3.9 KiB
Go
170 lines
3.9 KiB
Go
|
package darwingestftp
|
||
|
|
||
|
import (
|
||
|
"compress/gzip"
|
||
|
"context"
|
||
|
"encoding/xml"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"log"
|
||
|
"sort"
|
||
|
"time"
|
||
|
|
||
|
"github.com/jlaffaye/ftp"
|
||
|
"hg.lukegb.com/lukegb/depot/go/trains/darwin"
|
||
|
)
|
||
|
|
||
|
func loadGzippedFile(ctx context.Context, sc *ftp.ServerConn, fn string, cb func(pp *darwin.PushPort) error) error {
|
||
|
log.Printf("fetching %v", fn)
|
||
|
resp, err := sc.Retr(fn)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Retr(%q): %w", fn, err)
|
||
|
}
|
||
|
defer resp.Close()
|
||
|
|
||
|
respGz, err := gzip.NewReader(resp)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("gzip.NewReader from %q: %w", fn, err)
|
||
|
}
|
||
|
defer respGz.Close()
|
||
|
|
||
|
xmlDec := xml.NewDecoder(respGz)
|
||
|
for {
|
||
|
if ctx.Err() != nil {
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
|
||
|
var pp darwin.PushPort
|
||
|
if err := xmlDec.Decode(&pp); errors.Is(err, io.EOF) {
|
||
|
break
|
||
|
} else if err != nil {
|
||
|
return fmt.Errorf("xmlDec.Decode from %q: %w", fn, err)
|
||
|
}
|
||
|
|
||
|
if err := cb(&pp); err != nil {
|
||
|
return fmt.Errorf("processing %q: %w", fn, err)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func LoadSnapshot(ctx context.Context, sc *ftp.ServerConn, cb func(*darwin.PushPort) error) (lowWater, highWater time.Time, err error) {
|
||
|
log.Println("fetching snapshot...")
|
||
|
err = loadGzippedFile(ctx, sc, "/snapshot/snapshot.gz", func(pp *darwin.PushPort) error {
|
||
|
if lowWater.IsZero() || lowWater.After(pp.Time) {
|
||
|
lowWater = pp.Time
|
||
|
}
|
||
|
if highWater.IsZero() || highWater.Before(pp.Time) {
|
||
|
highWater = pp.Time
|
||
|
}
|
||
|
return cb(pp)
|
||
|
})
|
||
|
if err != nil {
|
||
|
return time.Time{}, time.Time{}, err
|
||
|
}
|
||
|
return lowWater, highWater, nil
|
||
|
}
|
||
|
|
||
|
// logFTPFile describes one push port log file found in the FTP directory
// listing.
type logFTPFile struct {
	// Filename is the entry name exactly as returned by the listing (no
	// directory prefix).
	Filename string

	// EndAt is the timestamp parsed out of Filename. Files are ordered and
	// selected by this value.
	// NOTE(review): presumably the time at which the file's coverage ends —
	// confirm against the feed documentation.
	EndAt time.Time
}
|
||
|
|
||
|
func LoadLogs(ctx context.Context, sc *ftp.ServerConn, startAt, endAt time.Time, cb func(*darwin.PushPort) error) error {
|
||
|
firstIt := true
|
||
|
for {
|
||
|
if ctx.Err() != nil {
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
|
||
|
log.Println("fetching logs starting at", startAt)
|
||
|
entries, err := sc.List("/pushport")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("listing /pushport: %w", err)
|
||
|
}
|
||
|
|
||
|
var fs []logFTPFile
|
||
|
for _, e := range entries {
|
||
|
if e.Type != ftp.EntryTypeFile {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
t, err := time.ParseInLocation("pPortData.log.2006-01-02_1504.gz", e.Name, darwin.London)
|
||
|
if err != nil {
|
||
|
log.Println("failed to parse filename %q: %v", err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fs = append(fs, logFTPFile{
|
||
|
Filename: e.Name,
|
||
|
EndAt: t,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
sort.Slice(fs, func(i, j int) bool {
|
||
|
return fs[i].EndAt.Before(fs[j].EndAt)
|
||
|
})
|
||
|
|
||
|
startReadingFrom := -1
|
||
|
endReadingAt := len(fs) - 1
|
||
|
for n, f := range fs {
|
||
|
if f.EndAt.After(startAt) && startReadingFrom == -1 {
|
||
|
startReadingFrom = n
|
||
|
}
|
||
|
if f.EndAt.After(endAt) && !endAt.IsZero() {
|
||
|
endReadingAt = n
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var lowWater, highWater time.Time
|
||
|
if startReadingFrom != -1 {
|
||
|
for n := startReadingFrom; n <= endReadingAt; n++ {
|
||
|
fmt.Printf("%v: ", fs[n].Filename)
|
||
|
err := loadGzippedFile(ctx, sc, fmt.Sprintf("/pushport/%v", fs[n].Filename), func(pp *darwin.PushPort) error {
|
||
|
if lowWater.IsZero() || lowWater.After(pp.Time) {
|
||
|
lowWater = pp.Time
|
||
|
}
|
||
|
if highWater.IsZero() || highWater.Before(pp.Time) {
|
||
|
highWater = pp.Time
|
||
|
}
|
||
|
if !pp.Time.After(startAt) {
|
||
|
fmt.Printf("x")
|
||
|
return nil
|
||
|
} else if !endAt.IsZero() && (endAt.Before(pp.Time) || endAt.Equal(pp.Time)) {
|
||
|
fmt.Printf("_")
|
||
|
return nil
|
||
|
}
|
||
|
return cb(pp)
|
||
|
})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
fmt.Print("\n")
|
||
|
}
|
||
|
|
||
|
if !lowWater.Before(startAt) {
|
||
|
// Don't believe that we saw everything :/
|
||
|
log.Printf("low water was %v; start at was %v; we might not have seen everything!", lowWater, startAt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !highWater.After(endAt) {
|
||
|
if !firstIt {
|
||
|
log.Printf("not yet caught up: high water %v; end at %v; waiting 1 minute...", highWater, endAt)
|
||
|
if err := darwin.Sleep(ctx, 1*time.Minute); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
firstIt = false
|
||
|
if !highWater.IsZero() {
|
||
|
startAt = highWater
|
||
|
}
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
}
|