diff --git a/go/twitterchiver/related_fetcher/fetcher.go b/go/twitterchiver/related_fetcher/fetcher.go index 5fc8a9ee25..e867e7de31 100644 --- a/go/twitterchiver/related_fetcher/fetcher.go +++ b/go/twitterchiver/related_fetcher/fetcher.go @@ -33,6 +33,11 @@ var ( workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).") mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.") + mediaTickInterval = flag.Duration("media_tick_interval", 0, "Tick interval for media - takes priority over --tick_interval if non-zero.") + mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.") + + relatedTickInterval = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.") + relatedWorkAtOnce = flag.Int("related_work_at_once", 0, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.") ) type WorkerConfig struct { @@ -46,6 +51,28 @@ type WorkerConfig struct { WorkQueue chan func(context.Context) } +func (wc WorkerConfig) Merge(other WorkerConfig) WorkerConfig { + if other.Name != "" { + wc.Name = other.Name + } + if other.TickInterval > 0 { + wc.TickInterval = other.TickInterval + } + if other.OAuthConfig != nil { + wc.OAuthConfig = other.OAuthConfig + } + if other.WorkAtOnce != 0 { + wc.WorkAtOnce = other.WorkAtOnce + } + if other.DatabasePool != nil { + wc.DatabasePool = other.DatabasePool + } + if other.WorkQueue != nil { + wc.WorkQueue = other.WorkQueue + } + return wc +} + type worker func(context.Context, WorkerConfig) error type tweetVideoVariant struct { @@ -454,8 +481,8 @@ func main() { } workers := map[string]worker{ - "mediaFetchWorker": mediaFetchWorker, - "relatedFetchWorker": relatedFetchWorker, + "media": mediaFetchWorker, + "related": relatedFetchWorker, } wg.Add(len(workers)) cfg := WorkerConfig{ @@ -466,13 +493,28 @@ func main() { DatabasePool: conn, WorkQueue: workQueue, } + configs := map[string]WorkerConfig{ + "media": cfg.Merge(WorkerConfig{ + Name: "media", + TickInterval: *mediaTickInterval, + WorkAtOnce: *mediaWorkAtOnce, + }), + "related": cfg.Merge(WorkerConfig{ + Name: "related", + TickInterval: *relatedTickInterval, + WorkAtOnce: *relatedWorkAtOnce, + }), + } for wn, w := range workers { - wn, w, cfg := wn, w, cfg - cfg.Name = wn + wn, w := wn, w + wcfg, ok := configs[wn] + if !ok { + wcfg = cfg + } go func() { defer wg.Done() log.Printf("Starting worker %v.", wn) - if err := w(cctx, cfg); err != nil { + if err := w(cctx, wcfg); err != nil { log.Printf("[%v] %v", wn, err) } }()