go/trains: start work towards gRPC API; add FCM topic publisher

This commit is contained in:
Luke Granger-Brown 2021-11-20 23:46:53 +00:00
parent 7af28f3ef5
commit c9cc5cc5f4
18 changed files with 458 additions and 116 deletions

View file

@ -8,6 +8,10 @@ result-*
web/quotes/theme/static/
syntax: regexp
^go/trains/.*/start.sh$
^go/trains/.*/lukegb-trains.json$
syntax: glob
*.sw?
*.pyc

View file

@ -120,12 +120,26 @@ func main() {
return tx.Commit(ctx)
}
notify := func(ctx context.Context, tx pgx.Tx, s string) error {
if _, err := tx.Exec(ctx, fmt.Sprintf("NOTIFY %q", s)); err != nil {
return fmt.Errorf("notifying channel for %q: %w", s, err)
}
if _, err := tx.Exec(ctx, fmt.Sprintf("NOTIFY \"firehose\", '%s'", s)); err != nil {
return fmt.Errorf("notifying firehose for %q: %w", s, err)
}
return nil
}
sendAffectedNotifications := func(ctx context.Context, tx pgx.Tx, affected *darwindb.Affected) error {
for tsid := range affected.TSIDs {
if _, err := tx.Exec(ctx, fmt.Sprintf(`NOTIFY "tsid-%d"`, tsid)); err != nil {
if err := notify(ctx, tx, fmt.Sprintf("tsid-%d", tsid)); err != nil {
return fmt.Errorf("notifying channel for tsid %d: %w", tsid, err)
}
}
for tiploc := range affected.TIPLOCs {
if err := notify(ctx, tx, fmt.Sprintf("tiploc-%s", tiploc)); err != nil {
return fmt.Errorf("notifying channel for tiploc %s: %w", tiploc, err)
}
}
return nil
}
processMessage := func(ctx context.Context, pp *darwin.PushPort) error {

View file

@ -0,0 +1,117 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
)
var (
projectID = flag.String("project_id", "lukegb-trains", "Firebase project ID")
batchInterval = flag.Duration("batch_interval", 1*time.Second, "interval at which to batch messages")
batchMax = flag.Int("batch_max", 300, "Maximum messages before we will instantly send the batch")
)
func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pc, err := pgx.Connect(ctx, ("host=/var/run/postgresql database=trains search_path=darwindb"))
if err != nil {
log.Fatalf("pgx.Connect: %v", err)
}
defer pc.Close(context.Background())
fapp, err := firebase.NewApp(ctx, &firebase.Config{
ProjectID: *projectID,
})
if err != nil {
log.Fatalf("firebase.NewApp: %v", err)
}
fcm, err := fapp.Messaging(ctx)
if err != nil {
log.Fatalf("fapp.Messaging: %v", err)
}
if _, err := pc.Exec(ctx, "LISTEN firehose"); err != nil {
log.Fatalf("subscribing to firehose: %v", err)
}
notificationCh := make(chan *pgconn.Notification)
go func() {
defer close(notificationCh)
for {
n, err := pc.WaitForNotification(ctx)
if err != nil {
log.Fatalf("WaitForNotification: %v", err)
}
notificationCh <- n
}
}()
t := time.NewTicker(*batchInterval)
batch := map[string]bool{}
sendBatchNow := func(ctx context.Context) error {
if len(batch) == 0 {
return nil
}
msgs := make([]*messaging.Message, 0, len(batch))
for b := range batch {
msgs = append(msgs, &messaging.Message{
Topic: b,
})
}
br, err := fcm.SendAll(ctx, msgs)
if err != nil {
return fmt.Errorf("SendAll: %q", err)
}
log.Printf("sent batch: %d success, %d failure", br.SuccessCount, br.FailureCount)
if br.FailureCount != 0 {
for _, sr := range br.Responses {
if sr.Success {
continue
}
log.Printf("message %s: %v", sr.MessageID, sr.Error)
}
}
batch = map[string]bool{}
return nil
}
maybeSendBatch := func(ctx context.Context) error {
if len(batch) < *batchMax {
return nil
}
return sendBatchNow(ctx)
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-t.C:
// send batch
if err := sendBatchNow(ctx); err != nil {
log.Printf("sending batch on ticker: %v", err)
}
case n := <-notificationCh:
batch[n.Payload] = true
if err := maybeSendBatch(ctx); err != nil {
log.Printf("sending batch because batch is full: %v", err)
}
}
}
}

View file

@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.program {
name = "db2fcm";
srcs = [ ./db2fcm.go ];
deps = with depot.third_party; [
gopkgs."github.com".jackc.pgx.v4
gopkgs."firebase.google.com".go.v4
gopkgs."firebase.google.com".go.v4.messaging
];
}

View file

@ -10,17 +10,22 @@ import (
"net/url"
"regexp"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"hg.lukegb.com/lukegb/depot/go/trains/webapi/summarize"
)
var (
allowStress = flag.Bool("allow_stress", false, "Allow checking the validity of every record in the database")
certFile = flag.String("cert_file", "", "Path to TLS certificate. If empty, listens on plaintext HTTP instead.")
keyFile = flag.String("key_file", "", "Path to TLS private key.")
)
type watchingResponseWriter struct {
@ -70,6 +75,7 @@ func (he httpError) Error() string {
type server struct {
dbPool *pgxpool.Pool
grpcServer *grpc.Server
}
func (s *server) stress(ctx context.Context, rw http.ResponseWriter, r *http.Request) error {
@ -363,6 +369,10 @@ func (s *server) handleEventStream(ctx context.Context, rw http.ResponseWriter,
func (s *server) handleHTTP(ctx context.Context, rw http.ResponseWriter, r *http.Request) error {
accept := r.Header.Get("Accept")
if r.ProtoMajor == 2 && strings.HasPrefix(accept, "application/grpc") {
s.grpcServer.ServeHTTP(rw, r)
return nil
}
switch accept {
case "application/json":
return s.handleJSON(ctx, rw, r)
@ -443,8 +453,18 @@ func main() {
s := &server{
dbPool: pcpool,
grpcServer: nil,
}
listen := ":13974"
log.Printf("listening on %s", listen)
log.Fatal(http.ListenAndServe(listen, s))
httpServer := &http.Server{
Addr: listen,
Handler: s,
}
if *certFile != "" && *keyFile != "" {
log.Printf("listening on %s (TLS)", listen)
log.Fatal(httpServer.ListenAndServeTLS(*certFile, *keyFile))
} else {
log.Printf("listening on %s (plaintext HTTP)", listen)
log.Fatal(httpServer.ListenAndServe())
}
}

View file

@ -7,11 +7,12 @@
rec {
darwiningestd = import ./darwiningestd args;
db2web = import ./db2web args;
db2fcm = import ./db2fcm args;
rttingest = import ./rttingest args;
train2livesplit = import ./train2livesplit args;
bins = {
inherit darwiningestd db2web train2livesplit;
inherit darwiningestd db2web db2fcm train2livesplit;
inherit (rttingest) tiploc;
};
}

View file

@ -3,17 +3,17 @@
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }@args:
(depot.third_party.buildGo.proto {
depot.third_party.buildGo.grpc {
name = "webapi";
path = "hg.lukegb.com/lukegb/depot/go/trains/webapi";
goPackage = "";
proto = ./webapi.proto;
goPackage = "webapi";
protos = [ ./types.proto ./webapi.proto ];
withGrpc = true;
extraDeps = with depot.third_party; [
gopkgs."google.golang.org".protobuf.types.known.timestamppb
gopkgs."github.com".jackc.pgtype
];
}).overrideGo (old: {
srcs = old.srcs ++ [ ./utils.go ];
}) // {
extraSrcs = [ ./utils.go ];
} // {
summarize = import ./summarize args;
}

View file

@ -0,0 +1,105 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package trains.webapi;
message ServiceData {
uint64 id = 1;
string rid = 2;
string uid = 3;
string rsid = 4;
string headcode = 5;
string scheduled_start_date = 6;
TrainOperator operator = 7;
DisruptionReason delay_reason = 8;
DisruptionReason cancel_reason = 9;
bool is_active = 10;
bool is_deleted = 11;
bool is_cancelled = 12;
repeated ServiceLocation locations = 13;
}
message TrainOperator {
string code = 1;
string name = 2;
string url = 3;
}
message Location {
string tiploc = 1;
string name = 2;
string crs = 3;
TrainOperator operator = 4;
}
message DisruptionReason {
uint32 code = 1;
string text = 2;
Location location = 3;
bool near_location = 4;
}
message TimeZone {
string name = 1;
int32 utc_offset = 2;
}
message DateTime {
google.protobuf.Timestamp timestamp = 1;
TimeZone timezone = 2;
}
message TimingData {
DateTime public_scheduled = 1;
DateTime working_scheduled = 2;
DateTime public_estimated = 3;
DateTime working_estimated = 4;
DateTime actual = 5;
}
message PlatformData {
string scheduled = 1;
string live = 2;
bool confirmed = 3;
bool suppressed = 4;
}
message ServiceLocation {
enum CallingPointType {
CALLING_POINT_UNKNOWN = 0;
OR = 1;
OPOR = 2;
PP = 3;
OPIP = 4;
IP = 5;
OPDT = 6;
DT = 7;
}
uint32 id = 1;
Location location = 2;
string calling_point_type = 3;
uint32 train_length = 4;
bool service_suppressed = 5;
PlatformData platform = 6;
bool cancelled = 7;
Location false_destination = 8;
TimingData arrival_timing = 9;
TimingData departure_timing = 10;
TimingData pass_timing = 11;
}

View file

@ -1,105 +1,43 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "types.proto";
package trains.webapi;
message ServiceData {
message SummarizeServiceRequest {
uint64 id = 1;
string rid = 2;
string uid = 3;
string rsid = 4;
string headcode = 5;
string scheduled_start_date = 6;
TrainOperator operator = 7;
DisruptionReason delay_reason = 8;
DisruptionReason cancel_reason = 9;
bool is_active = 10;
bool is_deleted = 11;
bool is_cancelled = 12;
repeated ServiceLocation locations = 13;
}
message TrainOperator {
string code = 1;
string name = 2;
string url = 3;
}
message Location {
message LocationRequest {
string tiploc = 1;
string name = 2;
string crs = 2;
string crs = 3;
TrainOperator operator = 4;
bool include_in_past = 3;
}
message DisruptionReason {
uint32 code = 1;
string text = 2;
message ListServicesRequest {
string rid = 1;
string uid = 2;
string headcode = 3;
string scheduled_start_date = 4;
string train_operator_code = 5;
Location location = 3;
bool near_location = 4;
LocationRequest origin = 6;
LocationRequest destination = 7;
LocationRequest calling_at = 8;
LocationRequest passing = 9;
bool include_deleted = 10;
uint32 max_results = 11;
}
message TimeZone {
string name = 1;
int32 utc_offset = 2;
message ServiceList {
repeated ServiceData services = 1;
}
message DateTime {
google.protobuf.Timestamp timestamp = 1;
TimeZone timezone = 2;
}
message TimingData {
DateTime public_scheduled = 1;
DateTime working_scheduled = 2;
DateTime public_estimated = 3;
DateTime working_estimated = 4;
DateTime actual = 5;
}
message PlatformData {
string scheduled = 1;
string live = 2;
bool confirmed = 3;
bool suppressed = 4;
}
message ServiceLocation {
enum CallingPointType {
CALLING_POINT_UNKNOWN = 0;
OR = 1;
OPOR = 2;
PP = 3;
OPIP = 4;
IP = 5;
OPDT = 6;
DT = 7;
}
uint32 id = 1;
Location location = 2;
string calling_point_type = 3;
uint32 train_length = 4;
bool service_suppressed = 5;
PlatformData platform = 6;
bool cancelled = 7;
Location false_destination = 8;
TimingData arrival_timing = 9;
TimingData departure_timing = 10;
TimingData pass_timing = 11;
service TrainsService {
rpc SummarizeService(SummarizeServiceRequest) returns (ServiceData) {}
rpc ListServices(ListServicesRequest) returns (ServiceList) {}
}

View file

@ -0,0 +1,46 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.external {
path = "cloud.google.com/go";
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "googleapis";
repo = "google-cloud-go";
rev = "37c240e79a5296f51cf4ed5d9f4861febbc5a959";
sha256 = "sha256:1f9hvnc7cd9hsw0mgn8fjwcmn030qy93lv4lpcmsgzb3gbj7iyp1";
};
deps = with depot.third_party; [
gopkgs."golang.org".x.oauth2.google
gopkgs."golang.org".x.xerrors
gopkgs."github.com".golang.protobuf.proto
gopkgs."github.com".golang.protobuf.ptypes
gopkgs."github.com".golang.protobuf.ptypes.timestamp
gopkgs."github.com".golang.protobuf.ptypes.wrappers
gopkgs."github.com".googleapis.gax-go.v2
gopkgs."google.golang.org".api.googleapi
gopkgs."google.golang.org".api.iamcredentials.v1
gopkgs."google.golang.org".api.iterator
gopkgs."google.golang.org".api.option
gopkgs."google.golang.org".api.option.internaloption
gopkgs."google.golang.org".api.storage.v1
gopkgs."google.golang.org".api.transport
gopkgs."google.golang.org".api.transport.http
gopkgs."google.golang.org".api.transport.grpc
gopkgs."google.golang.org".genproto.googleapis.firestore.v1
gopkgs."google.golang.org".genproto.googleapis.iam.v1
gopkgs."google.golang.org".genproto.googleapis.storage.v2
gopkgs."google.golang.org".genproto.googleapis.type.expr
gopkgs."google.golang.org".genproto.googleapis.rpc.code
gopkgs."google.golang.org".genproto.googleapis.type.latlng
gopkgs."google.golang.org".grpc
gopkgs."google.golang.org".grpc.codes
gopkgs."google.golang.org".grpc.metadata
gopkgs."google.golang.org".grpc.status
gopkgs."google.golang.org".protobuf.proto
gopkgs."google.golang.org".protobuf.reflect.protoreflect
gopkgs."google.golang.org".protobuf.types.known.timestamppb
gopkgs."go.opencensus.io".trace
];
}

View file

@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.external {
path = "firebase.google.com/go/v4";
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "firebase";
repo = "firebase-admin-go";
rev = "v4.6.1";
sha256 = "sha256:02w7vwwqw15z84icq7kf47i402yzsq3r89ay7gihvpzbmjj7g54k";
};
deps = with depot.third_party; [
gopkgs."cloud.google.com".go.firestore
gopkgs."cloud.google.com".go.storage
gopkgs."google.golang.org".api.option
gopkgs."google.golang.org".api.transport
gopkgs."google.golang.org".api.iterator
gopkgs."golang.org".x.oauth2
];
}

View file

@ -22,5 +22,6 @@ depot.third_party.buildGo.external {
types.known.timestamppb
types.known.anypb
types.known.durationpb
types.known.wrapperspb
];
}

View file

@ -8,8 +8,8 @@ depot.third_party.buildGo.external {
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "golang";
repo = "net";
rev = "0a1ea396d57c75b04aff7cb73d169717064c2b8a";
hash = "sha256:0wg1minaybg5nbqm1zjgjjr1ns8japq013j8srcybacmd90vrj2l";
rev = "6a13c67c3ce400be1b91076053a994c2d1ebf01b";
hash = "sha256:1iia85dy1i755xrrlaj273skanvpkipwb000k4irn7zg5h6d41z1";
};
deps = with depot.third_party; [
gopkgs."golang.org".x.text.secure.bidirule

View file

@ -8,10 +8,11 @@ depot.third_party.buildGo.external {
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "golang";
repo = "oauth2";
rev = "5d25da1a8d43b66f2898c444f899c7bcfd6a407e";
hash = "sha256:14jx8sxshbvnx4fggn12mlmz000zcrd1a73n1455b1wgg1xfzq3n";
rev = "d3ed0bb246c8d3c75b63937d9a5eecff9c74d7fe";
hash = "sha256:1vwkvx9kicl0sx27a41r5nfhaw3r5ni94xrvdg5mdihb0g57skwq";
};
deps = with depot.third_party; [
gopkgs."golang.org".x.net.context.ctxhttp
gopkgs."cloud.google.com".go.compute.metadata
];
}

View file

@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.external {
path = "google.golang.org/api";
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "googleapis";
repo = "google-api-go-client";
rev = "v0.60.0";
sha256 = "sha256:0drrwwq31lbpkx8kck6njfiwym86l3mdacwx2v9lsrcybqlfl745";
};
deps = with depot.third_party; [
gopkgs."golang.org".x.net.http2
gopkgs."golang.org".x.oauth2
gopkgs."golang.org".x.oauth2.google
gopkgs."golang.org".x.sys.unix
gopkgs."google.golang.org".grpc
gopkgs."google.golang.org".grpc.balancer.grpclb
gopkgs."google.golang.org".grpc.credentials
gopkgs."google.golang.org".grpc.credentials.google
gopkgs."google.golang.org".grpc.credentials.oauth
gopkgs."cloud.google.com".go.compute.metadata
gopkgs."go.opencensus.io".plugin.ocgrpc
gopkgs."go.opencensus.io".plugin.ochttp
gopkgs."go.opencensus.io".trace
gopkgs."go.opencensus.io".trace.propagation
gopkgs."github.com".googleapis.gax-go.v2
];
}

View file

@ -8,13 +8,22 @@ depot.third_party.buildGo.external {
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "googleapis";
repo = "go-genproto";
rev = "b7513248f0baeaf360936b49a5e918736b586837";
hash = "sha256:06yksav0ijinxynnc4zqc84ap1iiiv72afnld14hig9dm74ndkib";
rev = "81c1377c94b1698bef1982dc9e18f3260cfcbf1a";
hash = "sha256:0nc4a1prh884rgiynwcik32p8fgrr529lxlfcvxv5zfvfjmw8w1v";
};
deps = with depot.third_party; [
gopkgs."google.golang.org".grpc
gopkgs."google.golang.org".grpc.codes
gopkgs."google.golang.org".grpc.status
gopkgs."github.com".golang.protobuf.proto
gopkgs."google.golang.org".protobuf.reflect.protoreflect
gopkgs."google.golang.org".protobuf.runtime.protoimpl
gopkgs."google.golang.org".protobuf.types.known.anypb
gopkgs."google.golang.org".protobuf.types.known.emptypb
gopkgs."google.golang.org".protobuf.types.known.fieldmaskpb
gopkgs."google.golang.org".protobuf.types.known.structpb
gopkgs."google.golang.org".protobuf.types.known.timestamppb
gopkgs."google.golang.org".protobuf.types.known.wrapperspb
gopkgs."google.golang.org".protobuf.types.descriptorpb
];
}

View file

@ -15,9 +15,15 @@ depot.third_party.buildGo.external {
gopkgs."golang.org".x.net.http2
gopkgs."golang.org".x.net.http2.hpack
gopkgs."golang.org".x.net.trace
gopkgs."golang.org".x.oauth2
gopkgs."golang.org".x.oauth2.google
gopkgs."golang.org".x.oauth2.jwt
gopkgs."golang.org".x.sys.unix
gopkgs."github.com".golang.protobuf.proto
gopkgs."github.com".golang.protobuf.ptypes
gopkgs."github.com".golang.protobuf.ptypes.duration
gopkgs."github.com".golang.protobuf.ptypes.timestamp
gopkgs."github.com".google.go-cmp.cmp
gopkgs."google.golang.org".protobuf.compiler.protogen
gopkgs."google.golang.org".protobuf.reflect.protoreflect
gopkgs."google.golang.org".protobuf.runtime.protoimpl

View file

@ -115,27 +115,39 @@ let
};
# Build a Go library out of the specified protobuf definition.
proto = { name, proto, path ? name, goPackage ? name, extraDeps ? [] }: (makeOverridable package) {
proto = { name, proto ? null, protos ? [ proto ], path ? name, goPackage ? name, withGrpc ? false, extraSrcs ? [], extraDeps ? [] }:
let
protosDir = runCommand "protos" {} ''
mkdir $out
${lib.concatMapStrings (p: "cp ${p} $out/${baseNameOf p}\n") protos}
'';
mname = prefix: lib.concatMapStrings (p: "${prefix}M${baseNameOf p}=${path} ") protos;
in (makeOverridable package) {
inherit name path;
deps = [ protoLibs.goProto.proto.gopkg ] ++ extraDeps;
srcs = lib.singleton (runCommand "goproto-${name}.pb.go" {} ''
cp ${proto} ${baseNameOf proto}
srcs = lib.concatMap (proto: lib.singleton (runCommand "goproto-${name}-${baseNameOf proto}.pb.go" {} ''
${protobuf}/bin/protoc \
-I ${protosDir} \
--plugin=${protoLibs.goProto.cmd.protoc-gen-go.gopkg}/bin/protoc-gen-go \
--go_out=. \
--go_opt=paths=source_relative \
--go_opt=M${baseNameOf proto}=${path} \
${mname "--go_opt="} \
${protosDir}/${baseNameOf proto}
mv ./*.pb.go $out
'') ++ lib.optional withGrpc (runCommand "gogrpcproto-${name}-${baseNameOf proto}.pb.go" {} ''
${protobuf}/bin/protoc \
-I ${protosDir} \
--plugin=${protoLibs.goGrpc.cmd.protoc-gen-go-grpc.gopkg}/bin/protoc-gen-go-grpc \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
--go-grpc_opt=M${baseNameOf proto}=${path} \
${baseNameOf proto}
mv ./${goPackage}/*.pb.go $out
'');
${mname "--go-grpc_opt="} \
${protosDir}/${baseNameOf proto}
mv ./*.pb.go $out 2>/dev/null || echo "package ${goPackage}" >> $out
'')) protos ++ extraSrcs;
};
# Build a Go library out of the specified gRPC definition.
grpc = args: proto (args // { extraDeps = [ protoLibs.goGrpc.gopkg ]; });
grpc = { extraDeps ? [], ... }@args: proto (args // { withGrpc = true; extraDeps = extraDeps ++ [ protoLibs.goGrpc.gopkg ]; });
in {
# Only the high-level builder functions are exposed, but made