From 4f3356727a440003669370b93474dce58d69b35f Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Sat, 20 Nov 2021 19:06:43 +0000 Subject: [PATCH] go/trains: use protos for defining the web API This will probably be useful later when I make the web app... --- go/trains/cmd/db2web/db2web.go | 156 +++++++++++--- go/trains/cmd/db2web/default.nix | 2 + go/trains/webapi/default.nix | 15 +- go/trains/webapi/structs.go | 197 ----------------- go/trains/webapi/utils.go | 199 ++++++++++++++++++ go/trains/webapi/webapi.proto | 105 +++++++++ third_party/default.nix | 2 +- .../gopkgs/google.golang.org/grpc/default.nix | 7 +- .../google.golang.org/protobuf/default.nix | 4 +- third_party/tvl/nix/buildGo/default.nix | 17 +- third_party/tvl/nix/buildGo/proto.nix | 76 +------ 11 files changed, 464 insertions(+), 316 deletions(-) delete mode 100644 go/trains/webapi/structs.go create mode 100644 go/trains/webapi/utils.go create mode 100644 go/trains/webapi/webapi.proto diff --git a/go/trains/cmd/db2web/db2web.go b/go/trains/cmd/db2web/db2web.go index d22c2f38a7..db081c0497 100644 --- a/go/trains/cmd/db2web/db2web.go +++ b/go/trains/cmd/db2web/db2web.go @@ -2,8 +2,8 @@ package main import ( "context" - "encoding/json" "errors" + "flag" "fmt" "log" "net/http" @@ -14,9 +14,15 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/encoding/protojson" "hg.lukegb.com/lukegb/depot/go/trains/webapi" ) +var ( + allowStress = flag.Bool("allow_stress", false, "Allow checking the validity of every record in the database") +) + type querier interface { Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row @@ -26,9 +32,9 @@ func summarizeService(ctx context.Context, pc querier, id int) (*webapi.ServiceD row := pc.QueryRow(ctx, ` SELECT ts.id, ts.rid, ts.uid, ts.rsid, ts.headcode, ts.scheduled_start_date::varchar, - ts.train_operator, rop.name, rop.url, - COALESCE(ts.delay_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.delay_reason_tiploc, ''), COALESCE(rlrloc.name, ''), rlrloc.crs, COALESCE(rlrloc.toc, ''), rlrop.name, rlrop.url, COALESCE(ts.delay_reason_tiploc_near, false), - COALESCE(ts.cancellation_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.cancellation_reason_tiploc, ''), COALESCE(rlrloc.name, ''), rlrloc.crs, COALESCE(rlrloc.toc, ''), rlrop.name, rlrop.url, COALESCE(ts.cancellation_reason_tiploc_near, false), + ts.train_operator, COALESCE(rop.name, ''), COALESCE(rop.url, ''), + COALESCE(ts.delay_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.delay_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.delay_reason_tiploc_near, false), + COALESCE(ts.cancellation_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.cancellation_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.cancellation_reason_tiploc_near, false), ts.active, ts.deleted, ts.cancelled FROM train_services ts @@ -46,15 +52,16 @@ WHERE `, id) sd := webapi.ServiceData{ - DelayReason: &webapi.DisruptionReason{Location: &webapi.Location{TOC: &webapi.TrainOperator{}}}, - CancelReason: &webapi.DisruptionReason{Location: &webapi.Location{TOC: &webapi.TrainOperator{}}}, + DelayReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, + CancelReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, + Operator: &webapi.TrainOperator{}, } if err := row.Scan( - &sd.ID, &sd.RID, &sd.UID, &sd.RSID, &sd.Headcode, &sd.StartDate, - &sd.TrainOperator.Code, &sd.TrainOperator.Name, &sd.TrainOperator.URL, - &sd.DelayReason.Code, &sd.DelayReason.Text, &sd.DelayReason.Location.TIPLOC, &sd.DelayReason.Location.Name, &sd.DelayReason.Location.CRS, &sd.DelayReason.Location.TOC.Code, &sd.DelayReason.Location.TOC.Name, &sd.DelayReason.Location.TOC.URL, &sd.DelayReason.NearLocation, - &sd.CancelReason.Code, &sd.CancelReason.Text, &sd.CancelReason.Location.TIPLOC, &sd.CancelReason.Location.Name, &sd.CancelReason.Location.CRS, &sd.CancelReason.Location.TOC.Code, &sd.CancelReason.Location.TOC.Name, &sd.CancelReason.Location.TOC.URL, &sd.CancelReason.NearLocation, - &sd.Active, &sd.Deleted, &sd.Cancelled, + &sd.Id, &sd.Rid, &sd.Uid, &sd.Rsid, &sd.Headcode, &sd.ScheduledStartDate, + &sd.Operator.Code, &sd.Operator.Name, &sd.Operator.Url, + &sd.DelayReason.Code, &sd.DelayReason.Text, &sd.DelayReason.Location.Tiploc, &sd.DelayReason.Location.Name, &sd.DelayReason.Location.Crs, &sd.DelayReason.Location.Operator.Code, &sd.DelayReason.Location.Operator.Name, &sd.DelayReason.Location.Operator.Url, &sd.DelayReason.NearLocation, + &sd.CancelReason.Code, &sd.CancelReason.Text, &sd.CancelReason.Location.Tiploc, &sd.CancelReason.Location.Name, &sd.CancelReason.Location.Crs, &sd.CancelReason.Location.Operator.Code, &sd.CancelReason.Location.Operator.Name, &sd.CancelReason.Location.Operator.Url, &sd.CancelReason.NearLocation, + &sd.IsActive, &sd.IsDeleted, &sd.IsCancelled, ); err != nil { return nil, fmt.Errorf("reading from train_services for %d: %w", id, err) } @@ -64,11 +71,11 @@ WHERE rows, err := pc.Query(ctx, ` SELECT tl.id, - tl.tiploc, COALESCE(rloc.name, ''), COALESCE(rloc.crs, ''), COALESCE(rloc.toc, ''), rloctoc.name, rloctoc.url, - tl.calling_point::varchar, tl.train_length, tl.service_suppressed, + tl.tiploc, COALESCE(rloc.name, ''), COALESCE(rloc.crs, ''), COALESCE(rloc.toc, ''), COALESCE(rloctoc.name, ''), COALESCE(rloctoc.url, ''), + tl.calling_point::varchar, COALESCE(tl.train_length, 0), tl.service_suppressed, tl.schedule_cancelled, COALESCE(tl.schedule_platform, ''), COALESCE(tl.platform, ''), COALESCE(tl.platform_confirmed, false), COALESCE(tl.platform_suppressed, false), - tl.schedule_false_destination_tiploc, COALESCE(rfdloc.name, ''), COALESCE(rfdloc.crs, ''), COALESCE(rfdloc.toc, ''), rfdloctoc.name, rfdloctoc.url, + tl.schedule_false_destination_tiploc, COALESCE(rfdloc.name, ''), COALESCE(rfdloc.crs, ''), COALESCE(rfdloc.toc, ''), COALESCE(rfdloctoc.name, ''), COALESCE(rfdloctoc.url, ''), tl.schedule_public_arrival, tl.schedule_working_arrival, tl.estimated_arrival, tl.working_estimated_arrival, tl.actual_arrival, tl.schedule_public_departure, tl.schedule_working_departure, tl.estimated_departure, tl.working_estimated_departure, tl.actual_departure, @@ -91,28 +98,28 @@ ORDER BY defer rows.Close() for rows.Next() { loc := webapi.ServiceLocation{ - Location: &webapi.Location{TOC: &webapi.TrainOperator{}}, + Location: &webapi.Location{Operator: &webapi.TrainOperator{}}, Platform: &webapi.PlatformData{}, - FalseDestination: &webapi.Location{TOC: &webapi.TrainOperator{}}, - ArrivalTiming: &webapi.TimingData{}, - DepartureTiming: &webapi.TimingData{}, - PassTiming: &webapi.TimingData{}, + FalseDestination: &webapi.Location{Operator: &webapi.TrainOperator{}}, + ArrivalTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, + DepartureTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, + PassTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, } if err := rows.Scan( - &loc.ID, - &loc.Location.TIPLOC, &loc.Location.Name, &loc.Location.CRS, &loc.Location.TOC.Code, &loc.Location.TOC.Name, &loc.Location.TOC.URL, - &loc.CallingPointType, &loc.Length, &loc.Suppressed, + &loc.Id, + &loc.Location.Tiploc, &loc.Location.Name, &loc.Location.Crs, &loc.Location.Operator.Code, &loc.Location.Operator.Name, &loc.Location.Operator.Url, + &loc.CallingPointType, &loc.TrainLength, &loc.ServiceSuppressed, &loc.Cancelled, &loc.Platform.Scheduled, &loc.Platform.Live, &loc.Platform.Confirmed, &loc.Platform.Suppressed, - &loc.FalseDestination.TIPLOC, &loc.FalseDestination.Name, &loc.FalseDestination.CRS, &loc.FalseDestination.TOC.Code, &loc.FalseDestination.TOC.Name, &loc.FalseDestination.TOC.URL, - &loc.ArrivalTiming.PublicScheduled, &loc.ArrivalTiming.WorkingScheduled, &loc.ArrivalTiming.PublicEstimated, &loc.ArrivalTiming.WorkingEstimated, &loc.ArrivalTiming.Actual, - &loc.DepartureTiming.PublicScheduled, &loc.DepartureTiming.WorkingScheduled, &loc.DepartureTiming.PublicEstimated, &loc.DepartureTiming.WorkingEstimated, &loc.DepartureTiming.Actual, - /*&loc.PassTiming.PublicScheduled,*/ &loc.PassTiming.WorkingScheduled, &loc.PassTiming.PublicEstimated, &loc.PassTiming.WorkingEstimated, &loc.PassTiming.Actual, + &loc.FalseDestination.Tiploc, &loc.FalseDestination.Name, &loc.FalseDestination.Crs, &loc.FalseDestination.Operator.Code, &loc.FalseDestination.Operator.Name, &loc.FalseDestination.Operator.Url, + loc.ArrivalTiming.PublicScheduled, loc.ArrivalTiming.WorkingScheduled, loc.ArrivalTiming.PublicEstimated, loc.ArrivalTiming.WorkingEstimated, loc.ArrivalTiming.Actual, + loc.DepartureTiming.PublicScheduled, loc.DepartureTiming.WorkingScheduled, loc.DepartureTiming.PublicEstimated, loc.DepartureTiming.WorkingEstimated, loc.DepartureTiming.Actual, + /*loc.PassTiming.PublicScheduled,*/ loc.PassTiming.WorkingScheduled, loc.PassTiming.PublicEstimated, loc.PassTiming.WorkingEstimated, loc.PassTiming.Actual, ); err != nil { return nil, fmt.Errorf("scanning locations for %d: %w", id, err) } loc.Canonicalize() - sd.Locations = append(sd.Locations, loc) + sd.Locations = append(sd.Locations, &loc) } if rows.Err() != nil { return nil, fmt.Errorf("iterating over locations for %d: %w", id, err) @@ -170,6 +177,79 @@ type server struct { dbPool *pgxpool.Pool } +func (s *server) stress(ctx context.Context, rw http.ResponseWriter, r *http.Request) error { + if r.URL.Path != "/stress" { + return httpError{ + httpStatusCode: http.StatusNotFound, publicError: "not found", + } + } + + // stress test mode, let's go + const numWorkers = 32 + ch := make(chan int, numWorkers) + eg, egCtx := errgroup.WithContext(ctx) + for n := 0; n < numWorkers; n++ { + eg.Go(func() error { + ctx := egCtx + conn, err := s.dbPool.Acquire(ctx) + if err != nil { + return fmt.Errorf("acquiring conn: %w", err) + } + defer conn.Release() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case tsid := <-ch: + if tsid == 0 { + return nil + } + svc, err := summarizeService(ctx, conn, tsid) + if err != nil { + return fmt.Errorf("summarizeService %v: %w", tsid, err) + } + + if _, err := protojson.Marshal(svc); err != nil { + return fmt.Errorf("protojson.Marshal %v: %w", tsid, err) + } + log.Println(tsid) + } + } + return nil + }) + } + + idConn, err := s.dbPool.Acquire(ctx) + if err != nil { + return fmt.Errorf("acquiring idConn: %w", err) + } + defer idConn.Release() + + rows, err := idConn.Query(ctx, "SELECT id FROM train_services ORDER BY id") + if err != nil { + return fmt.Errorf("querying for all IDs: %w", err) + } + defer rows.Close() + +loop: + for rows.Next() { + var w int + if err := rows.Scan(&w); err != nil { + return fmt.Errorf("scanning ID: %w", err) + } + + select { + case ch <- w: + case <-egCtx.Done(): + break loop + } + } + close(ch) + + return eg.Wait() +} + var jsonPathRegexp = regexp.MustCompile(`/([1-9][0-9]*)$`) func (s *server) handleJSON(ctx context.Context, rw http.ResponseWriter, r *http.Request) error { @@ -208,9 +288,13 @@ func (s *server) handleJSON(ctx context.Context, rw http.ResponseWriter, r *http } rw.Header().Set("Content-Type", "application/json; charset=utf-8") - if err := json.NewEncoder(rw).Encode(svc); err != nil { + svcBytes, err := protojson.Marshal(svc) + if err != nil { return fmt.Errorf("encoding JSON: %w", err) } + if _, err := rw.Write(svcBytes); err != nil { + return fmt.Errorf("writing JSON out: %w", err) + } return nil } @@ -271,8 +355,6 @@ func (s *server) handleEventStream(ctx context.Context, rw http.ResponseWriter, rw.Header().Set("Content-Type", "text/event-stream") rw.Header().Set("Cache-Control", "no-cache") - je := json.NewEncoder(rw) - encodeSvc := func(conn querier, id int) error { svc, err := summarizeService(ctx, conn, id) if errors.Is(err, pgx.ErrNoRows) { @@ -286,8 +368,12 @@ func (s *server) handleEventStream(ctx context.Context, rw http.ResponseWriter, if _, err := fmt.Fprint(rw, "data: "); err != nil { return fmt.Errorf("writing data: prefix for service %d: %w", id, err) } - if err := je.Encode(svc); err != nil { - return fmt.Errorf("je.Encode service %d: %w", id, err) + svcBytes, err := protojson.Marshal(svc) + if err != nil { + return fmt.Errorf("protojson.Marshal service %d: %w", id, err) + } + if _, err := rw.Write(svcBytes); err != nil { + return fmt.Errorf("writing protojson for service %d: %w", id, err) } if _, err := fmt.Fprint(rw, "\n\n"); err != nil { @@ -387,6 +473,11 @@ func (s *server) handleHTTP(ctx context.Context, rw http.ResponseWriter, r *http return s.handleJSON(ctx, rw, r) case "text/event-stream": return s.handleEventStream(ctx, rw, r) + case "text/plain": + if *allowStress { + return s.stress(ctx, rw, r) + } + fallthrough default: return httpError{ httpStatusCode: http.StatusNotAcceptable, @@ -427,6 +518,7 @@ func (s *server) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func main() { + flag.Parse() ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/trains/cmd/db2web/default.nix b/go/trains/cmd/db2web/default.nix index e25986e0c3..0824efa2cf 100644 --- a/go/trains/cmd/db2web/default.nix +++ b/go/trains/cmd/db2web/default.nix @@ -10,6 +10,8 @@ depot.third_party.buildGo.program { deps = with depot.third_party; [ gopkgs."github.com".jackc.pgx.v4 gopkgs."github.com".jackc.pgx.v4.pgxpool + gopkgs."google.golang.org".protobuf.encoding.protojson + gopkgs."golang.org".x.sync.errgroup depot.go.trains.webapi ]; } diff --git a/go/trains/webapi/default.nix b/go/trains/webapi/default.nix index 856d3a2dbf..915291b6ec 100644 --- a/go/trains/webapi/default.nix +++ b/go/trains/webapi/default.nix @@ -3,10 +3,15 @@ # SPDX-License-Identifier: Apache-2.0 { depot, ... }: -depot.third_party.buildGo.package { +(depot.third_party.buildGo.proto { name = "webapi"; - srcs = [ - ./structs.go - ]; path = "hg.lukegb.com/lukegb/depot/go/trains/webapi"; -} + goPackage = ""; + proto = ./webapi.proto; + extraDeps = with depot.third_party; [ + gopkgs."google.golang.org".protobuf.types.known.timestamppb + gopkgs."github.com".jackc.pgtype + ]; +}).overrideGo (old: { + srcs = old.srcs ++ [ ./utils.go ]; +}) diff --git a/go/trains/webapi/structs.go b/go/trains/webapi/structs.go deleted file mode 100644 index d9182a1baa..0000000000 --- a/go/trains/webapi/structs.go +++ /dev/null @@ -1,197 +0,0 @@ -package webapi - -import ( - "time" -) - -type TrainOperator struct { - Code string `json:"code"` - - Name *string `json:"name"` - URL *string `json:"url"` -} - -func (toc *TrainOperator) IsEmpty() bool { - return toc == nil || toc.Code == "" -} - -func (toc *TrainOperator) Canonicalize() { - if toc == nil { - return - } - if *toc.Name == "" { - toc.Name = nil - } - if *toc.URL == "" { - toc.URL = nil - } -} - -type Location struct { - TIPLOC string `json:"tiploc"` - Name string `json:"name"` - - CRS *string `json:"crs"` - TOC *TrainOperator `json:"operator"` -} - -func (loc *Location) IsEmpty() bool { - return loc == nil || loc.TIPLOC == "" -} - -func (loc *Location) Canonicalize() { - if loc == nil { - return - } - if loc.CRS == nil || *loc.CRS == "" { - loc.CRS = nil - } - if loc.TOC.IsEmpty() { - loc.TOC = nil - } - loc.TOC.Canonicalize() -} - -type DisruptionReason struct { - Code int `json:"code"` - Text string `json:"text"` - - Location *Location `json:"location"` - NearLocation bool `json:"near_location"` -} - -func (dr *DisruptionReason) IsEmpty() bool { - return dr == nil || dr.Code == 0 -} - -func (dr *DisruptionReason) Canonicalize() { - if dr == nil { - return - } - if dr.Location.IsEmpty() { - dr.Location = nil - } - dr.Location.Canonicalize() -} - -type ServiceData struct { - ID int `json:"id"` - RID string `json:"rid"` - UID string `json:"uid"` - RSID string `json:"rsid"` - Headcode string `json:"headcode"` - StartDate string `json:"scheduled_start_date"` - - TrainOperator TrainOperator `json:"operator"` - - DelayReason *DisruptionReason `json:"delay_reason"` - CancelReason *DisruptionReason `json:"cancel_reason"` - - Active bool `json:"is_active"` - Deleted bool `json:"is_deleted"` - Cancelled bool `json:"is_cancelled"` - - Locations []ServiceLocation `json:"locations"` -} - -func (sd *ServiceData) Canonicalize() { - if sd.DelayReason.IsEmpty() { - sd.DelayReason = nil - } - sd.DelayReason.Canonicalize() - if sd.CancelReason.IsEmpty() { - sd.CancelReason = nil - } - sd.CancelReason.Canonicalize() -} - -type TimingData struct { - PublicScheduled *time.Time `json:"public_scheduled"` - WorkingScheduled *time.Time `json:"working_scheduled"` - - PublicEstimated *time.Time `json:"public_estimated"` - WorkingEstimated *time.Time `json:"working_estimated"` - - Actual *time.Time `json:"actual"` -} - -func (td *TimingData) IsEmpty() bool { - if td == nil { - return true - } - return td.PublicScheduled == nil && td.WorkingScheduled == nil && td.PublicEstimated == nil && td.WorkingEstimated == nil && td.Actual == nil -} - -func (td *TimingData) Canonicalize() { - if td == nil { - return - } - ts := []**time.Time{&td.PublicScheduled, &td.WorkingScheduled, &td.PublicEstimated, &td.WorkingEstimated, &td.Actual} - for _, t := range ts { - if *t == nil || (*t).IsZero() { - *t = nil - } - } -} - -type PlatformData struct { - Scheduled string `json:"scheduled"` - - Live string `json:"live"` - Confirmed bool `json:"confirmed"` - - Suppressed bool `json:"platform_suppressed"` -} - -func (pd *PlatformData) IsEmpty() bool { - return pd.Scheduled == "" && pd.Live == "" -} - -func (pd *PlatformData) Canonicalize() {} - -type ServiceLocation struct { - ID int `json:"id"` - Location *Location `json:"location"` - CallingPointType string `json:"calling_point_type"` - - Length *int `json:"train_length"` - Suppressed bool `json:"service_suppressed"` - Platform *PlatformData `json:"platform"` - - Cancelled bool `json:"cancelled"` - FalseDestination *Location `json:"false_destination"` - - ArrivalTiming *TimingData `json:"arrival_timing"` - DepartureTiming *TimingData `json:"departure_timing"` - PassTiming *TimingData `json:"pass_timing"` -} - -func (sl *ServiceLocation) Canonicalize() { - if sl == nil { - return - } - sl.Location.Canonicalize() - if sl.Length == nil || *sl.Length == 0 { - sl.Length = nil - } - if sl.Platform.IsEmpty() { - sl.Platform = nil - } - sl.Platform.Canonicalize() - if sl.FalseDestination.IsEmpty() { - sl.FalseDestination = nil - } - sl.FalseDestination.Canonicalize() - if sl.ArrivalTiming.IsEmpty() { - sl.ArrivalTiming = nil - } - sl.ArrivalTiming.Canonicalize() - if sl.DepartureTiming.IsEmpty() { - sl.DepartureTiming = nil - } - sl.DepartureTiming.Canonicalize() - if sl.PassTiming.IsEmpty() { - sl.PassTiming = nil - } - sl.PassTiming.Canonicalize() -} diff --git a/go/trains/webapi/utils.go b/go/trains/webapi/utils.go new file mode 100644 index 0000000000..6c80c09cdb --- /dev/null +++ b/go/trains/webapi/utils.go @@ -0,0 +1,199 @@ +package webapi + +import ( + "fmt" + "time" + + "github.com/jackc/pgtype" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (toc *TrainOperator) IsEmpty() bool { + return toc == nil || toc.Code == "" +} + +func (toc *TrainOperator) Canonicalize() { + if toc == nil { + return + } +} + +func (loc *Location) IsEmpty() bool { + return loc == nil || loc.Tiploc == "" +} + +func (loc *Location) Canonicalize() { + if loc == nil { + return + } + if loc.Operator.IsEmpty() { + loc.Operator = nil + } + loc.Operator.Canonicalize() +} + +func (dr *DisruptionReason) IsEmpty() bool { + return dr == nil || dr.Code == 0 +} + +func (dr *DisruptionReason) Canonicalize() { + if dr == nil { + return + } + if dr.Location.IsEmpty() { + dr.Location = nil + } + dr.Location.Canonicalize() +} + +func (sd *ServiceData) Canonicalize() { + if sd.DelayReason.IsEmpty() { + sd.DelayReason = nil + } + sd.DelayReason.Canonicalize() + if sd.CancelReason.IsEmpty() { + sd.CancelReason = nil + } + sd.CancelReason.Canonicalize() +} + +func (tz *TimeZone) ToLocation() (*time.Location, error) { + return time.LoadLocation(tz.GetName()) +} + +func (dt *DateTime) ToTime() (time.Time, error) { + tz := dt.GetTimezone() + if tz == nil { + return time.Time{}, fmt.Errorf("no timezone specified") + } + loc, err := tz.ToLocation() + if err != nil { + return time.Time{}, fmt.Errorf("loading timezone (%s / offset %v): %w", tz.GetName(), tz.GetUtcOffset(), err) + } + return dt.GetTimestamp().AsTime().In(loc), nil +} + +func (dt *DateTime) decodeWith(ci *pgtype.ConnInfo, src []byte, dec func(*pgtype.Timestamptz, *pgtype.ConnInfo, []byte) error) error { + dt.Timestamp = nil + dt.Timezone = nil + + if src == nil { + return nil + } + + ttz := &pgtype.Timestamptz{} + if err := dec(ttz, ci, src); err != nil { + return err + } + + if ttz.Status != pgtype.Present { + return nil + } + + *dt = *(FromTime(ttz.Time)) + + return nil +} + +func (dt *DateTime) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { + return dt.decodeWith(ci, src, (*pgtype.Timestamptz).DecodeBinary) +} + +func (dt *DateTime) DecodeText(ci *pgtype.ConnInfo, src []byte) error { + return dt.decodeWith(ci, src, (*pgtype.Timestamptz).DecodeText) +} + +func (dt *DateTime) AssignTo(dst interface{}) error { + if dt == nil { + return pgtype.NullAssignTo(dst) + } + + switch dst := dst.(type) { + case (*DateTime): + *dst = *dt + return nil + default: + if nextDst, retry := pgtype.GetAssignToDstType(dst); retry { + return dt.AssignTo(nextDst) + } + return fmt.Errorf("unable to assign to %T, sad trombone", dst) + } + + return fmt.Errorf("cannot decode %#v into %T, sad trombone", dt, dst) +} + +func FromTime(t time.Time) *DateTime { + zName, zOffset := t.Zone() + tz := &TimeZone{ + Name: zName, + UtcOffset: int32(zOffset), + } + + return &DateTime{ + Timestamp: timestamppb.New(t), + Timezone: tz, + } +} + +func (dt *DateTime) IsZero() bool { + if dt.GetTimestamp() == nil { + return true + } + t, err := dt.ToTime() + if err != nil { + return true + } + return t.IsZero() +} + +func (td *TimingData) IsEmpty() bool { + if td == nil { + return true + } + return td.PublicScheduled.IsZero() && td.WorkingScheduled.IsZero() && td.PublicEstimated.IsZero() && td.WorkingEstimated.IsZero() && td.Actual.IsZero() +} + +func (td *TimingData) Canonicalize() { + if td == nil { + return + } + ts := []**DateTime{&td.PublicScheduled, &td.WorkingScheduled, &td.PublicEstimated, &td.WorkingEstimated, &td.Actual} + for _, t := range ts { + if *t == nil || (*t).IsZero() { + *t = nil + } + } +} + +func (pd *PlatformData) IsEmpty() bool { + return pd.Scheduled == "" && pd.Live == "" +} + +func (pd *PlatformData) Canonicalize() {} + +func (sl *ServiceLocation) Canonicalize() { + if sl == nil { + return + } + sl.Location.Canonicalize() + if sl.Platform.IsEmpty() { + sl.Platform = nil + } + sl.Platform.Canonicalize() + if sl.FalseDestination.IsEmpty() { + sl.FalseDestination = nil + } + sl.FalseDestination.Canonicalize() + if sl.ArrivalTiming.IsEmpty() { + sl.ArrivalTiming = nil + } + sl.ArrivalTiming.Canonicalize() + if sl.DepartureTiming.IsEmpty() { + sl.DepartureTiming = nil + } + sl.DepartureTiming.Canonicalize() + if sl.PassTiming.IsEmpty() { + sl.PassTiming = nil + } + sl.PassTiming.Canonicalize() +} diff --git a/go/trains/webapi/webapi.proto b/go/trains/webapi/webapi.proto new file mode 100644 index 0000000000..a116a95119 --- /dev/null +++ b/go/trains/webapi/webapi.proto @@ -0,0 +1,105 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package trains.webapi; + +message ServiceData { + uint64 id = 1; + string rid = 2; + string uid = 3; + string rsid = 4; + string headcode = 5; + string scheduled_start_date = 6; + + TrainOperator operator = 7; + + DisruptionReason delay_reason = 8; + DisruptionReason cancel_reason = 9; + + bool is_active = 10; + bool is_deleted = 11; + bool is_cancelled = 12; + + repeated ServiceLocation locations = 13; +} + +message TrainOperator { + string code = 1; + + string name = 2; + string url = 3; +} + +message Location { + string tiploc = 1; + string name = 2; + + string crs = 3; + TrainOperator operator = 4; +} + +message DisruptionReason { + uint32 code = 1; + string text = 2; + + Location location = 3; + bool near_location = 4; +} + +message TimeZone { + string name = 1; + int32 utc_offset = 2; +} + +message DateTime { + google.protobuf.Timestamp timestamp = 1; + TimeZone timezone = 2; +} + +message TimingData { + DateTime public_scheduled = 1; + DateTime working_scheduled = 2; + + DateTime public_estimated = 3; + DateTime working_estimated = 4; + + DateTime actual = 5; +} + +message PlatformData { + string scheduled = 1; + + string live = 2; + bool confirmed = 3; + + bool suppressed = 4; +} + +message ServiceLocation { + enum CallingPointType { + CALLING_POINT_UNKNOWN = 0; + OR = 1; + OPOR = 2; + PP = 3; + OPIP = 4; + IP = 5; + OPDT = 6; + DT = 7; + } + + uint32 id = 1; + Location location = 2; + string calling_point_type = 3; + + uint32 train_length = 4; + bool service_suppressed = 5; + PlatformData platform = 6; + + bool cancelled = 7; + Location false_destination = 8; + + TimingData arrival_timing = 9; + TimingData departure_timing = 10; + TimingData pass_timing = 11; +} diff --git a/third_party/default.nix b/third_party/default.nix index d7923c0f76..0551c07747 100644 --- a/third_party/default.nix +++ b/third_party/default.nix @@ -36,7 +36,7 @@ rec { nixos = import ./nixpkgs/nixos; nixeval = import ./nixpkgs/nixos/lib/eval-config.nix; buildGo = - let orig = import ./tvl/nix/buildGo { pkgs = nixpkgs; }; + let orig = import ./tvl/nix/buildGo { pkgs = nixpkgs; inherit gopkgs; }; in orig // { program = { dockerData ? [], ... }@args: let diff --git a/third_party/gopkgs/google.golang.org/grpc/default.nix b/third_party/gopkgs/google.golang.org/grpc/default.nix index 4adbd6cb31..ec473f10cc 100644 --- a/third_party/gopkgs/google.golang.org/grpc/default.nix +++ b/third_party/gopkgs/google.golang.org/grpc/default.nix @@ -8,8 +8,8 @@ depot.third_party.buildGo.external { src = depot.third_party.nixpkgs.fetchFromGitHub { owner = "grpc"; repo = "grpc-go"; - rev = "v1.36.1"; - hash = "sha256:0l3prxp18lb0pagqg4l6c9i0l6gakfxgf6vxcsv589i0xsxw8ivm"; + rev = "v1.42.0"; + sha256 = "sha256:0k5k762licfzs56nk817g83qji4np32z0gwnfbwr95y70klvs76q"; }; deps = with depot.third_party; [ gopkgs."golang.org".x.net.http2 @@ -18,8 +18,11 @@ depot.third_party.buildGo.external { gopkgs."golang.org".x.sys.unix gopkgs."github.com".golang.protobuf.proto gopkgs."github.com".golang.protobuf.ptypes + gopkgs."google.golang.org".protobuf.compiler.protogen gopkgs."google.golang.org".protobuf.reflect.protoreflect gopkgs."google.golang.org".protobuf.runtime.protoimpl + gopkgs."google.golang.org".protobuf.types.descriptorpb + gopkgs."google.golang.org".protobuf.types.pluginpb gopkgs."google.golang.org".protobuf.types.known.durationpb gopkgs."google.golang.org".protobuf.types.known.timestamppb gopkgs."google.golang.org".genproto.googleapis.rpc.status diff --git a/third_party/gopkgs/google.golang.org/protobuf/default.nix b/third_party/gopkgs/google.golang.org/protobuf/default.nix index c3ff0d6125..4cd66068c6 100644 --- a/third_party/gopkgs/google.golang.org/protobuf/default.nix +++ b/third_party/gopkgs/google.golang.org/protobuf/default.nix @@ -8,7 +8,7 @@ depot.third_party.buildGo.external { src = depot.third_party.nixpkgs.fetchFromGitHub { owner = "protocolbuffers"; repo = "protobuf-go"; - rev = "d3470999428befce9bbefe77980ff65ac5a494c4"; - hash = "sha256:0sgwfkcr6n7m1ivyq34rz4rd6gm5pzswa73nvzj59dkaknj68xfb"; + rev = "v1.27.1"; + sha256 = "sha256:0aszb7cv8fq1m8akgd4kjyg5q7g5z9fdqnry6057ygq9r8r2yif2"; }; } diff --git a/third_party/tvl/nix/buildGo/default.nix b/third_party/tvl/nix/buildGo/default.nix index 18b69dcd8a..700c5fa3c6 100644 --- a/third_party/tvl/nix/buildGo/default.nix +++ b/third_party/tvl/nix/buildGo/default.nix @@ -5,6 +5,7 @@ # rules_go. { pkgs ? import {} +, gopkgs , ... }: let @@ -110,7 +111,7 @@ let # Import support libraries needed for protobuf & gRPC support protoLibs = import ./proto.nix { - inherit external; + inherit gopkgs; }; # Build a Go library out of the specified protobuf definition. @@ -119,8 +120,16 @@ let deps = [ protoLibs.goProto.proto.gopkg ] ++ extraDeps; srcs = lib.singleton (runCommand "goproto-${name}.pb.go" {} '' cp ${proto} ${baseNameOf proto} - ${protobuf}/bin/protoc --plugin=${protoLibs.goProto.protoc-gen-go.gopkg}/bin/protoc-gen-go \ - --go_out=plugins=grpc,import_path=${baseNameOf path}:. ${baseNameOf proto} + ${protobuf}/bin/protoc \ + --plugin=${protoLibs.goProto.cmd.protoc-gen-go.gopkg}/bin/protoc-gen-go \ + --go_out=. \ + --go_opt=paths=source_relative \ + --go_opt=M${baseNameOf proto}=${path} \ + --plugin=${protoLibs.goGrpc.cmd.protoc-gen-go-grpc.gopkg}/bin/protoc-gen-go-grpc \ + --go-grpc_out=. \ + --go-grpc_opt=paths=source_relative \ + --go-grpc_opt=M${baseNameOf proto}=${path} \ + ${baseNameOf proto} mv ./${goPackage}/*.pb.go $out ''); }; @@ -133,7 +142,7 @@ in { # overrideable. program = makeOverridable program; package = makeOverridable package; - proto = makeOverridable proto; + proto = proto; grpc = makeOverridable grpc; external = makeOverridable external; } diff --git a/third_party/tvl/nix/buildGo/proto.nix b/third_party/tvl/nix/buildGo/proto.nix index 57e9a5ec98..92036faa81 100644 --- a/third_party/tvl/nix/buildGo/proto.nix +++ b/third_party/tvl/nix/buildGo/proto.nix @@ -4,81 +4,11 @@ # This file provides derivations for the dependencies of a gRPC # service in Go. -{ external }: +{ gopkgs }: let inherit (builtins) fetchGit map; in rec { - goProto = external { - path = "github.com/golang/protobuf"; - src = fetchGit { - url = "https://github.com/golang/protobuf"; - rev = "ed6926b37a637426117ccab59282c3839528a700"; - }; - }; - - xnet = external { - path = "golang.org/x/net"; - - src = fetchGit { - url = "https://go.googlesource.com/net"; - rev = "ffdde105785063a81acd95bdf89ea53f6e0aac2d"; - }; - - deps = [ - xtext.secure.bidirule - xtext.unicode.bidi - xtext.unicode.norm - ]; - }; - - xsys = external { - path = "golang.org/x/sys"; - src = fetchGit { - url = "https://go.googlesource.com/sys"; - rev = "bd437916bb0eb726b873ee8e9b2dcf212d32e2fd"; - }; - }; - - xtext = external { - path = "golang.org/x/text"; - src = fetchGit { - url = "https://go.googlesource.com/text"; - rev = "cbf43d21aaebfdfeb81d91a5f444d13a3046e686"; - }; - }; - - genproto = external { - path = "google.golang.org/genproto"; - src = fetchGit { - url = "https://github.com/google/go-genproto"; - rev = "83cc0476cb11ea0da33dacd4c6354ab192de6fe6"; - }; - - deps = with goProto; [ - proto - ptypes.any - ]; - }; - - goGrpc = external { - path = "google.golang.org/grpc"; - deps = ([ - xnet.trace - xnet.http2 - xsys.unix - xnet.http2.hpack - genproto.googleapis.rpc.status - ] ++ (with goProto; [ - proto - ptypes - ptypes.duration - ptypes.timestamp - ])); - - src = fetchGit { - url = "https://github.com/grpc/grpc-go"; - rev = "d8e3da36ac481ef00e510ca119f6b68177713689"; - }; - }; + goProto = gopkgs."google.golang.org".protobuf; + goGrpc = gopkgs."google.golang.org".grpc; }