diff --git a/go/twitterchiver/archiver/archiver.go b/go/twitterchiver/archiver/archiver.go index 94f57e5788..159e1a6c39 100644 --- a/go/twitterchiver/archiver/archiver.go +++ b/go/twitterchiver/archiver/archiver.go @@ -110,12 +110,23 @@ func fetchTweets(ctx context.Context, twitterOAuthConfig *oauth1.Config, user us } type userResult struct { - UserID int64 - TweetID int64 + UserID int64 + TweetID int64 + SeenTweetIDs []int64 } func updateUser(ctx context.Context, conn *pgx.Conn, ur userResult) error { - _, err := conn.Exec(ctx, "UPDATE user_accounts SET latest_tweet=$1 WHERE userid=$2", ur.TweetID, ur.UserID) + var b pgx.Batch + b.Queue("UPDATE user_accounts SET latest_tweet=$1 WHERE userid=$2", ur.TweetID, ur.UserID) + for _, twid := range ur.SeenTweetIDs { + b.Queue("INSERT INTO user_accounts_tweets (userid, tweetid) VALUES ($1, $2) ON CONFLICT DO NOTHING", ur.UserID, twid) + } + log.Printf("sending batch of updates for %d...", ur.UserID) + start := time.Now() + br := conn.SendBatch(ctx, &b) + defer br.Close() + _, err := br.Exec() + log.Printf("batch of updates for %d done in %s", ur.UserID, time.Now().Sub(start)) return err } @@ -145,6 +156,7 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config defer wg.Done() var largestTweetID int64 + var seenTweetIDs []int64 var tweetCount int for user := range userCh { tweets, err := fetchTweets(ctx, twitterOAuthConfig, user) @@ -155,6 +167,7 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config for _, tw := range tweets { tweetCount++ resultCh <- tw + seenTweetIDs = append(seenTweetIDs, tw.ID) if tw.ID > largestTweetID { largestTweetID = tw.ID } @@ -162,8 +175,9 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config log.Printf("ingested tweets for %v (%d tweets, largest ID %d)", user.Username, tweetCount, largestTweetID) if largestTweetID != 0 { userResultCh <- userResult{ - UserID: user.UserID, - TweetID: largestTweetID, + UserID: user.UserID, + TweetID: largestTweetID, + SeenTweetIDs: seenTweetIDs, } } } diff --git a/go/twitterchiver/archiver/schema.sql b/go/twitterchiver/archiver/schema.sql index 70299b2227..7aee199c28 100644 --- a/go/twitterchiver/archiver/schema.sql +++ b/go/twitterchiver/archiver/schema.sql @@ -8,10 +8,30 @@ CREATE TABLE tweets ( object JSONB NOT NULL ); +CREATE INDEX tweets_retweet_text_idx ON public.tweets USING gin (to_tsvector('english'::regconfig, ((object -> 'retweeted_status'::text) ->> 'full_text'::text))); +CREATE INDEX tweets_retweet_user_idx ON public.tweets USING btree (((((object -> 'retweeted_status'::text) -> 'user'::text) ->> 'screen_name'::text))); +CREATE INDEX tweets_text_idx ON public.tweets USING gin (to_tsvector('english'::regconfig, text)); +CREATE INDEX tweets_user_idx ON public.tweets USING btree ((((object -> 'user'::text) ->> 'screen_name'::text))); + CREATE TABLE user_accounts ( username TEXT NOT NULL PRIMARY KEY, userid BIGINT NOT NULL, access_token TEXT NOT NULL, access_secret TEXT NOT NULL, - latest_tweet BIGINT NOT NULL + latest_tweet BIGINT NOT NULL, + CONSTRAINT unique_userid + UNIQUE (userid) +); + +CREATE TABLE public.user_accounts_tweets ( + userid bigint NOT NULL, + tweetid bigint NOT NULL, + PRIMARY KEY (userid, tweetid), + UNIQUE (tweetid, userid), + CONSTRAINT fk_userid + FOREIGN KEY (userid) + REFERENCES user_accounts(userid), + CONSTRAINT fk_tweetid + FOREIGN KEY (tweetid) + REFERENCES tweets(id)) );