From 7af28f3ef5604d54b7537014aff5f0feb11d7750 Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Sat, 20 Nov 2021 19:12:47 +0000 Subject: [PATCH] go/trains: split summarizeService into its own package It might come in handy when sending push notifications to devices... --- go/trains/cmd/db2web/db2web.go | 119 ++----------------------- go/trains/cmd/db2web/default.nix | 1 + go/trains/webapi/default.nix | 6 +- go/trains/webapi/summarize/default.nix | 14 +++ go/trains/webapi/summarize/service.go | 114 +++++++++++++++++++++++ 5 files changed, 140 insertions(+), 114 deletions(-) create mode 100644 go/trains/webapi/summarize/default.nix create mode 100644 go/trains/webapi/summarize/service.go diff --git a/go/trains/cmd/db2web/db2web.go b/go/trains/cmd/db2web/db2web.go index db081c0497..f4abefd878 100644 --- a/go/trains/cmd/db2web/db2web.go +++ b/go/trains/cmd/db2web/db2web.go @@ -16,118 +16,13 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/protojson" - "hg.lukegb.com/lukegb/depot/go/trains/webapi" + "hg.lukegb.com/lukegb/depot/go/trains/webapi/summarize" ) var ( allowStress = flag.Bool("allow_stress", false, "Allow checking the validity of every record in the database") ) -type querier interface { - Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) - QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row -} - -func summarizeService(ctx context.Context, pc querier, id int) (*webapi.ServiceData, error) { - row := pc.QueryRow(ctx, ` -SELECT - ts.id, ts.rid, ts.uid, ts.rsid, ts.headcode, ts.scheduled_start_date::varchar, - ts.train_operator, COALESCE(rop.name, ''), COALESCE(rop.url, ''), - COALESCE(ts.delay_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.delay_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.delay_reason_tiploc_near, false), - COALESCE(ts.cancellation_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.cancellation_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.cancellation_reason_tiploc_near, false), - ts.active, ts.deleted, ts.cancelled -FROM - train_services ts - LEFT JOIN ref_tocs rop ON rop.toc=ts.train_operator - - LEFT JOIN ref_late_running_reasons rlr ON rlr.code=ts.delay_reason_code::int - LEFT JOIN ref_locations rlrloc ON rlrloc.tiploc=ts.delay_reason_tiploc - LEFT JOIN ref_tocs rlrop ON rlrop.toc=rlrloc.toc - - LEFT JOIN ref_cancel_reasons rc ON rc.code=ts.cancellation_reason_code::int - LEFT JOIN ref_locations rcloc ON rcloc.tiploc=ts.cancellation_reason_tiploc - LEFT JOIN ref_tocs rcop ON rcop.toc=rcloc.toc -WHERE - ts.id=$1 -`, id) - - sd := webapi.ServiceData{ - DelayReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, - CancelReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, - Operator: &webapi.TrainOperator{}, - } - if err := row.Scan( - &sd.Id, &sd.Rid, &sd.Uid, &sd.Rsid, &sd.Headcode, &sd.ScheduledStartDate, - &sd.Operator.Code, &sd.Operator.Name, &sd.Operator.Url, - &sd.DelayReason.Code, &sd.DelayReason.Text, &sd.DelayReason.Location.Tiploc, &sd.DelayReason.Location.Name, &sd.DelayReason.Location.Crs, &sd.DelayReason.Location.Operator.Code, &sd.DelayReason.Location.Operator.Name, &sd.DelayReason.Location.Operator.Url, &sd.DelayReason.NearLocation, - &sd.CancelReason.Code, &sd.CancelReason.Text, &sd.CancelReason.Location.Tiploc, &sd.CancelReason.Location.Name, &sd.CancelReason.Location.Crs, &sd.CancelReason.Location.Operator.Code, &sd.CancelReason.Location.Operator.Name, &sd.CancelReason.Location.Operator.Url, &sd.CancelReason.NearLocation, - &sd.IsActive, &sd.IsDeleted, &sd.IsCancelled, - ); err != nil { - return nil, fmt.Errorf("reading from train_services for %d: %w", id, err) - } - sd.Canonicalize() - - // Now for the locations. - rows, err := pc.Query(ctx, ` -SELECT - tl.id, - tl.tiploc, COALESCE(rloc.name, ''), COALESCE(rloc.crs, ''), COALESCE(rloc.toc, ''), COALESCE(rloctoc.name, ''), COALESCE(rloctoc.url, ''), - tl.calling_point::varchar, COALESCE(tl.train_length, 0), tl.service_suppressed, - tl.schedule_cancelled, - COALESCE(tl.schedule_platform, ''), COALESCE(tl.platform, ''), COALESCE(tl.platform_confirmed, false), COALESCE(tl.platform_suppressed, false), - tl.schedule_false_destination_tiploc, COALESCE(rfdloc.name, ''), COALESCE(rfdloc.crs, ''), COALESCE(rfdloc.toc, ''), COALESCE(rfdloctoc.name, ''), COALESCE(rfdloctoc.url, ''), - - tl.schedule_public_arrival, tl.schedule_working_arrival, tl.estimated_arrival, tl.working_estimated_arrival, tl.actual_arrival, - tl.schedule_public_departure, tl.schedule_working_departure, tl.estimated_departure, tl.working_estimated_departure, tl.actual_departure, - /* no public_pass */ tl.schedule_working_pass, tl.estimated_pass, tl.working_estimated_pass, tl.actual_pass -FROM - train_locations tl - LEFT JOIN ref_locations rloc ON rloc.tiploc=tl.tiploc - LEFT JOIN ref_tocs rloctoc ON rloctoc.toc=rloc.toc - - LEFT JOIN ref_locations rfdloc ON rfdloc.tiploc=tl.schedule_false_destination_tiploc - LEFT JOIN ref_tocs rfdloctoc ON rfdloctoc.toc=rfdloc.toc -WHERE - tl.tsid=$1 AND tl.active_in_schedule -ORDER BY - COALESCE(tl.schedule_working_arrival, tl.schedule_working_pass, tl.schedule_working_departure) -`, id) - if err != nil { - return nil, fmt.Errorf("querying locations for %d: %w", id, err) - } - defer rows.Close() - for rows.Next() { - loc := webapi.ServiceLocation{ - Location: &webapi.Location{Operator: &webapi.TrainOperator{}}, - Platform: &webapi.PlatformData{}, - FalseDestination: &webapi.Location{Operator: &webapi.TrainOperator{}}, - ArrivalTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, - DepartureTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, - PassTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, - } - if err := rows.Scan( - &loc.Id, - &loc.Location.Tiploc, &loc.Location.Name, &loc.Location.Crs, &loc.Location.Operator.Code, &loc.Location.Operator.Name, &loc.Location.Operator.Url, - &loc.CallingPointType, &loc.TrainLength, &loc.ServiceSuppressed, - &loc.Cancelled, - &loc.Platform.Scheduled, &loc.Platform.Live, &loc.Platform.Confirmed, &loc.Platform.Suppressed, - &loc.FalseDestination.Tiploc, &loc.FalseDestination.Name, &loc.FalseDestination.Crs, &loc.FalseDestination.Operator.Code, &loc.FalseDestination.Operator.Name, &loc.FalseDestination.Operator.Url, - loc.ArrivalTiming.PublicScheduled, loc.ArrivalTiming.WorkingScheduled, loc.ArrivalTiming.PublicEstimated, loc.ArrivalTiming.WorkingEstimated, loc.ArrivalTiming.Actual, - loc.DepartureTiming.PublicScheduled, loc.DepartureTiming.WorkingScheduled, loc.DepartureTiming.PublicEstimated, loc.DepartureTiming.WorkingEstimated, loc.DepartureTiming.Actual, - /*loc.PassTiming.PublicScheduled,*/ loc.PassTiming.WorkingScheduled, loc.PassTiming.PublicEstimated, loc.PassTiming.WorkingEstimated, loc.PassTiming.Actual, - ); err != nil { - return nil, fmt.Errorf("scanning locations for %d: %w", id, err) - } - loc.Canonicalize() - sd.Locations = append(sd.Locations, &loc) - } - if rows.Err() != nil { - return nil, fmt.Errorf("iterating over locations for %d: %w", id, err) - } - - return &sd, nil -} - type watchingResponseWriter struct { http.ResponseWriter http.Flusher @@ -205,9 +100,9 @@ func (s *server) stress(ctx context.Context, rw http.ResponseWriter, r *http.Req if tsid == 0 { return nil } - svc, err := summarizeService(ctx, conn, tsid) + svc, err := summarize.Service(ctx, conn, tsid) if err != nil { - return fmt.Errorf("summarizeService %v: %w", tsid, err) + return fmt.Errorf("summarize.Service %v: %w", tsid, err) } if _, err := protojson.Marshal(svc); err != nil { @@ -276,7 +171,7 @@ func (s *server) handleJSON(ctx context.Context, rw http.ResponseWriter, r *http } defer conn.Release() - svc, err := summarizeService(ctx, conn, id) + svc, err := summarize.Service(ctx, conn, id) if errors.Is(err, pgx.ErrNoRows) { return httpError{ err: err, @@ -355,14 +250,14 @@ func (s *server) handleEventStream(ctx context.Context, rw http.ResponseWriter, rw.Header().Set("Content-Type", "text/event-stream") rw.Header().Set("Cache-Control", "no-cache") - encodeSvc := func(conn querier, id int) error { - svc, err := summarizeService(ctx, conn, id) + encodeSvc := func(conn summarize.Querier, id int) error { + svc, err := summarize.Service(ctx, conn, id) if errors.Is(err, pgx.ErrNoRows) { if _, err := fmt.Fprintf(rw, "event: notyet\ndata: %d\n\n", id); err != nil { return err } } else if err != nil { - return fmt.Errorf("summarizeService(%d): %w", id, err) + return fmt.Errorf("summarize.Service(%d): %w", id, err) } if _, err := fmt.Fprint(rw, "data: "); err != nil { diff --git a/go/trains/cmd/db2web/default.nix b/go/trains/cmd/db2web/default.nix index 0824efa2cf..10b43c14b0 100644 --- a/go/trains/cmd/db2web/default.nix +++ b/go/trains/cmd/db2web/default.nix @@ -13,5 +13,6 @@ depot.third_party.buildGo.program { gopkgs."google.golang.org".protobuf.encoding.protojson gopkgs."golang.org".x.sync.errgroup depot.go.trains.webapi + depot.go.trains.webapi.summarize ]; } diff --git a/go/trains/webapi/default.nix b/go/trains/webapi/default.nix index 915291b6ec..ea8196bb57 100644 --- a/go/trains/webapi/default.nix +++ b/go/trains/webapi/default.nix @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -{ depot, ... }: +{ depot, ... }@args: (depot.third_party.buildGo.proto { name = "webapi"; path = "hg.lukegb.com/lukegb/depot/go/trains/webapi"; @@ -14,4 +14,6 @@ ]; }).overrideGo (old: { srcs = old.srcs ++ [ ./utils.go ]; -}) +}) // { + summarize = import ./summarize args; +} diff --git a/go/trains/webapi/summarize/default.nix b/go/trains/webapi/summarize/default.nix new file mode 100644 index 0000000000..448eab9842 --- /dev/null +++ b/go/trains/webapi/summarize/default.nix @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2020 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.package { + name = "summarize"; + path = "hg.lukegb.com/lukegb/depot/go/trains/webapi/summarize"; + srcs = [ ./service.go ]; + deps = with depot.third_party; [ + gopkgs."github.com".jackc.pgx.v4 + depot.go.trains.webapi + ]; +} diff --git a/go/trains/webapi/summarize/service.go b/go/trains/webapi/summarize/service.go new file mode 100644 index 0000000000..148660b4e1 --- /dev/null +++ b/go/trains/webapi/summarize/service.go @@ -0,0 +1,114 @@ +package summarize + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v4" + "hg.lukegb.com/lukegb/depot/go/trains/webapi" +) + +type Querier interface { + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row +} + +func Service(ctx context.Context, pc Querier, id int) (*webapi.ServiceData, error) { + row := pc.QueryRow(ctx, ` +SELECT + ts.id, ts.rid, ts.uid, ts.rsid, ts.headcode, ts.scheduled_start_date::varchar, + ts.train_operator, COALESCE(rop.name, ''), COALESCE(rop.url, ''), + COALESCE(ts.delay_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.delay_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.delay_reason_tiploc_near, false), + COALESCE(ts.cancellation_reason_code::int, 0), COALESCE(rlr.text, ''), COALESCE(ts.cancellation_reason_tiploc, ''), COALESCE(rlrloc.name, ''), COALESCE(rlrloc.crs, ''), COALESCE(rlrloc.toc, ''), COALESCE(rlrop.name, ''), COALESCE(rlrop.url, ''), COALESCE(ts.cancellation_reason_tiploc_near, false), + ts.active, ts.deleted, ts.cancelled +FROM + train_services ts + LEFT JOIN ref_tocs rop ON rop.toc=ts.train_operator + + LEFT JOIN ref_late_running_reasons rlr ON rlr.code=ts.delay_reason_code::int + LEFT JOIN ref_locations rlrloc ON rlrloc.tiploc=ts.delay_reason_tiploc + LEFT JOIN ref_tocs rlrop ON rlrop.toc=rlrloc.toc + + LEFT JOIN ref_cancel_reasons rc ON rc.code=ts.cancellation_reason_code::int + LEFT JOIN ref_locations rcloc ON rcloc.tiploc=ts.cancellation_reason_tiploc + LEFT JOIN ref_tocs rcop ON rcop.toc=rcloc.toc +WHERE + ts.id=$1 +`, id) + + sd := webapi.ServiceData{ + DelayReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, + CancelReason: &webapi.DisruptionReason{Location: &webapi.Location{Operator: &webapi.TrainOperator{}}}, + Operator: &webapi.TrainOperator{}, + } + if err := row.Scan( + &sd.Id, &sd.Rid, &sd.Uid, &sd.Rsid, &sd.Headcode, &sd.ScheduledStartDate, + &sd.Operator.Code, &sd.Operator.Name, &sd.Operator.Url, + &sd.DelayReason.Code, &sd.DelayReason.Text, &sd.DelayReason.Location.Tiploc, &sd.DelayReason.Location.Name, &sd.DelayReason.Location.Crs, &sd.DelayReason.Location.Operator.Code, &sd.DelayReason.Location.Operator.Name, &sd.DelayReason.Location.Operator.Url, &sd.DelayReason.NearLocation, + &sd.CancelReason.Code, &sd.CancelReason.Text, &sd.CancelReason.Location.Tiploc, &sd.CancelReason.Location.Name, &sd.CancelReason.Location.Crs, &sd.CancelReason.Location.Operator.Code, &sd.CancelReason.Location.Operator.Name, &sd.CancelReason.Location.Operator.Url, &sd.CancelReason.NearLocation, + &sd.IsActive, &sd.IsDeleted, &sd.IsCancelled, + ); err != nil { + return nil, fmt.Errorf("reading from train_services for %d: %w", id, err) + } + sd.Canonicalize() + + // Now for the locations. + rows, err := pc.Query(ctx, ` +SELECT + tl.id, + tl.tiploc, COALESCE(rloc.name, ''), COALESCE(rloc.crs, ''), COALESCE(rloc.toc, ''), COALESCE(rloctoc.name, ''), COALESCE(rloctoc.url, ''), + tl.calling_point::varchar, COALESCE(tl.train_length, 0), tl.service_suppressed, + tl.schedule_cancelled, + COALESCE(tl.schedule_platform, ''), COALESCE(tl.platform, ''), COALESCE(tl.platform_confirmed, false), COALESCE(tl.platform_suppressed, false), + tl.schedule_false_destination_tiploc, COALESCE(rfdloc.name, ''), COALESCE(rfdloc.crs, ''), COALESCE(rfdloc.toc, ''), COALESCE(rfdloctoc.name, ''), COALESCE(rfdloctoc.url, ''), + + tl.schedule_public_arrival, tl.schedule_working_arrival, tl.estimated_arrival, tl.working_estimated_arrival, tl.actual_arrival, + tl.schedule_public_departure, tl.schedule_working_departure, tl.estimated_departure, tl.working_estimated_departure, tl.actual_departure, + /* no public_pass */ tl.schedule_working_pass, tl.estimated_pass, tl.working_estimated_pass, tl.actual_pass +FROM + train_locations tl + LEFT JOIN ref_locations rloc ON rloc.tiploc=tl.tiploc + LEFT JOIN ref_tocs rloctoc ON rloctoc.toc=rloc.toc + + LEFT JOIN ref_locations rfdloc ON rfdloc.tiploc=tl.schedule_false_destination_tiploc + LEFT JOIN ref_tocs rfdloctoc ON rfdloctoc.toc=rfdloc.toc +WHERE + tl.tsid=$1 AND tl.active_in_schedule +ORDER BY + COALESCE(tl.schedule_working_arrival, tl.schedule_working_pass, tl.schedule_working_departure) +`, id) + if err != nil { + return nil, fmt.Errorf("querying locations for %d: %w", id, err) + } + defer rows.Close() + for rows.Next() { + loc := webapi.ServiceLocation{ + Location: &webapi.Location{Operator: &webapi.TrainOperator{}}, + Platform: &webapi.PlatformData{}, + FalseDestination: &webapi.Location{Operator: &webapi.TrainOperator{}}, + ArrivalTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, + DepartureTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, + PassTiming: &webapi.TimingData{PublicScheduled: &webapi.DateTime{}, WorkingScheduled: &webapi.DateTime{}, PublicEstimated: &webapi.DateTime{}, WorkingEstimated: &webapi.DateTime{}, Actual: &webapi.DateTime{}}, + } + if err := rows.Scan( + &loc.Id, + &loc.Location.Tiploc, &loc.Location.Name, &loc.Location.Crs, &loc.Location.Operator.Code, &loc.Location.Operator.Name, &loc.Location.Operator.Url, + &loc.CallingPointType, &loc.TrainLength, &loc.ServiceSuppressed, + &loc.Cancelled, + &loc.Platform.Scheduled, &loc.Platform.Live, &loc.Platform.Confirmed, &loc.Platform.Suppressed, + &loc.FalseDestination.Tiploc, &loc.FalseDestination.Name, &loc.FalseDestination.Crs, &loc.FalseDestination.Operator.Code, &loc.FalseDestination.Operator.Name, &loc.FalseDestination.Operator.Url, + loc.ArrivalTiming.PublicScheduled, loc.ArrivalTiming.WorkingScheduled, loc.ArrivalTiming.PublicEstimated, loc.ArrivalTiming.WorkingEstimated, loc.ArrivalTiming.Actual, + loc.DepartureTiming.PublicScheduled, loc.DepartureTiming.WorkingScheduled, loc.DepartureTiming.PublicEstimated, loc.DepartureTiming.WorkingEstimated, loc.DepartureTiming.Actual, + /*loc.PassTiming.PublicScheduled,*/ loc.PassTiming.WorkingScheduled, loc.PassTiming.PublicEstimated, loc.PassTiming.WorkingEstimated, loc.PassTiming.Actual, + ); err != nil { + return nil, fmt.Errorf("scanning locations for %d: %w", id, err) + } + loc.Canonicalize() + sd.Locations = append(sd.Locations, &loc) + } + if rows.Err() != nil { + return nil, fmt.Errorf("iterating over locations for %d: %w", id, err) + } + + return &sd, nil +}