From 3d475e0efef8a76c5f5294417b8856331a0976bc Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Thu, 1 Apr 2021 00:07:41 +0000 Subject: [PATCH] go/twitterchiver: port relatedfetcher to gocloud.dev --- go/twitterchiver/default.nix | 3 +++ go/twitterchiver/related_fetcher/fetcher.go | 28 ++++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/go/twitterchiver/default.nix b/go/twitterchiver/default.nix index 9d62474da8..51e3098579 100644 --- a/go/twitterchiver/default.nix +++ b/go/twitterchiver/default.nix @@ -47,6 +47,9 @@ depot.third_party.gopkgs."github.com".dghubble.oauth1 depot.third_party.gopkgs."github.com".jackc.pgtype depot.third_party.gopkgs."github.com".jackc.pgx.v4.pgxpool + depot.third_party.gopkgs."gocloud.dev".blob + depot.third_party.gopkgs."gocloud.dev".blob.s3blob + depot.third_party.gopkgs."gocloud.dev".gcerrors ]; }; } diff --git a/go/twitterchiver/related_fetcher/fetcher.go b/go/twitterchiver/related_fetcher/fetcher.go index 2ffe521507..d80fd21a64 100644 --- a/go/twitterchiver/related_fetcher/fetcher.go +++ b/go/twitterchiver/related_fetcher/fetcher.go @@ -24,6 +24,10 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "gocloud.dev/blob" + "gocloud.dev/gcerrors" + + _ "gocloud.dev/blob/s3blob" ) var ( @@ -32,7 +36,7 @@ var ( workerCount = flag.Int("workers", 20, "Workers to spawn for fetching info from Twitter API") workAtOnce = flag.Int("work_at_once", 40, "Work to enqueue at once (i.e. 
we will only look for this much work to do).") - mediaFetchDestination = flag.String("media_fetch_destination", "/tmp", "Path to write fetched media to.") + mediaFetchDestination = flag.String("media_fetch_destination", "s3://public-lukegb-twitterchiver?endpoint=objdump.zxcvbnm.ninja&region=london", "gocloud.dev URL to write fetched media to.") mediaTickInterval = flag.Duration("media_tick_interval", 0, "Tick interval for media - takes priority over --tick_interval if non-zero.") mediaWorkAtOnce = flag.Int("media_work_at_once", 0, "Work to enqueue at once for media - takes priority over --work_at_once if non-zero.") @@ -48,6 +52,7 @@ type WorkerConfig struct { OAuthConfig *oauth1.Config WorkAtOnce int DatabasePool *pgxpool.Pool + Bucket *blob.Bucket WorkQueue chan func(context.Context) } @@ -116,7 +121,7 @@ func mediaFetchOutputPath(u string) (string, error) { } path := strings.TrimLeft(p.Path, "/") - return filepath.Join(*mediaFetchDestination, p.Host, path), nil + return filepath.Join(p.Host, path), nil } type databaseQuerier interface { @@ -211,24 +216,19 @@ func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error { } // Fetch u -> out - if fi, err := os.Stat(out); err == nil { + if fi, err := cfg.Bucket.Attributes(ctx, out); err == nil { // File already exists, does it have non-zero size? - if fi.Size() > 0 { + if fi.Size > 0 { // It does! 
log.Printf("[%v:%d:%v] %v already exists; skipping", cfg.Name, t.ID, u, out) continue } log.Printf("[%v:%d:%v] %v already exists but has zero size; refetching", cfg.Name, t.ID, u, out) - } else if !os.IsNotExist(err) { + } else if gcerrors.Code(err) == gcerrors.NotFound { log.Printf("[%v:%d:%v] checking output path %v: %v", cfg.Name, t.ID, u, out, err) return } - if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { - log.Printf("[%v:%d:%v] creating output path %v: %v", cfg.Name, t.ID, u, out, err) - return - } - if err := func() error { resp, err := http.Get(u) if err != nil { @@ -246,21 +246,21 @@ func mediaFetchTick(ctx context.Context, cfg WorkerConfig) error { return fmt.Errorf("HTTP status %d - %v", resp.StatusCode, resp.Status) } - f, err := os.Create(out) + f, err := cfg.Bucket.NewWriter(ctx, out, nil) if err != nil { return err } - defer f.Close() written, err := io.Copy(f, resp.Body) if err != nil { // Try to delete the file we created to force a refetch next time. - _ = f.Truncate(0) // Ignore the error, though. + _ = f.Close() + _ = cfg.Bucket.Delete(ctx, out) return fmt.Errorf("IO error: %w", err) } log.Printf("[%v:%d:%v] downloaded %d bytes.", cfg.Name, t.ID, u, written) - return nil + return f.Close() }(); err != nil { log.Printf("[%v:%d:%v] %v", cfg.Name, t.ID, u, err) return