From cc8f24e8e196a4eb53df46a8307c1b9e3fa36c0f Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Sun, 18 Oct 2020 20:44:50 +0100 Subject: [PATCH] twitterchiver: add relatedfetcher, to fetch referenced media and tweets --- docker-images.nix | 1 + go/twitterchiver/default.nix | 10 + go/twitterchiver/related_fetcher/fetcher.go | 477 ++++++++++++++++++++ 3 files changed, 488 insertions(+) create mode 100644 go/twitterchiver/related_fetcher/fetcher.go diff --git a/docker-images.nix b/docker-images.nix index 4483f8c69a..4a86abbb25 100644 --- a/docker-images.nix +++ b/docker-images.nix @@ -9,6 +9,7 @@ let images = { "registry.apps.k8s.lukegb.tech/twitterchiver/archiver:latest" = depot.go.twitterchiver.archiver.dockerImage; "registry.apps.k8s.lukegb.tech/twitterchiver/viewer:latest" = depot.go.twitterchiver.viewer.dockerImage; + "registry.apps.k8s.lukegb.tech/twitterchiver/relatedfetcher:latest" = depot.go.twitterchiver.relatedfetcher.dockerImage; "registry.apps.k8s.lukegb.tech/lukegb-openshiftauth-test/example:latest" = depot.go.openshiftauth.example.dockerImage; "registry.apps.k8s.lukegb.tech/depotcron/update_nixpkgs:latest" = depot.ops.maint.update_nixpkgs; }; diff --git a/go/twitterchiver/default.nix b/go/twitterchiver/default.nix index abd8b66406..fc0c82bc6c 100644 --- a/go/twitterchiver/default.nix +++ b/go/twitterchiver/default.nix @@ -30,4 +30,14 @@ '' ) ]; }; + + relatedfetcher = depot.third_party.buildGo.program { + name = "relatedfetcher"; + srcs = [ ./related_fetcher/fetcher.go ]; + deps = [ + depot.third_party.gopkgs."github.com".dghubble.oauth1 + depot.third_party.gopkgs."github.com".jackc.pgtype + depot.third_party.gopkgs."github.com".jackc.pgx.v4.pgxpool + ]; + }; } diff --git a/go/twitterchiver/related_fetcher/fetcher.go b/go/twitterchiver/related_fetcher/fetcher.go new file mode 100644 index 0000000000..1188159dd5 --- /dev/null +++ b/go/twitterchiver/related_fetcher/fetcher.go @@ -0,0 +1,477 @@ +// SPDX-FileCopyrightText: 2020 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "os/signal" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/dghubble/oauth1" + "github.com/jackc/pgtype" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" +) + +var ( + databaseURL = flag.String("database_url", "", "Database URL") + tickInterval = flag.Duration("tick_interval", 1*time.Minute, "Tick interval") + workerCount = flag.Int("workers", 20, "Workers to spawn for fetching info from Twitter API") + workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).") + + mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.") +) + +type WorkerConfig struct { + Name string + + TickInterval time.Duration + OAuthConfig *oauth1.Config + WorkAtOnce int + DatabasePool *pgxpool.Pool + + WorkQueue chan func(context.Context) +} + +type worker func(context.Context, WorkerConfig) error + +type tweetVideoVariant struct { + Bitrate int `json:"bitrate"` + ContentType string `json:"content_type"` + URL string `json:"url"` +} + +type tweetVideoInfo struct { + Variants []tweetVideoVariant `json:"variants"` +} + +type tweetExtendedMedia struct { + ID int64 `json:"id"` + Type string `json:"type"` + MediaURLHTTPS string `json:"media_url_https"` + VideoInfo *tweetVideoInfo `json:"video_info"` +} + +type tweetExtendedEntities struct { + Media []tweetExtendedMedia `json:"media"` +} + +type tweetData struct { + ID int64 `json:"id"` + Text string `json:"full_text"` + QuotedStatus *tweetData `json:"quoted_status"` + RetweetedStatus *tweetData `json:"retweeted_status"` + InReplyToStatusID *int64 `json:"in_reply_to_status_id"` + ExtendedEntities tweetExtendedEntities `json:"extended_entities"` + + Data json.RawMessage `json:"-"` +} + +func mediaFetchOutputPath(u string) (string, error) { + p, err := url.Parse(u) + if err != nil { + return "", err + } + path := strings.TrimLeft(p.Path, "/") + + return filepath.Join(*mediaFetchDestination, p.Host, path), nil +} + +type databaseQuerier interface { + Query(context.Context, string, ...interface{}) (pgx.Rows, error) +} + +func fetchTweets(ctx context.Context, conn databaseQuerier, query string, args ...interface{}) ([]tweetData, error) { + rows, err := conn.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("querying: %w", err) + } + + var tweets []tweetData + for rows.Next() { + var tweetJSON pgtype.JSONB + if err := rows.Scan(&tweetJSON); err != nil { + return nil, fmt.Errorf("scanning: %w", err) + } + + var tweet tweetData + if err := json.Unmarshal(tweetJSON.Bytes, &tweet); err != nil { + return nil, fmt.Errorf("unmarshaling: %w", err) + } + tweets = append(tweets, tweet) + } + if rows.Err() != nil { + return nil, fmt.Errorf("retrieving: %w", err) + } + return tweets, nil +} + +func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error { + tweets, err := fetchTweets(ctx, cfg.DatabasePool, "SELECT object FROM tweets WHERE NOT fetched_media LIMIT $1", cfg.WorkAtOnce) + if err != nil { + return fmt.Errorf("fetching unfetched media tweet IDs: %w", err) + } + var ids []int64 + for _, t := range tweets { + ids = append(ids, t.ID) + } + log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids) + + var wg sync.WaitGroup + wg.Add(len(tweets)) + for _, t := range tweets { + t := t + cfg.WorkQueue <- func(ctx context.Context) { + defer wg.Done() + + // Abbreviated message for tweets with no media, which is the majority of them. + if len(t.ExtendedEntities.Media) == 0 { + log.Printf("[%v:%d] No media", cfg.Name, t.ID) + return + } + + log.Printf("[%v:%d] Starting work", cfg.Name, t.ID) + defer log.Printf("[%v:%d] Done with work", cfg.Name, t.ID) + + var mediaURLs []string + for _, m := range t.ExtendedEntities.Media { + if m.MediaURLHTTPS != "" { + mediaURLs = append(mediaURLs, m.MediaURLHTTPS) + } + if m.VideoInfo != nil { + bestBitrate := -1 + var bestURL string + for _, v := range m.VideoInfo.Variants { + if v.ContentType == "application/x-mpegURL" { + // m3u8 isn't very interesting since we'd have to parse it to get a playlist. + continue + } + if v.Bitrate > bestBitrate { + bestBitrate = v.Bitrate + bestURL = v.URL + } + } + if bestURL != "" { + mediaURLs = append(mediaURLs, bestURL) + } + } + } + + log.Printf("[%v:%d] %d media URLs to fetch: %v", cfg.Name, t.ID, len(mediaURLs), mediaURLs) + for _, u := range mediaURLs { + out, err := mediaFetchOutputPath(u) + if err != nil { + log.Printf("[%v:%d:%v] fetching output path: %v", cfg.Name, t.ID, u, err) + return + } + + // Fetch u -> out + if fi, err := os.Stat(out); err == nil { + // File already exists, does it have non-zero size? + if fi.Size() > 0 { + // It does! + log.Printf("[%v:%d:%v] %v already exists; skipping", cfg.Name, t.ID, u, out) + continue + } + log.Printf("[%v:%d:%v] %v already exists but has zero size; refetching", cfg.Name, t.ID, u, out) + } else if !os.IsNotExist(err) { + log.Printf("[%v:%d:%v] checking output path %v: %v", cfg.Name, t.ID, u, out, err) + return + } + + if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { + log.Printf("[%v:%d:%v] creating output path %v: %v", cfg.Name, t.ID, u, out, err) + return + } + + if err := func() error { + resp, err := http.Get(u) + if err != nil { + return fmt.Errorf("http.Get: %w", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + break // This is what we expect. + case http.StatusNotFound: + log.Printf("[%v:%d:%v] got a 404; marking as successful since this has probably been deleted.", cfg.Name, t.ID, u) + return nil + default: + return fmt.Errorf("HTTP status %d - %v", resp.StatusCode, resp.Status) + } + + f, err := os.Create(out) + if err != nil { + return err + } + defer f.Close() + + written, err := io.Copy(f, resp.Body) + if err != nil { + // Try to delete the file we created to force a refetch next time. + _ = f.Truncate(0) // Ignore the error, though. + return fmt.Errorf("IO error: %w", err) + } + log.Printf("[%v:%d:%v] downloaded %d bytes.", cfg.Name, t.ID, u, written) + + return nil + }(); err != nil { + log.Printf("[%v:%d:%v] %v", cfg.Name, t.ID, u, err) + return + } + } + log.Printf("[%v:%d] Done downloading media, marking as complete.", cfg.Name, t.ID) + + if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_media=true WHERE id=$1", t.ID); err != nil { + log.Printf("[%v:%d] Failed to update tweets table: %v", cfg.Name, t.ID, err) + } + } + } + wg.Wait() + log.Printf("[%v] Done with work.", cfg.Name) + + return nil +} + +func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error { + tweets, err := fetchTweets(ctx, cfg.DatabasePool, "SELECT object FROM tweets WHERE NOT fetched_related_tweets LIMIT $1", cfg.WorkAtOnce) + if err != nil { + return fmt.Errorf("fetching unfetched related tweet IDs: %w", err) + } + var ids []int64 + for _, t := range tweets { + ids = append(ids, t.ID) + } + log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids) + + var wg sync.WaitGroup + for _, t := range tweets { + var tweetIDsToFetch []int64 + if t.QuotedStatus != nil { + tweetIDsToFetch = append(tweetIDsToFetch, t.QuotedStatus.ID) + } + if t.RetweetedStatus != nil { + tweetIDsToFetch = append(tweetIDsToFetch, t.RetweetedStatus.ID) + } + if t.InReplyToStatusID != nil { + tweetIDsToFetch = append(tweetIDsToFetch, *t.InReplyToStatusID) + } + + log.Printf("[%v:%d] Got %d tweets to fetch (%v)", cfg.Name, t.ID, len(tweetIDsToFetch), tweetIDsToFetch) + if len(tweetIDsToFetch) == 0 { + continue + } + + wg.Add(1) + t := t + cfg.WorkQueue <- func(ctx context.Context) { + defer wg.Done() + + // Fetch user credentials for this. + // TODO(lukegb): maybe this needs to be smarter (e.g. for protected accounts?), but for now we just pick someone who could see the original tweet. + var accessToken, accessSecret string + if err := cfg.DatabasePool.QueryRow(ctx, "SELECT ua.access_token, ua.access_secret FROM user_accounts ua INNER JOIN user_accounts_tweets uat ON uat.userid=ua.userid WHERE uat.tweetid=$1 LIMIT 1", t.ID).Scan(&accessToken, &accessSecret); err != nil { + log.Printf("[%v:%d] Retrieving OAuth user credentials: %v", cfg.Name, t.ID, err) + return + } + httpClient := cfg.OAuthConfig.Client(ctx, oauth1.NewToken(accessToken, accessSecret)) + + for _, tid := range tweetIDsToFetch { + // Check if we already have tid. + log.Printf("[%v:%d] Fetching %d", cfg.Name, t.ID, tid) + + var cnt int + if err := cfg.DatabasePool.QueryRow(ctx, "SELECT COUNT(1) FROM tweets WHERE id=$1", tid).Scan(&cnt); err != nil { + log.Printf("[%v:%d:%d] Existence check failed: %v", cfg.Name, t.ID, tid, err) + return + } + if cnt > 0 { + log.Printf("[%v:%d] Already have %d", cfg.Name, t.ID, tid) + continue + } + + // We don't have it already; let's fetch it from ze Twitterz. + req, err := http.NewRequest("GET", fmt.Sprintf("https://api.twitter.com/1.1/statuses/show.json?id=%d&include_entities=true&include_ext_alt_text=true", tid), nil) + if err != nil { + log.Printf("[%v:%d:%d] Constructing GET request: %v", cfg.Name, t.ID, tid, err) + return + } + resp, err := httpClient.Do(req.WithContext(ctx)) + if err != nil { + log.Printf("[%v:%d:%d] Executing GET request: %v", cfg.Name, t.ID, tid, err) + return + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusOK: + // Expected. + break + case http.StatusNotFound: + // Tweet has probably been deleted, or is otherwise ACLed. + log.Printf("[%v:%d:%d] Not found response; assuming that it's been deleted and treating as a success.", cfg.Name, t.ID, tid) + continue + default: + log.Printf("[%v:%d:%d] GET statuses/show.json returned %d - %v", cfg.Name, t.ID, tid, resp.StatusCode, resp.Status) + return + } + + tweetBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("[%v:%d:%d] ReadAll: %v", cfg.Name, t.ID, tid, err) + return + } + var subtweet tweetData + if err := json.Unmarshal(tweetBytes, &subtweet); err != nil { + log.Printf("[%v:%d:%d] Decoding tweet as JSON: %v", cfg.Name, t.ID, tid, err) + return + } + + // Insert it into the database. + if _, err := cfg.DatabasePool.Exec(ctx, "INSERT INTO tweets (id, text, object) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", subtweet.ID, subtweet.Text, tweetBytes); err != nil { + log.Printf("[%v:%d:%d] Inserting tweet into database: %v", cfg.Name, t.ID, tid, err) + return + } + + // Insert the ACLs. + if _, err := cfg.DatabasePool.Exec(ctx, "INSERT INTO user_accounts_tweets SELECT userid, $1 tweetid, false on_timeline FROM user_accounts_tweets WHERE tweetid=$2 ON CONFLICT DO NOTHING", tid, t.ID); err != nil { + log.Printf("[%v:%d:%d] Inserting ACLs into table failed: %v", cfg.Name, t.ID, tid, err) + return + } + } + + // We have fetched all the subtweets, mark the tweet as complete. + if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_related_tweets=true WHERE id=$1", t.ID); err != nil { + log.Printf("[%v:%d] Updating tweet to mark as completed: %v", cfg.Name, t.ID, err) + } else { + log.Printf("[%v:%d] Successfully processed.", cfg.Name, t.ID) + } + } + } + wg.Wait() + log.Printf("[%v] Done with work.", cfg.Name) + + return nil +} + +func tickToWorker(w worker) worker { + return func(ctx context.Context, cfg WorkerConfig) error { + log.Printf("[%v] Performing initial tick.", cfg.Name) + if err := w(ctx, cfg); err != nil { + log.Printf("[%v] Initial tick failed: %v", cfg.Name, err) + } + t := time.NewTicker(cfg.TickInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + log.Printf("[%v] Tick...", cfg.Name) + if err := w(ctx, cfg); err != nil { + log.Printf("[%v] Tick failed: %v", cfg.Name, err) + } + } + } + } +} + +var ( + mediaFetchWorker = tickToWorker(mediaFetchTick) + relatedFetchWorker = tickToWorker(relatedFetchTick) +) + +func main() { + flag.Parse() + + ctx := context.Background() + + conn, err := pgxpool.Connect(ctx, *databaseURL) + if err != nil { + fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) + os.Exit(1) + } + defer conn.Close() + + ckey, csecret := os.Getenv("TWITTER_OAUTH_CONSUMER_KEY"), os.Getenv("TWITTER_OAUTH_CONSUMER_SECRET") + if ckey == "" || csecret == "" { + fmt.Fprintf(os.Stderr, "No TWITTER_OAUTH_CONSUMER_KEY or TWITTER_OAUTH_CONSUMER_SECRET\n") + os.Exit(1) + } + twitterOAuthConfig := oauth1.NewConfig(ckey, csecret) + + stop := func() <-chan struct{} { + st := make(chan os.Signal) + signal.Notify(st, os.Interrupt) + stopCh := make(chan struct{}) + go func() { + <-st + log.Printf("Shutting down") + close(stopCh) + signal.Reset(os.Interrupt) + }() + return stopCh + }() + cctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + + workQueue := make(chan func(context.Context)) + wg.Add(*workerCount) + for n := 0; n < *workerCount; n++ { + go func() { + ctx := cctx + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case f := <-workQueue: + f(ctx) + } + } + }() + } + + workers := map[string]worker{ + "mediaFetchWorker": mediaFetchWorker, + "relatedFetchWorker": relatedFetchWorker, + } + wg.Add(len(workers)) + cfg := WorkerConfig{ + TickInterval: *tickInterval, + OAuthConfig: twitterOAuthConfig, + WorkAtOnce: *workAtOnce, + + DatabasePool: conn, + WorkQueue: workQueue, + } + for wn, w := range workers { + wn, w, cfg := wn, w, cfg + cfg.Name = wn + go func() { + defer wg.Done() + log.Printf("Starting worker %v.", wn) + if err := w(cctx, cfg); err != nil { + log.Printf("[%v] %v", wn, err) + } + }() + } + + <-stop + log.Printf("Got stop signal. Shutting down...") + cancel() + wg.Wait() +}