depot/go/trains/cmd/rttingest/tiploc.go

132 lines
3.3 KiB
Go
Raw Permalink Normal View History

2021-11-18 22:24:20 +00:00
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
pgx "github.com/jackc/pgx/v4"
)
// fetchWork returns the tiploc column of every ref_locations row whose name
// equals its tiploc and whose override_name is NULL.
//
// An error from issuing the query, scanning a row, or iterating the result
// set is wrapped and returned; the returned slice is nil in that case.
func fetchWork(ctx context.Context, pc *pgx.Conn) ([]string, error) {
	rows, err := pc.Query(ctx, `SELECT tiploc FROM ref_locations WHERE name=tiploc AND override_name IS NULL`)
	if err != nil {
		return nil, fmt.Errorf("querying for work: %w", err)
	}
	defer rows.Close()

	var tiplocs []string
	for rows.Next() {
		var tiploc string
		if err := rows.Scan(&tiploc); err != nil {
			return nil, fmt.Errorf("scanning row: %w", err)
		}
		tiplocs = append(tiplocs, tiploc)
	}
	// Wrap the error reported by rows.Err itself; the err from Query above is
	// necessarily nil by this point and would hide the real cause.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("retrieving rows for work: %w", err)
	}
	return tiplocs, nil
}
// RTT carries the credentials that LocationDetail sends as HTTP basic auth.
type RTT struct {
	// Username is the basic-auth user name.
	Username string
	// Password is the basic-auth password.
	Password string
}
// RTTLocationDetail is the decoded form of a location object in an RTT API
// response. Each field is populated from the JSON key named in its tag.
type RTTLocationDetail struct {
	TIPLOC      string `json:"tiploc"`
	CRS         string `json:"crs"`
	Name        string `json:"name"`
	Description string `json:"description"`
}
// RTTLocationContainer is the top-level JSON document decoded by
// LocationDetail; only its "location" member is read.
type RTTLocationContainer struct {
	// Location stays nil when the document has no "location" object.
	Location *RTTLocationDetail `json:"location"`
}
// LocationDetail queries the RTT search endpoint for inp and returns the
// "location" object from the JSON response.
//
// The request is a GET to https://api.rtt.io/api/v1/json/search/<inp>, bound
// to ctx and authenticated with HTTP basic auth using r.Username and
// r.Password. Any response status other than 200 OK is returned as an error,
// as are transport and JSON decoding failures. The returned pointer is nil
// (with a nil error) when the decoded document carries no "location" object.
func (r RTT) LocationDetail(ctx context.Context, inp string) (*RTTLocationDetail, error) {
	// NOTE(review): inp is placed into the URL path without escaping; this is
	// only safe while callers pass values free of path/query metacharacters.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.rtt.io/api/v1/json/search/%v", inp), nil)
	if err != nil {
		return nil, fmt.Errorf("formulating query for %q: %w", inp, err)
	}
	req.SetBasicAuth(r.Username, r.Password)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("querying API for %q: %w", inp, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// resp.Status already begins with the numeric code (e.g. "404 Not
		// Found"), so printing StatusCode as well would duplicate it.
		return nil, fmt.Errorf("status for %q was %v; wanted 200 OK", inp, resp.Status)
	}

	var cont RTTLocationContainer
	if err := json.NewDecoder(resp.Body).Decode(&cont); err != nil {
		return nil, fmt.Errorf("unmarshalling API response for %q: %w", inp, err)
	}
	return cont.Location, nil
}
// main fills in location names in ref_locations from the RTT API.
//
// It connects to PostgreSQL, asks fetchWork for the tiplocs to process, looks
// each one up with RTT.LocationDetail, and writes any non-empty name returned
// into the name and override_name columns (tagging override_name_src as
// 'RTT'). The loop stops early after 20 consecutive failed lookups or on the
// first database update error.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pc, err := pgx.Connect(ctx, "host=/var/run/postgresql database=trains search_path=darwindb")
	if err != nil {
		log.Fatalf("pgx.Connect: %v", err)
	}
	// Close with a fresh context so the connection is still closed cleanly
	// even though ctx is cancelled by the deferred cancel above.
	defer pc.Close(context.Background())

	tiplocs, err := fetchWork(ctx, pc)
	if err != nil {
		log.Fatalf("fetchWork: %v", err)
	}
	log.Printf("%d tiplocs to process", len(tiplocs))

	// NOTE(review): credentials are hard-coded in source; consider loading
	// them from the environment or a secret store instead.
	r := RTT{Username: "rttapi_lukegb", Password: "5c1e5ae8a88882b090cd2e55225a3f0f581ec57c"}

	// rttApiErrCount counts consecutive failed lookups; it acts as a circuit
	// breaker and is reset whenever a lookup yields a location.
	var rttApiErrCount int
	for n, t := range tiplocs {
		if rttApiErrCount == 20 {
			log.Println("Too many errors from RTT, stopping")
			break
		}
		// Progress report every 20 items. len(tiplocs) is non-zero here
		// because the loop body only runs when there is at least one item.
		if n%20 == 0 {
			log.Printf("%d/%d (%d%%)", n, len(tiplocs), (n*100)/len(tiplocs))
		}

		d, err := r.LocationDetail(ctx, t)
		if err != nil {
			log.Printf("LocationDetail(ctx, %q): %v", t, err)
			rttApiErrCount++
			continue
		}
		if d == nil {
			log.Printf("got a nil location for %q!", t)
			rttApiErrCount++
			continue
		}
		rttApiErrCount = 0 // reset the breaker

		if d.Name == "" {
			log.Printf("No name for %q", t)
			continue
		}

		// The WHERE clause repeats the fetchWork predicate so a row that was
		// changed since the work list was fetched is left alone.
		ct, err := pc.Exec(ctx, `UPDATE ref_locations SET name=$2, override_name=$2, override_name_src='RTT' WHERE tiploc=$1 AND override_name IS NULL AND name=tiploc`, t, d.Name)
		if err != nil {
			// %v, not %w: the wrapping verb is only understood by fmt.Errorf.
			log.Printf("Updating database for %q failed: %v", t, err)
			break
		}
		if ra := ct.RowsAffected(); ra != 1 {
			log.Printf("Updated %d rows for %q; wanted 1???", ra, t)
		}
	}
	log.Println("terminating...")
}