From e49cb8eb05ba90376e8712e36b3ef1f5f989a9c0 Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Tue, 20 Oct 2020 23:30:50 +0000 Subject: [PATCH] twitterchiver/related_fetcher: up the work-cap, but introduce an absolute cap on number of tweets actually fetched --- go/go.mod | 1 + go/twitterchiver/related_fetcher/fetcher.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/go/go.mod b/go/go.mod index 3470cf8946..50142abf42 100644 --- a/go/go.mod +++ b/go/go.mod @@ -14,6 +14,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/securecookie v1.1.1 github.com/gorilla/sessions v1.2.1 + github.com/jackc/pgtype v1.4.2 github.com/jackc/pgx/v4 v4.8.1 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 ) diff --git a/go/twitterchiver/related_fetcher/fetcher.go b/go/twitterchiver/related_fetcher/fetcher.go index 6493086f8f..afaea0a0d2 100644 --- a/go/twitterchiver/related_fetcher/fetcher.go +++ b/go/twitterchiver/related_fetcher/fetcher.go @@ -37,7 +37,8 @@ var ( mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.") relatedTickInterval = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.") - relatedWorkAtOnce = flag.Int("related_work_at_once", 0, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.") + relatedWorkAtOnce = flag.Int("related_work_at_once", 200, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.") + relatedMaxFetch = flag.Int("related_max_fetch", 900/15, "Cap of tweets to try to fetch at once.") ) type WorkerConfig struct { @@ -288,6 +289,12 @@ func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error { } log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids) + fetchLimited := make(chan struct{}, *relatedMaxFetch) + for i := 0; i < *relatedMaxFetch; i++ { + fetchLimited <- struct{}{} + } + close(fetchLimited) + var wg sync.WaitGroup for _, t := range tweets { var tweetIDsToFetch []int64 @@ -326,6 +333,10 @@ func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error { httpClient := cfg.OAuthConfig.Client(ctx, oauth1.NewToken(accessToken, accessSecret)) for _, tid := range tweetIDsToFetch { + if _, ok := <-fetchLimited; !ok { + log.Printf("[%v:%d] Out of fetch tokens", cfg.Name, t.ID) + break + } // Check if we already have tid. log.Printf("[%v:%d] Fetching %d", cfg.Name, t.ID, tid)