// depot/go/twitterchiver/related_fetcher/fetcher.go
// SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"time"
"github.com/dghubble/oauth1"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"gocloud.dev/blob"
"gocloud.dev/gcerrors"
_ "gocloud.dev/blob/s3blob"
)
// Command-line configuration. The media_*/related_* flags override the
// generic tick_interval/work_at_once values for their respective
// workers when set to a non-zero value (see WorkerConfig.Merge).
var (
	databaseURL           = flag.String("database_url", "", "Database URL")
	tickInterval          = flag.Duration("tick_interval", 1*time.Minute, "Tick interval")
	workerCount           = flag.Int("workers", 20, "Workers to spawn for fetching info from Twitter API")
	workAtOnce            = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).")
	mediaFetchDestination = flag.String("media_fetch_destination", "s3://public-lukegb-twitterchiver?endpoint=objdump.zxcvbnm.ninja&region=london", "gocloud.dev URL to write fetched media to.")
	mediaTickInterval     = flag.Duration("media_tick_interval", 0, "Tick interval for media - takes priority over --tick_interval if non-zero.")
	mediaWorkAtOnce       = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.")
	relatedTickInterval   = flag.Duration("related_tick_interval", 0, "Tick interval for related - takes priority over --tick_interval if non-zero.")
	relatedWorkAtOnce     = flag.Int("related_work_at_once", 200, "Work to enqueue at once for related - takes priority over --work_at_once if non-zero.")
	// 900/15: Twitter's statuses/show rate limit is 900 requests per
	// 15-minute window, so this defaults to a per-minute budget.
	relatedMaxFetch = flag.Int("related_max_fetch", 900/15, "Cap of tweets to try to fetch at once.")
)
// WorkerConfig carries the shared dependencies and per-worker tuning
// knobs for a background worker (mediaFetchTick / relatedFetchTick).
type WorkerConfig struct {
	// Name identifies this worker in log output.
	Name string
	// TickInterval is how often the worker looks for new work.
	TickInterval time.Duration
	// OAuthConfig holds the Twitter consumer credentials used to sign
	// API requests on behalf of stored user tokens.
	OAuthConfig *oauth1.Config
	// WorkAtOnce bounds how many rows a single tick pulls from the DB.
	WorkAtOnce int
	// DatabasePool is the shared Postgres connection pool.
	DatabasePool *pgxpool.Pool
	// Bucket is the blob store fetched media is written to.
	Bucket *blob.Bucket
	// WorkQueue delivers per-tweet closures to the shared goroutine pool
	// started in main.
	WorkQueue chan func(context.Context)
}
// Merge returns a copy of wc with every non-zero field of other layered
// on top. Zero values in other (empty string, zero duration, nil
// pointers/channels, zero ints) leave the corresponding wc field
// untouched, so a base config can be specialized per worker.
func (wc WorkerConfig) Merge(other WorkerConfig) WorkerConfig {
	if other.Name != "" {
		wc.Name = other.Name
	}
	if other.TickInterval > 0 {
		wc.TickInterval = other.TickInterval
	}
	if other.OAuthConfig != nil {
		wc.OAuthConfig = other.OAuthConfig
	}
	if other.WorkAtOnce != 0 {
		wc.WorkAtOnce = other.WorkAtOnce
	}
	if other.DatabasePool != nil {
		wc.DatabasePool = other.DatabasePool
	}
	// Fix: Bucket was previously never merged, so a per-worker bucket
	// override was silently dropped.
	if other.Bucket != nil {
		wc.Bucket = other.Bucket
	}
	if other.WorkQueue != nil {
		wc.WorkQueue = other.WorkQueue
	}
	return wc
}
// worker is a long-running task; it is driven until its context is
// cancelled (see tickToWorker) and operates with the given config.
type worker func(context.Context, WorkerConfig) error

// tweetVideoVariant is one available encoding of a video attachment in
// the Twitter v1.1 API payload.
type tweetVideoVariant struct {
	Bitrate     int    `json:"bitrate"`
	ContentType string `json:"content_type"`
	URL         string `json:"url"`
}

// tweetVideoInfo lists the encodings available for a single video.
type tweetVideoInfo struct {
	Variants []tweetVideoVariant `json:"variants"`
}

