depot/go/trains/cmd/db2fcm/db2fcm.go

118 lines
2.5 KiB
Go
Raw Normal View History

package main
import (
"context"
"flag"
"fmt"
"log"
"time"
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
)
var (
projectID = flag.String("project_id", "lukegb-trains", "Firebase project ID")
batchInterval = flag.Duration("batch_interval", 1*time.Second, "interval at which to batch messages")
batchMax = flag.Int("batch_max", 300, "Maximum messages before we will instantly send the batch")
)
func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pc, err := pgx.Connect(ctx, ("host=/var/run/postgresql database=trains search_path=darwindb"))
if err != nil {
log.Fatalf("pgx.Connect: %v", err)
}
defer pc.Close(context.Background())
fapp, err := firebase.NewApp(ctx, &firebase.Config{
ProjectID: *projectID,
})
if err != nil {
log.Fatalf("firebase.NewApp: %v", err)
}
fcm, err := fapp.Messaging(ctx)
if err != nil {
log.Fatalf("fapp.Messaging: %v", err)
}
if _, err := pc.Exec(ctx, "LISTEN firehose"); err != nil {
log.Fatalf("subscribing to firehose: %v", err)
}
notificationCh := make(chan *pgconn.Notification)
go func() {
defer close(notificationCh)
for {
n, err := pc.WaitForNotification(ctx)
if err != nil {
log.Fatalf("WaitForNotification: %v", err)
}
notificationCh <- n
}
}()
t := time.NewTicker(*batchInterval)
batch := map[string]bool{}
sendBatchNow := func(ctx context.Context) error {
if len(batch) == 0 {
return nil
}
msgs := make([]*messaging.Message, 0, len(batch))
for b := range batch {
msgs = append(msgs, &messaging.Message{
Topic: b,
})
}
br, err := fcm.SendAll(ctx, msgs)
if err != nil {
return fmt.Errorf("SendAll: %q", err)
}
log.Printf("sent batch: %d success, %d failure", br.SuccessCount, br.FailureCount)
if br.FailureCount != 0 {
for _, sr := range br.Responses {
if sr.Success {
continue
}
log.Printf("message %s: %v", sr.MessageID, sr.Error)
}
}
batch = map[string]bool{}
return nil
}
maybeSendBatch := func(ctx context.Context) error {
if len(batch) < *batchMax {
return nil
}
return sendBatchNow(ctx)
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-t.C:
// send batch
if err := sendBatchNow(ctx); err != nil {
log.Printf("sending batch on ticker: %v", err)
}
case n := <-notificationCh:
batch[n.Payload] = true
if err := maybeSendBatch(ctx); err != nil {
log.Printf("sending batch because batch is full: %v", err)
}
}
}
}