twitterchiver/archiver: store mapping of tweet ID to user ID that saw it
This commit is contained in:
parent
68d8fa6d42
commit
014d1c881a
2 changed files with 40 additions and 6 deletions
|
@ -110,12 +110,23 @@ func fetchTweets(ctx context.Context, twitterOAuthConfig *oauth1.Config, user us
|
|||
}
|
||||
|
||||
// userResult carries the outcome of one timeline fetch for a single user
// from the fetch workers to the code that persists it (see updateUser).
type userResult struct {
	// UserID is the Twitter user ID of the account whose timeline was fetched.
	UserID int64
	// TweetID is the largest tweet ID observed during the fetch; it is
	// written to user_accounts.latest_tweet.
	TweetID int64
	// SeenTweetIDs lists the IDs of the tweets returned by the fetch; each
	// one is recorded in user_accounts_tweets against UserID.
	SeenTweetIDs []int64
}
|
||||
|
||||
func updateUser(ctx context.Context, conn *pgx.Conn, ur userResult) error {
|
||||
_, err := conn.Exec(ctx, "UPDATE user_accounts SET latest_tweet=$1 WHERE userid=$2", ur.TweetID, ur.UserID)
|
||||
var b pgx.Batch
|
||||
b.Queue("UPDATE user_accounts SET latest_tweet=$1 WHERE userid=$2", ur.TweetID, ur.UserID)
|
||||
for _, twid := range ur.SeenTweetIDs {
|
||||
b.Queue("INSERT INTO user_accounts_tweets (userid, tweetid) VALUES ($1, $2) ON CONFLICT DO NOTHING", ur.UserID, twid)
|
||||
}
|
||||
log.Printf("sending batch of updates for %d...", ur.UserID)
|
||||
start := time.Now()
|
||||
br := conn.SendBatch(ctx, &b)
|
||||
defer br.Close()
|
||||
_, err := br.Exec()
|
||||
log.Printf("batch of updates for %d done in %s", ur.UserID, time.Now().Sub(start))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -145,6 +156,7 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config
|
|||
defer wg.Done()
|
||||
|
||||
var largestTweetID int64
|
||||
var seenTweetIDs []int64
|
||||
var tweetCount int
|
||||
for user := range userCh {
|
||||
tweets, err := fetchTweets(ctx, twitterOAuthConfig, user)
|
||||
|
@ -155,6 +167,7 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config
|
|||
for _, tw := range tweets {
|
||||
tweetCount++
|
||||
resultCh <- tw
|
||||
seenTweetIDs = append(seenTweetIDs, tw.ID)
|
||||
if tw.ID > largestTweetID {
|
||||
largestTweetID = tw.ID
|
||||
}
|
||||
|
@ -162,8 +175,9 @@ func tick(ctx context.Context, conn *pgx.Conn, twitterOAuthConfig *oauth1.Config
|
|||
log.Printf("ingested tweets for %v (%d tweets, largest ID %d)", user.Username, tweetCount, largestTweetID)
|
||||
if largestTweetID != 0 {
|
||||
userResultCh <- userResult{
|
||||
UserID: user.UserID,
|
||||
TweetID: largestTweetID,
|
||||
UserID: user.UserID,
|
||||
TweetID: largestTweetID,
|
||||
SeenTweetIDs: seenTweetIDs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,10 +8,30 @@ CREATE TABLE tweets (
|
|||
object JSONB NOT NULL
|
||||
);
|
||||
|
||||
-- GIN full-text index over the English tsvector of the retweeted status's full_text.
CREATE INDEX tweets_retweet_text_idx ON public.tweets USING gin (to_tsvector('english'::regconfig, ((object -> 'retweeted_status'::text) ->> 'full_text'::text)));
|
||||
-- B-tree index on the screen_name of the retweeted status's user.
CREATE INDEX tweets_retweet_user_idx ON public.tweets USING btree (((((object -> 'retweeted_status'::text) -> 'user'::text) ->> 'screen_name'::text)));
|
||||
-- GIN full-text index over the English tsvector of the text column.
CREATE INDEX tweets_text_idx ON public.tweets USING gin (to_tsvector('english'::regconfig, text));
|
||||
-- B-tree index on the screen_name of the user embedded in the tweet's JSON object.
CREATE INDEX tweets_user_idx ON public.tweets USING btree ((((object -> 'user'::text) ->> 'screen_name'::text)));
|
||||
|
||||
-- One row per Twitter account whose timeline is archived, together with the
-- OAuth credentials used to fetch it.
CREATE TABLE user_accounts (
    username TEXT NOT NULL PRIMARY KEY,
    userid BIGINT NOT NULL,
    access_token TEXT NOT NULL,
    access_secret TEXT NOT NULL,
    -- Largest tweet ID recorded for this account by the archiver.
    latest_tweet BIGINT NOT NULL,
    -- userid must be unique so that user_accounts_tweets can reference it
    -- with a foreign key.
    CONSTRAINT unique_userid
        UNIQUE (userid)
);
|
||||
|
||||
-- Maps each tweet to the account(s) whose timeline fetch returned it.
CREATE TABLE public.user_accounts_tweets (
    userid bigint NOT NULL,
    tweetid bigint NOT NULL,
    PRIMARY KEY (userid, tweetid),
    -- Same pair in the opposite column order; presumably kept to give an
    -- index led by tweetid for lookups by tweet.
    UNIQUE (tweetid, userid),
    CONSTRAINT fk_userid
        FOREIGN KEY (userid)
        REFERENCES user_accounts(userid),
    CONSTRAINT fk_tweetid
        FOREIGN KEY (tweetid)
        REFERENCES tweets(id)
);
|
||||
|
|
Loading…
Reference in a new issue