go/twitterchiver: port relatedfetcher to gocloud.dev

Luke Granger-Brown 2021-04-01 00:07:41 +00:00
parent 0961a68532
commit 3d475e0efe
2 changed files with 17 additions and 14 deletions


@@ -47,6 +47,9 @@
       depot.third_party.gopkgs."github.com".dghubble.oauth1
       depot.third_party.gopkgs."github.com".jackc.pgtype
       depot.third_party.gopkgs."github.com".jackc.pgx.v4.pgxpool
+      depot.third_party.gopkgs."gocloud.dev".blob
+      depot.third_party.gopkgs."gocloud.dev".blob.s3blob
+      depot.third_party.gopkgs."gocloud.dev".gcerrors
     ];
   };
 }


@@ -24,6 +24,10 @@ import (
 	"github.com/jackc/pgtype"
 	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
+	"gocloud.dev/blob"
+	"gocloud.dev/gcerrors"
+
+	_ "gocloud.dev/blob/s3blob"
 )
 
 var (
@@ -32,7 +36,7 @@ var (
 	workerCount = flag.Int("workers", 20, "Workers to spawn for fetching info from Twitter API")
 	workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. we will only look for this much work to do).")
-	mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.")
+	mediaFetchDestination = flag.String("media_fetch_destination", "s3://public-lukegb-twitterchiver?endpoint=objdump.zxcvbnm.ninja&region=london", "gocloud.dev URL to write fetched media to.")
 	mediaTickInterval = flag.Duration("media_tick_interval", 0, "Tick interval for media - takes priority over --tick_interval if non-zero.")
 	mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.")
@@ -48,6 +52,7 @@ type WorkerConfig struct {
 	OAuthConfig  *oauth1.Config
 	WorkAtOnce   int
 	DatabasePool *pgxpool.Pool
+	Bucket       *blob.Bucket
 	WorkQueue    chan func(context.Context)
 }
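For context, here is a minimal sketch of how the new Bucket field might be wired up. The main() changes are not part of this diff, so the struct and names below (workerConfig, the openBucket call site) are a trimmed, hypothetical stand-in; the point it illustrates is that the blank import of gocloud.dev/blob/s3blob registers the "s3://" URL scheme, so the --media_fetch_destination URL can be opened directly with blob.OpenBucket.

package main

import (
	"context"
	"flag"
	"log"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/s3blob" // registers the "s3://" scheme with blob.OpenBucket
)

// workerConfig is a trimmed, hypothetical stand-in for the WorkerConfig in the
// diff above; the real struct also carries OAuthConfig, DatabasePool,
// WorkQueue, and so on.
type workerConfig struct {
	Name   string
	Bucket *blob.Bucket
}

var mediaFetchDestination = flag.String("media_fetch_destination", "s3://public-lukegb-twitterchiver?endpoint=objdump.zxcvbnm.ninja&region=london", "gocloud.dev URL to write fetched media to.")

func main() {
	flag.Parse()
	ctx := context.Background()

	bkt, err := blob.OpenBucket(ctx, *mediaFetchDestination)
	if err != nil {
		log.Fatalf("opening media bucket %q: %v", *mediaFetchDestination, err)
	}
	defer bkt.Close()

	cfg := workerConfig{Name: "media", Bucket: bkt}
	_ = cfg // the media fetch worker would then do all of its I/O through cfg.Bucket
}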
@@ -116,7 +121,7 @@ func mediaFetchOutputPath(u string) (string, error) {
 	}
 	path := strings.TrimLeft(p.Path, "/")
-	return filepath.Join(*mediaFetchDestination, p.Host, path), nil
+	return filepath.Join(p.Host, path), nil
 }
 
 type databaseQuerier interface {
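The output path is now a bucket-relative key rather than a filesystem path: the destination prefix lives in the bucket URL, so only the host and path of the media URL are joined. A small sketch of the resulting behaviour; the url.Parse step and its error handling are assumed, since they fall outside the visible hunk.

package main

import (
	"fmt"
	"net/url"
	"path/filepath"
	"strings"
)

// mediaFetchOutputPath, reconstructed from the visible diff lines; the
// surrounding url.Parse and error check are assumed.
func mediaFetchOutputPath(u string) (string, error) {
	p, err := url.Parse(u)
	if err != nil {
		return "", err
	}
	path := strings.TrimLeft(p.Path, "/")
	return filepath.Join(p.Host, path), nil
}

func main() {
	key, err := mediaFetchOutputPath("https://pbs.twimg.com/media/Example.jpg")
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // pbs.twimg.com/media/Example.jpg
}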
@@ -211,24 +216,19 @@ func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error {
 			}
 
 			// Fetch u -> out
-			if fi, err := os.Stat(out); err == nil {
+			if fi, err := cfg.Bucket.Attributes(ctx, out); err == nil {
 				// File already exists, does it have non-zero size?
-				if fi.Size() > 0 {
+				if fi.Size > 0 {
 					// It does!
 					log.Printf("[%v:%d:%v] %v already exists; skipping", cfg.Name, t.ID, u, out)
 					continue
 				}
 				log.Printf("[%v:%d:%v] %v already exists but has zero size; refetching", cfg.Name, t.ID, u, out)
-			} else if !os.IsNotExist(err) {
+			} else if gcerrors.Code(err) == gcerrors.NotFound {
 				log.Printf("[%v:%d:%v] checking output path %v: %v", cfg.Name, t.ID, u, out, err)
 				return
 			}
 
-			if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil {
-				log.Printf("[%v:%d:%v] creating output path %v: %v", cfg.Name, t.ID, u, out, err)
-				return
-			}
-
 			if err := func() error {
 				resp, err := http.Get(u)
 				if err != nil {
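The os.Stat / os.IsNotExist pair maps onto Bucket.Attributes and gcerrors.Code in gocloud.dev: Attributes returns an error whose code is gcerrors.NotFound when the object does not exist yet. A standalone sketch of that idiom follows; the helper name objectNeedsFetch is purely illustrative and not part of the diff.

package main

import (
	"context"
	"fmt"

	"gocloud.dev/blob"
	"gocloud.dev/blob/memblob"
	"gocloud.dev/gcerrors"
)

// objectNeedsFetch is an illustrative helper: it reports whether key should be
// (re)fetched into bkt, i.e. it is missing or has zero length.
func objectNeedsFetch(ctx context.Context, bkt *blob.Bucket, key string) (bool, error) {
	attrs, err := bkt.Attributes(ctx, key)
	if err != nil {
		if gcerrors.Code(err) == gcerrors.NotFound {
			return true, nil // nothing stored yet: fetch it
		}
		return false, err // genuine error talking to the bucket
	}
	return attrs.Size == 0, nil // refetch zero-length objects
}

func main() {
	ctx := context.Background()
	bkt := memblob.OpenBucket(nil) // in-memory bucket, handy for demonstration
	defer bkt.Close()

	need, err := objectNeedsFetch(ctx, bkt, "pbs.twimg.com/media/Example.jpg")
	fmt.Println(need, err) // true <nil>: nothing stored yet
}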
@@ -246,21 +246,21 @@ func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error {
 					return fmt.Errorf("HTTP status %d - %v", resp.StatusCode, resp.Status)
 				}
 
-				f, err := os.Create(out)
+				f, err := cfg.Bucket.NewWriter(ctx, out, nil)
 				if err != nil {
 					return err
 				}
-				defer f.Close()
 
 				written, err := io.Copy(f, resp.Body)
 				if err != nil {
 					// Try to delete the file we created to force a refetch next time.
-					_ = f.Truncate(0) // Ignore the error, though.
+					_ = f.Close()
+					_ = cfg.Bucket.Delete(ctx, out)
 					return fmt.Errorf("IO error: %w", err)
 				}
 
 				log.Printf("[%v:%d:%v] downloaded %d bytes.", cfg.Name, t.ID, u, written)
-				return nil
+				return f.Close()
 			}(); err != nil {
 				log.Printf("[%v:%d:%v] %v", cfg.Name, t.ID, u, err)
 				return
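The write path changes shape slightly with gocloud.dev: a blob write is generally only committed when the Writer is closed, which is why the success path returns f.Close() instead of relying on a deferred Close, and why a failed copy closes the writer and then best-effort deletes the object so it gets refetched next time. A standalone sketch of that pattern; the helper name downloadToBucket and the example URL/key in main are illustrative.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"

	"gocloud.dev/blob"
	"gocloud.dev/blob/memblob"
)

// downloadToBucket mirrors the pattern above: fetch srcURL over HTTP and
// stream the body into bkt under key.
func downloadToBucket(ctx context.Context, bkt *blob.Bucket, key, srcURL string) error {
	resp, err := http.Get(srcURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP status %d - %v", resp.StatusCode, resp.Status)
	}

	w, err := bkt.NewWriter(ctx, key, nil)
	if err != nil {
		return err
	}
	if _, err := io.Copy(w, resp.Body); err != nil {
		_ = w.Close()            // abandon the write...
		_ = bkt.Delete(ctx, key) // ...and best-effort delete so it is refetched next time
		return fmt.Errorf("IO error: %w", err)
	}
	return w.Close() // Close finalizes the upload, so its error must be checked
}

func main() {
	ctx := context.Background()
	bkt := memblob.OpenBucket(nil) // in-memory bucket for demonstration
	defer bkt.Close()

	if err := downloadToBucket(ctx, bkt, "example.com/index.html", "https://example.com/"); err != nil {
		log.Fatal(err)
	}
}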