// tweetExtendedMedia is a single media attachment; VideoInfo is nil
// for non-video media.
type tweetExtendedMedia struct {
	ID            int64           `json:"id"`
	Type          string          `json:"type"`
	MediaURLHTTPS string          `json:"media_url_https"`
	VideoInfo     *tweetVideoInfo `json:"video_info"`
}

// tweetExtendedEntities wraps the extended_entities.media list.
type tweetExtendedEntities struct {
	Media []tweetExtendedMedia `json:"media"`
}

// tweetData is the subset of a Twitter v1.1 tweet object this program
// uses: the *Status/*StatusID fields drive related-tweet fetching and
// ExtendedEntities drives media fetching.
type tweetData struct {
	ID                int64                 `json:"id"`
	Text              string                `json:"full_text"`
	QuotedStatus      *tweetData            `json:"quoted_status"`
	QuotedStatusID    *int64                `json:"quoted_status_id"`
	RetweetedStatus   *tweetData            `json:"retweeted_status"`
	InReplyToStatusID *int64                `json:"in_reply_to_status_id"`
	ExtendedEntities  tweetExtendedEntities `json:"extended_entities"`
	// Excluded from JSON (un)marshaling. NOTE(review): never populated
	// anywhere in this file — confirm it is still needed.
	Data json.RawMessage `json:"-"`
}
// mediaFetchOutputPath derives the blob-store key for a media URL:
// "<host>/<path>", e.g. "pbs.twimg.com/media/ABC.jpg".
//
// Fix: this previously used filepath.Join, which builds OS-dependent
// separators — on Windows that would produce backslash-separated blob
// keys. Bucket keys must always be forward-slash separated, so the key
// is assembled explicitly.
func mediaFetchOutputPath(u string) (string, error) {
	p, err := url.Parse(u)
	if err != nil {
		return "", err
	}
	key := strings.TrimLeft(p.Path, "/")
	if key == "" {
		// Matches the old filepath.Join("host", "") behavior.
		return p.Host, nil
	}
	return p.Host + "/" + key, nil
}
// databaseQuerier is the narrow slice of pgxpool.Pool that fetchTweets
// needs; declaring it at the consumer keeps the dependency minimal and
// easy to substitute.
type databaseQuerier interface {
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
}
// fetchTweets runs query (which must select a single JSONB column
// holding a full tweet object) against conn and decodes each row into
// a tweetData. It returns a nil slice when there are no rows.
func fetchTweets(ctx context.Context, conn databaseQuerier, query string, args ...interface{}) ([]tweetData, error) {
	rows, err := conn.Query(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("querying: %w", err)
	}
	var tweets []tweetData
	for rows.Next() {
		var tweetJSON pgtype.JSONB
		if err := rows.Scan(&tweetJSON); err != nil {
			return nil, fmt.Errorf("scanning: %w", err)
		}
		var tweet tweetData
		if err := json.Unmarshal(tweetJSON.Bytes, &tweet); err != nil {
			return nil, fmt.Errorf("unmarshaling: %w", err)
		}
		tweets = append(tweets, tweet)
	}
	// Fix: this previously wrapped the stale (nil) err from Query
	// instead of rows.Err(), so iteration failures were reported as a
	// wrapped nil error.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("retrieving: %w", err)
	}
	return tweets, nil
}
// mediaFetchTick finds tweets whose attached media has not yet been
// archived, downloads each media file into cfg.Bucket, and marks the
// tweet fetched_media=true once every URL has been stored (or is known
// to be gone). Per-tweet work runs as closures on the shared
// cfg.WorkQueue; this function blocks until all of them finish.
func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error {
	tweets, err := fetchTweets(ctx, cfg.DatabasePool, "SELECT object FROM tweets WHERE NOT fetched_media LIMIT $1", cfg.WorkAtOnce)
	if err != nil {
		return fmt.Errorf("fetching unfetched media tweet IDs: %w", err)
	}
	var ids []int64
	for _, t := range tweets {
		ids = append(ids, t.ID)
	}
	log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids)
	var wg sync.WaitGroup
	wg.Add(len(tweets))
	for _, t := range tweets {
		t := t // per-iteration copy for the closure below
		cfg.WorkQueue <- func(ctx context.Context) {
			defer wg.Done()
			// Abbreviated message for tweets with no media, which is the majority of them.
			if len(t.ExtendedEntities.Media) == 0 {
				if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_media=true WHERE id=$1", t.ID); err != nil {
					log.Printf("[%v:%d] Failed to update tweets table: %v", cfg.Name, t.ID, err)
				}
				log.Printf("[%v:%d] No media", cfg.Name, t.ID)
				return
			}
			log.Printf("[%v:%d] Starting work", cfg.Name, t.ID)
			defer log.Printf("[%v:%d] Done with work", cfg.Name, t.ID)
			// Collect the URLs to mirror: every photo URL, plus the
			// highest-bitrate non-HLS variant of each video.
			var mediaURLs []string
			for _, m := range t.ExtendedEntities.Media {
				if m.MediaURLHTTPS != "" {
					mediaURLs = append(mediaURLs, m.MediaURLHTTPS)
				}
				if m.VideoInfo != nil {
					bestBitrate := -1
					var bestURL string
					for _, v := range m.VideoInfo.Variants {
						if v.ContentType == "application/x-mpegURL" {
							// m3u8 isn't very interesting since we'd have to parse it to get a playlist.
							continue
						}
						if v.Bitrate > bestBitrate {
							bestBitrate = v.Bitrate
							bestURL = v.URL
						}
					}
					if bestURL != "" {
						mediaURLs = append(mediaURLs, bestURL)
					}
				}
			}
			log.Printf("[%v:%d] %d media URLs to fetch: %v", cfg.Name, t.ID, len(mediaURLs), mediaURLs)
			for _, u := range mediaURLs {
				out, err := mediaFetchOutputPath(u)
				if err != nil {
					log.Printf("[%v:%d:%v] fetching output path: %v", cfg.Name, t.ID, u, err)
					return
				}
				// Fetch u -> out
				if fi, err := cfg.Bucket.Attributes(ctx, out); err == nil {
					// File already exists, does it have non-zero size?
					if fi.Size > 0 {
						// It does!
						log.Printf("[%v:%d:%v] %v already exists; skipping", cfg.Name, t.ID, u, out)
						continue
					}
					log.Printf("[%v:%d:%v] %v already exists but has zero size; refetching", cfg.Name, t.ID, u, out)
				} else if gcerrors.Code(err) != gcerrors.NotFound {
					// NotFound just means we must fetch; anything else is a
					// real bucket problem, so bail and retry next tick.
					log.Printf("[%v:%d:%v] checking output path %v: %v", cfg.Name, t.ID, u, out, err)
					return
				}
				// Download one URL. Wrapped in an inline func so the deferred
				// Body.Close fires per URL rather than at closure exit.
				if err := func() error {
					resp, err := http.Get(u)
					if err != nil {
						return fmt.Errorf("http.Get: %w", err)
					}
					defer resp.Body.Close()
					switch resp.StatusCode {
					case http.StatusOK:
						break // This is what we expect.
					case http.StatusNotFound, http.StatusForbidden:
						log.Printf("[%v:%d:%v] got a %d; marking as successful since this has probably been deleted.", cfg.Name, t.ID, u, resp.StatusCode)
						return nil
					default:
						return fmt.Errorf("HTTP status %d - %v", resp.StatusCode, resp.Status)
					}
					f, err := cfg.Bucket.NewWriter(ctx, out, nil)
					if err != nil {
						return err
					}
					written, err := io.Copy(f, resp.Body)
					if err != nil {
						// Try to delete the file we created to force a refetch next time.
						_ = f.Close()
						_ = cfg.Bucket.Delete(ctx, out)
						return fmt.Errorf("IO error: %w", err)
					}
					log.Printf("[%v:%d:%v] downloaded %d bytes.", cfg.Name, t.ID, u, written)
					return f.Close()
				}(); err != nil {
					// Leave fetched_media=false so the tweet is retried later.
					log.Printf("[%v:%d:%v] %v", cfg.Name, t.ID, u, err)
					return
				}
			}
			log.Printf("[%v:%d] Done downloading media, marking as complete.", cfg.Name, t.ID)
			if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_media=true WHERE id=$1", t.ID); err != nil {
				log.Printf("[%v:%d] Failed to update tweets table: %v", cfg.Name, t.ID, err)
			}
		}
	}
	wg.Wait()
	log.Printf("[%v] Done with work.", cfg.Name)
	return nil
}
// relatedFetchTick finds tweets whose related tweets (quoted,
// retweeted, replied-to) have not yet been archived, fetches the
// missing ones from the Twitter API using a stored user's credentials,
// inserts them (plus visibility ACLs), and marks the referencing tweet
// as processed. Per-tweet work runs on the shared cfg.WorkQueue; this
// function blocks until all of it finishes.
//
// API usage per tick is capped via fetchLimited: a channel pre-filled
// with *relatedMaxFetch tokens and then closed, so once the tokens run
// out receives report !ok and workers stop fetching.
func relatedFetchTick(ctx context.Context, cfg WorkerConfig) error {
	tweets, err := fetchTweets(ctx, cfg.DatabasePool, "SELECT object FROM tweets WHERE NOT fetched_related_tweets LIMIT $1", cfg.WorkAtOnce)
	if err != nil {
		return fmt.Errorf("fetching unfetched related tweet IDs: %w", err)
	}
	var ids []int64
	for _, t := range tweets {
		ids = append(ids, t.ID)
	}
	log.Printf("[%v] Got %d tweets (%v)", cfg.Name, len(tweets), ids)
	fetchLimited := make(chan struct{}, *relatedMaxFetch)
	for i := 0; i < *relatedMaxFetch; i++ {
		fetchLimited <- struct{}{}
	}
	close(fetchLimited)
	var wg sync.WaitGroup
	for _, t := range tweets {
		// Collect the distinct related tweet IDs this tweet references.
		tweetIDsToFetch := make(map[int64]bool)
		if t.QuotedStatus != nil {
			tweetIDsToFetch[t.QuotedStatus.ID] = true
		}
		if t.QuotedStatusID != nil {
			tweetIDsToFetch[*t.QuotedStatusID] = true
		}
		if t.RetweetedStatus != nil {
			tweetIDsToFetch[t.RetweetedStatus.ID] = true
		}
		if t.InReplyToStatusID != nil {
			tweetIDsToFetch[*t.InReplyToStatusID] = true
		}
		log.Printf("[%v:%d] Got %d tweets to fetch (%v)", cfg.Name, t.ID, len(tweetIDsToFetch), tweetIDsToFetch)
		if len(tweetIDsToFetch) == 0 {
			// Nothing to fetch; mark done without consuming a queue slot.
			if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_related_tweets=true WHERE id=$1", t.ID); err != nil {
				log.Printf("[%v:%d] Updating tweet to mark as completed: %v", cfg.Name, t.ID, err)
			} else {
				log.Printf("[%v:%d] Successfully marked as done (no tweets to fetch).", cfg.Name, t.ID)
			}
			continue
		}
		wg.Add(1)
		t := t // per-iteration copy for the closure below
		cfg.WorkQueue <- func(ctx context.Context) {
			defer wg.Done()
			// Fetch user credentials for this.
			// TODO(lukegb): maybe this needs to be smarter (e.g. for protected accounts?), but for now we just pick someone who could see the original tweet.
			var accessToken, accessSecret string
			if err := cfg.DatabasePool.QueryRow(ctx, "SELECT ua.access_token, ua.access_secret FROM user_accounts ua INNER JOIN user_accounts_tweets uat ON uat.userid=ua.userid WHERE uat.tweetid=$1 LIMIT 1", t.ID).Scan(&accessToken, &accessSecret); err != nil {
				log.Printf("[%v:%d] Retrieving OAuth user credentials: %v", cfg.Name, t.ID, err)
				return
			}
			httpClient := cfg.OAuthConfig.Client(ctx, oauth1.NewToken(accessToken, accessSecret))
			for tid := range tweetIDsToFetch {
				// Check if we already have tid.
				log.Printf("[%v:%d] Fetching %d", cfg.Name, t.ID, tid)
				var cnt int
				if err := cfg.DatabasePool.QueryRow(ctx, "SELECT COUNT(1) FROM tweets WHERE id=$1", tid).Scan(&cnt); err != nil {
					log.Printf("[%v:%d:%d] Existence check failed: %v", cfg.Name, t.ID, tid, err)
					return
				}
				if cnt > 0 {
					log.Printf("[%v:%d] Already have %d", cfg.Name, t.ID, tid)
					continue
				}
				// Spend one rate-limit token; a closed, drained channel
				// yields !ok and we stop fetching for this tick.
				// NOTE(review): after this break the tweet is still marked
				// fetched_related_tweets=true below, so its remaining related
				// tweets are never retried — confirm this is intended.
				if _, ok := <-fetchLimited; !ok {
					log.Printf("[%v:%d] Out of fetch tokens", cfg.Name, t.ID)
					break
				}
				// We don't have it already; let's fetch it from ze Twitterz.
				req, err := http.NewRequest("GET", fmt.Sprintf("https://api.twitter.com/1.1/statuses/show.json?id=%d&include_entities=true&include_ext_alt_text=true&tweet_mode=extended", tid), nil)
				if err != nil {
					log.Printf("[%v:%d:%d] Constructing GET request: %v", cfg.Name, t.ID, tid, err)
					return
				}
				resp, err := httpClient.Do(req.WithContext(ctx))
				if err != nil {
					log.Printf("[%v:%d:%d] Executing GET request: %v", cfg.Name, t.ID, tid, err)
					return
				}
				// Fix: the body was previously closed with a defer inside
				// this loop, keeping every response open until the closure
				// returned. Read and close it promptly instead; draining the
				// body also lets the transport reuse the connection.
				tweetBytes, readErr := ioutil.ReadAll(resp.Body)
				resp.Body.Close()
				switch resp.StatusCode {
				case http.StatusOK:
					// Expected.
				case http.StatusNotFound:
					// Tweet has probably been deleted, or is otherwise ACLed.
					log.Printf("[%v:%d:%d] Not found response; assuming that it's been deleted and treating as a success.", cfg.Name, t.ID, tid)
					continue
				default:
					log.Printf("[%v:%d:%d] GET statuses/show.json returned %d - %v", cfg.Name, t.ID, tid, resp.StatusCode, resp.Status)
					return
				}
				if readErr != nil {
					log.Printf("[%v:%d:%d] ReadAll: %v", cfg.Name, t.ID, tid, readErr)
					return
				}
				var subtweet tweetData
				if err := json.Unmarshal(tweetBytes, &subtweet); err != nil {
					log.Printf("[%v:%d:%d] Decoding tweet as JSON: %v", cfg.Name, t.ID, tid, err)
					return
				}
				// Insert it into the database.
				if _, err := cfg.DatabasePool.Exec(ctx, "INSERT INTO tweets (id, text, object) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", subtweet.ID, subtweet.Text, tweetBytes); err != nil {
					log.Printf("[%v:%d:%d] Inserting tweet into database: %v", cfg.Name, t.ID, tid, err)
					return
				}
				// Insert the ACLs: everyone who can see the referencing
				// tweet can also see the related tweet.
				if _, err := cfg.DatabasePool.Exec(ctx, "INSERT INTO user_accounts_tweets SELECT userid, $1 tweetid, false on_timeline FROM user_accounts_tweets WHERE tweetid=$2 ON CONFLICT DO NOTHING", tid, t.ID); err != nil {
					log.Printf("[%v:%d:%d] Inserting ACLs into table failed: %v", cfg.Name, t.ID, tid, err)
					return
				}
			}
			// We have fetched all the subtweets, mark the tweet as complete.
			if _, err := cfg.DatabasePool.Exec(ctx, "UPDATE tweets SET fetched_related_tweets=true WHERE id=$1", t.ID); err != nil {
				log.Printf("[%v:%d] Updating tweet to mark as completed: %v", cfg.Name, t.ID, err)
			} else {
				log.Printf("[%v:%d] Successfully processed.", cfg.Name, t.ID)
			}
		}
	}
	wg.Wait()
	log.Printf("[%v] Done with work.", cfg.Name)
	return nil
}
// tickToWorker lifts a one-shot tick function into a worker: the tick
// runs once immediately, then once per cfg.TickInterval until ctx is
// cancelled. Tick errors are logged, never fatal.
func tickToWorker(w worker) worker {
	return func(ctx context.Context, cfg WorkerConfig) error {
		log.Printf("[%v] Performing initial tick.", cfg.Name)
		if err := w(ctx, cfg); err != nil {
			log.Printf("[%v] Initial tick failed: %v", cfg.Name, err)
		}
		ticker := time.NewTicker(cfg.TickInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				log.Printf("[%v] Tick...", cfg.Name)
				if err := w(ctx, cfg); err != nil {
					log.Printf("[%v] Tick failed: %v", cfg.Name, err)
				}
			case <-ctx.Done():
				// Cancellation is a clean shutdown, not an error.
				return nil
			}
		}
	}
}
// The two workers started from main: each runs its tick function once
// at startup and then on every TickInterval (see tickToWorker).
var (
	mediaFetchWorker   = tickToWorker(mediaFetchTick)
	relatedFetchWorker = tickToWorker(relatedFetchTick)
)
// main wires up the database pool, blob bucket and Twitter OAuth
// consumer credentials, starts a shared pool of goroutines draining
// workQueue, launches the "media" and "related" workers, and runs
// until interrupted (SIGINT).
func main() {
	flag.Parse()
	ctx := context.Background()
	conn, err := pgxpool.Connect(ctx, *databaseURL)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close()
	bucket, err := blob.OpenBucket(ctx, *mediaFetchDestination)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to connect to bucket: %v\n", err)
		os.Exit(1)
	}
	defer bucket.Close()
	ckey, csecret := os.Getenv("TWITTER_OAUTH_CONSUMER_KEY"), os.Getenv("TWITTER_OAUTH_CONSUMER_SECRET")
	if ckey == "" || csecret == "" {
		fmt.Fprintf(os.Stderr, "No TWITTER_OAUTH_CONSUMER_KEY or TWITTER_OAUTH_CONSUMER_SECRET\n")
		os.Exit(1)
	}
	twitterOAuthConfig := oauth1.NewConfig(ckey, csecret)
	// stop is closed on the first SIGINT; the handler is then Reset so a
	// second SIGINT terminates the process immediately.
	stop := func() <-chan struct{} {
		// Fix: signal.Notify requires a buffered channel — with an
		// unbuffered one, a signal arriving while the goroutine below is
		// not at the receive would be dropped (go vet's sigchanyzer
		// flags this).
		st := make(chan os.Signal, 1)
		signal.Notify(st, os.Interrupt)
		stopCh := make(chan struct{})
		go func() {
			<-st
			log.Printf("Shutting down")
			close(stopCh)
			signal.Reset(os.Interrupt)
		}()
		return stopCh
	}()
	cctx, cancel := context.WithCancel(ctx)
	var wg sync.WaitGroup
	workQueue := make(chan func(context.Context))
	// Shared pool of goroutines executing the closures the workers enqueue.
	wg.Add(*workerCount)
	for n := 0; n < *workerCount; n++ {
		go func() {
			ctx := cctx
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case f := <-workQueue:
					f(ctx)
				}
			}
		}()
	}
	workers := map[string]worker{
		"media":   mediaFetchWorker,
		"related": relatedFetchWorker,
	}
	wg.Add(len(workers))
	// Base configuration; per-worker overrides are merged on top below.
	cfg := WorkerConfig{
		TickInterval: *tickInterval,
		OAuthConfig:  twitterOAuthConfig,
		WorkAtOnce:   *workAtOnce,
		DatabasePool: conn,
		WorkQueue:    workQueue,
		Bucket:       bucket,
	}
	configs := map[string]WorkerConfig{
		"media": cfg.Merge(WorkerConfig{
			Name:         "media",
			TickInterval: *mediaTickInterval,
			WorkAtOnce:   *mediaWorkAtOnce,
		}),
		"related": cfg.Merge(WorkerConfig{
			Name:         "related",
			TickInterval: *relatedTickInterval,
			WorkAtOnce:   *relatedWorkAtOnce,
		}),
	}
	for wn, w := range workers {
		wn, w := wn, w // per-iteration copies for the goroutine below
		wcfg, ok := configs[wn]
		if !ok {
			wcfg = cfg
		}
		go func() {
			defer wg.Done()
			log.Printf("Starting worker %v.", wn)
			if err := w(cctx, wcfg); err != nil {
				log.Printf("[%v] %v", wn, err)
			}
		}()
	}
	<-stop
	log.Printf("Got stop signal. Shutting down...")
	cancel()
	wg.Wait()
}