twitterchiver/related_fetcher: add separate flags for tick interval/work for each worker

This commit is contained in:
Luke Granger-Brown 2020-10-18 21:23:26 +01:00
parent 297fc79db0
commit e6180a6850

View file

@ -33,6 +33,11 @@ var (
workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).") workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).")
mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.") mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.")
mediaTickInterval = flag.Duration("media_tick_interval", 0, "Tick interval for media - takes priority over --tick_interval if non-zero.")
mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.")
relatedTickInterval = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.")
relatedWorkAtOnce = flag.Int("related_work_at_once", 0, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.")
) )
type WorkerConfig struct { type WorkerConfig struct {
@ -46,6 +51,28 @@ type WorkerConfig struct {
WorkQueue chan func(context.Context) WorkQueue chan func(context.Context)
} }
func (wc WorkerConfig) Merge(other WorkerConfig) WorkerConfig {
if other.Name != "" {
wc.Name = other.Name
}
if other.TickInterval > 0 {
wc.TickInterval = other.TickInterval
}
if other.OAuthConfig != nil {
wc.OAuthConfig = other.OAuthConfig
}
if other.WorkAtOnce != 0 {
wc.WorkAtOnce = other.WorkAtOnce
}
if other.DatabasePool != nil {
wc.DatabasePool = other.DatabasePool
}
if other.WorkQueue != nil {
wc.WorkQueue = other.WorkQueue
}
return wc
}
type worker func(context.Context, WorkerConfig) error type worker func(context.Context, WorkerConfig) error
type tweetVideoVariant struct { type tweetVideoVariant struct {
@ -454,8 +481,8 @@ func main() {
} }
workers := map[string]worker{ workers := map[string]worker{
"mediaFetchWorker": mediaFetchWorker, "media": mediaFetchWorker,
"relatedFetchWorker": relatedFetchWorker, "related": relatedFetchWorker,
} }
wg.Add(len(workers)) wg.Add(len(workers))
cfg := WorkerConfig{ cfg := WorkerConfig{
@ -466,13 +493,28 @@ func main() {
DatabasePool: conn, DatabasePool: conn,
WorkQueue: workQueue, WorkQueue: workQueue,
} }
configs := map[string]WorkerConfig{
"media": cfg.Merge(WorkerConfig{
Name: "media",
TickInterval: *mediaTickInterval,
WorkAtOnce: *mediaWorkAtOnce,
}),
"related": cfg.Merge(WorkerConfig{
Name: "related",
TickInterval: *relatedTickInterval,
WorkAtOnce: *relatedWorkAtOnce,
}),
}
for wn, w := range workers { for wn, w := range workers {
wn, w, cfg := wn, w, cfg wn, w := wn, w
cfg.Name = wn wcfg, ok := configs[wn]
if !ok {
wcfg = cfg
}
go func() { go func() {
defer wg.Done() defer wg.Done()
log.Printf("Starting worker %v.", wn) log.Printf("Starting worker %v.", wn)
if err := w(cctx, cfg); err != nil { if err := w(cctx, wcfg); err != nil {
log.Printf("[%v] %v", wn, err) log.Printf("[%v] %v", wn, err)
} }
}() }()