2021-11-18 22:24:20 +00:00
|
|
|
package darwindb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
|
|
|
|
pgx "github.com/jackc/pgx/v4"
|
2024-11-16 15:30:41 +00:00
|
|
|
"git.lukegb.com/lukegb/depot/go/trains/darwin"
|
2021-11-18 22:24:20 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// handleDataResponse handles a Darwin DataResponse message (i.e. a SnapshotResponse or a UpdateResponse).
|
|
|
|
func handleDataResponse(ctx context.Context, tx pgx.Tx, dr *darwin.DataResponse, a *Affected) error {
|
|
|
|
for _, s := range dr.Schedule {
|
|
|
|
if err := handleSchedule(ctx, tx, &s, a, scheduleModeOverwrite); err != nil {
|
|
|
|
return fmt.Errorf("handling schedule for RID %v: %w", s.RID, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, ts := range dr.TrainStatus {
|
|
|
|
if err := handleTrainStatus(ctx, tx, &ts, a); err != nil {
|
|
|
|
return fmt.Errorf("handling trainstatus for RID %v: %w", ts.RID, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, ds := range dr.Deactivated {
|
|
|
|
var tid int
|
|
|
|
row := tx.QueryRow(ctx, "UPDATE train_services SET active=FALSE WHERE rid=$1 RETURNING id", ds.RID)
|
|
|
|
if err := row.Scan(&tid); err == pgx.ErrNoRows {
|
|
|
|
log.Printf("deactivated schedule for unknown RID %v", ds.RID)
|
|
|
|
} else if err != nil {
|
|
|
|
return fmt.Errorf("handling deactivateschedule for RID %v: %w", ds.RID, err)
|
|
|
|
}
|
|
|
|
a.RID(ds.RID)
|
|
|
|
a.TSID(tid)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ProcessTimetable updates the database by processing a PportTimetable message in the given transaction.
|
|
|
|
func ProcessTimetable(ctx context.Context, tx pgx.Tx, ppt *darwin.PushPortTimetable, a *Affected) error {
|
|
|
|
for _, s := range ppt.Journey {
|
|
|
|
if err := handleSchedule(ctx, tx, &s, a, scheduleModeIgnoreExisting); err != nil {
|
|
|
|
return fmt.Errorf("handling timetable journey for RID %v: %w", s.RID, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Process updates the database by processing a single PushPort message in the given transaction.
|
|
|
|
func Process(ctx context.Context, tx pgx.Tx, pp *darwin.PushPort, a *Affected) error {
|
|
|
|
drs := make([]*darwin.DataResponse, 0, len(pp.SnapshotResp)+len(pp.UpdateResp))
|
|
|
|
for _, r := range pp.SnapshotResp {
|
|
|
|
drs = append(drs, &r.DataResponse)
|
|
|
|
}
|
|
|
|
for _, r := range pp.UpdateResp {
|
|
|
|
drs = append(drs, &r.DataResponse)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(drs) == 0 {
|
|
|
|
// Short-circuit if we don't have anything we're interested in - which is very unlikely.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, dr := range drs {
|
|
|
|
if err := handleDataResponse(ctx, tx, dr, a); err != nil {
|
|
|
|
return fmt.Errorf("handling data response: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|