twitterchiver/related_fetcher: up the work-cap, but introduce an absolute cap on number of tweets actually fetched
This commit is contained in:
parent
c6071fc9d3
commit
e49cb8eb05
2 changed files with 13 additions and 1 deletion
|
@@ -14,6 +14,7 @@ require (
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/gorilla/securecookie v1.1.1
|
github.com/gorilla/securecookie v1.1.1
|
||||||
github.com/gorilla/sessions v1.2.1
|
github.com/gorilla/sessions v1.2.1
|
||||||
|
github.com/jackc/pgtype v1.4.2
|
||||||
github.com/jackc/pgx/v4 v4.8.1
|
github.com/jackc/pgx/v4 v4.8.1
|
||||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||||
)
|
)
|
||||||
|
|
|
@@ -37,7 +37,8 @@ var (
|
||||||
mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.")
|
mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.")
|
||||||
|
|
||||||
relatedTickInterval = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.")
|
relatedTickInterval = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.")
|
||||||
relatedWorkAtOnce = flag.Int("related_work_at_once", 0, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.")
|
relatedWorkAtOnce = flag.Int("related_work_at_once", 200, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.")
|
||||||
|
relatedMaxFetch = flag.Int("related_max_fetch", 900/15, "Cap of tweets to try to fetch at once.")
|
||||||
)
|
)
|
||||||
|
|
||||||
type WorkerConfig struct {
|
type WorkerConfig struct {
|
||||||
|
@@ -288,6 +289,12 @@ func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error {
|
||||||
}
|
}
|
||||||
log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids)
|
log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids)
|
||||||
|
|
||||||
|
fetchLimited := make(chan struct{}, *relatedMaxFetch)
|
||||||
|
for i := 0; i < *relatedMaxFetch; i++ {
|
||||||
|
fetchLimited <- struct{}{}
|
||||||
|
}
|
||||||
|
close(fetchLimited)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, t := range tweets {
|
for _, t := range tweets {
|
||||||
var tweetIDsToFetch []int64
|
var tweetIDsToFetch []int64
|
||||||
|
@@ -326,6 +333,10 @@ func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error {
|
||||||
httpClient := cfg.OAuthConfig.Client(ctx, oauth1.NewToken(accessToken, accessSecret))
|
httpClient := cfg.OAuthConfig.Client(ctx, oauth1.NewToken(accessToken, accessSecret))
|
||||||
|
|
||||||
for _, tid := range tweetIDsToFetch {
|
for _, tid := range tweetIDsToFetch {
|
||||||
|
if _, ok := <-fetchLimited; !ok {
|
||||||
|
log.Printf("[%v:%d] Out of fetch tokens", cfg.Name, t.ID)
|
||||||
|
break
|
||||||
|
}
|
||||||
// Check if we already have tid.
|
// Check if we already have tid.
|
||||||
log.Printf("[%v:%d] Fetching %d", cfg.Name, t.ID, tid)
|
log.Printf("[%v:%d] Fetching %d", cfg.Name, t.ID, tid)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue