diff --git a/go/nix/bcachegc/bcachegc.go b/go/nix/bcachegc/bcachegc.go
new file mode 100644
index 0000000000..dc057282f8
--- /dev/null
+++ b/go/nix/bcachegc/bcachegc.go
@@ -0,0 +1,442 @@
+// Binary bcachegc garbage collects a Nix binary cache.
+//
+// It lists every object in the cache bucket, expands the supplied GC roots
+// to the closure of .narinfo files they reference, and deletes every
+// .narinfo and nar/ object that is not part of that closure.
+package main
+
+import (
+	"bufio"
+	"context"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"gocloud.dev/blob"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/singleflight"
+	"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
+
+	_ "gocloud.dev/blob/gcsblob"
+)
+
+var (
+	blobURLFlag          = flag.String("cache_url", "", "Cache URL")
+	rootsFlag            = flag.String("gc_roots_file", "", "Path to file containing GC roots")
+	dryRunFlag           = flag.Bool("dry_run", true, "If true, don't actually delete anything")
+	trustPublicCacheFlag = flag.Bool("trust_public_cache", true, "If true, also remove things which are present in public caches")
+)
+
+var (
+	// hashExtractRegexp matches a 32-character Nix base32 store hash
+	// (the alphabet omits e, o, t and u) at the start of a path basename.
+	hashExtractRegexp = regexp.MustCompile(`(^|/)([0-9a-df-np-sv-z]{32})([-.].*)?$`)
+
+	publicCaches = []string{
+		"https://cache.nixos.org",
+	}
+)
+
+// hashExtract returns the store hash embedded in s, or "" if s does not
+// contain one.
+func hashExtract(s string) string {
+	res := hashExtractRegexp.FindStringSubmatch(s)
+	if len(res) == 0 {
+		return ""
+	}
+	return res[2]
+}
+
+// loadGCRoots reads rootsFilePath line by line and returns the set of store
+// hashes found; lines without a recognizable hash are ignored.
+func loadGCRoots(ctx context.Context, rootsFilePath string) (map[string]bool, error) {
+	r := map[string]bool{}
+	f, err := os.Open(rootsFilePath)
+	if err != nil {
+		return nil, fmt.Errorf("opening file: %w", err)
+	}
+	defer f.Close()
+
+	s := bufio.NewScanner(f)
+	for s.Scan() {
+		if h := hashExtract(strings.TrimSpace(s.Text())); h != "" {
+			r[h] = true
+		}
+	}
+	if err := s.Err(); err != nil {
+		return nil, fmt.Errorf("scanning: %w", err)
+	}
+
+	return r, nil
+}
+
+// gcer holds the state for one garbage-collection run.
+type gcer struct {
+	bucket *blob.Bucket
+
+	// publicCaches lists base URLs of caches whose contents need not be kept
+	// locally. publicCacheSF collapses concurrent lookups for the same hash;
+	// publicCacheMap memoizes the answers.
+	publicCaches   []string
+	publicCacheSF  singleflight.Group
+	publicCacheMap sync.Map
+}
+
+// enumerateBucket lists the whole bucket once, partitioning keys into
+// .narinfo files and nar/ payload files.
+func (g *gcer) enumerateBucket(ctx context.Context) (narInfoFilenames, narFilenames map[string]bool, err error) {
+	iter := g.bucket.List(nil)
+
+	narInfoFilenames = map[string]bool{}
+	narFilenames = map[string]bool{}
+iterLoop:
+	for {
+		obj, err := iter.Next(ctx)
+		switch {
+		case err == io.EOF:
+			break iterLoop
+		case err != nil:
+			return nil, nil, fmt.Errorf("iterating files: %w", err)
+		}
+		if strings.HasSuffix(obj.Key, ".narinfo") {
+			narInfoFilenames[obj.Key] = true
+		}
+		if strings.HasPrefix(obj.Key, "nar/") {
+			narFilenames[obj.Key] = true
+		}
+	}
+	return narInfoFilenames, narFilenames, nil
+}
+
+// isInPublicCache reports whether narinfoHash is served by any of
+// g.publicCaches. Results are memoized for the lifetime of g.
+func (g *gcer) isInPublicCache(ctx context.Context, narinfoHash string) (bool, error) {
+	r, ok := g.publicCacheMap.Load(narinfoHash)
+	if ok {
+		return r.(bool), nil
+	}
+	r, err, _ := g.publicCacheSF.Do(narinfoHash, func() (interface{}, error) {
+		for _, c := range g.publicCaches {
+			req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%v/%v.narinfo", c, narinfoHash), nil)
+			if err != nil {
+				return false, fmt.Errorf("constructing request for %v/%v.narinfo: %w", c, narinfoHash, err)
+			}
+
+			resp, err := http.DefaultClient.Do(req)
+			if err != nil {
+				return false, fmt.Errorf("making request for %v/%v.narinfo: %w", c, narinfoHash, err)
+			}
+			resp.Body.Close()
+
+			if resp.StatusCode == http.StatusOK {
+				g.publicCacheMap.Store(narinfoHash, true)
+				return true, nil
+			}
+		}
+		g.publicCacheMap.Store(narinfoHash, false)
+		return false, nil
+	})
+	return r.(bool), err
+}
+
+// expandRoots walks the References closure of roots (a set of store hashes)
+// and returns the narinfo hashes and nar URLs which must be kept. References
+// available from a public cache are not followed.
+func (g *gcer) expandRoots(ctx context.Context, roots, narInfoFilenames map[string]bool) (keepNarInfos map[string]bool, keepNars map[string]bool, err error) {
+	const workerCount = 20
+	keepNarInfos = map[string]bool{}
+	keepNars = map[string]bool{}
+
+	eg, egctx := errgroup.WithContext(ctx)
+	// running counts in-flight work items: hashes queued on foundRootCh but
+	// not yet deduplicated, plus narinfos currently being processed. When it
+	// reaches zero no new roots can appear and the pipeline can shut down.
+	var running sync.WaitGroup
+
+	foundRootCh := make(chan string)
+	keepNarCh := make(chan string)
+	workCh := make(chan string, len(narInfoFilenames))
+
+	// Collector: sole writer of keepNars.
+	eg.Go(func() error {
+		for {
+			select {
+			case <-egctx.Done():
+				return egctx.Err()
+			case f, ok := <-keepNarCh:
+				if !ok {
+					return nil
+				}
+				keepNars[f] = true
+			}
+		}
+	})
+	// Deduplicator: sole writer of keepNarInfos; forwards unseen hashes to
+	// the workers.
+	eg.Go(func() error {
+		defer close(workCh)
+		defer close(keepNarCh)
+		for {
+			select {
+			case <-egctx.Done():
+				return egctx.Err()
+			case r, ok := <-foundRootCh:
+				if !ok {
+					return nil
+				}
+				if keepNarInfos[r] {
+					running.Done()
+					continue
+				}
+				keepNarInfos[r] = true
+				select {
+				case workCh <- r:
+				case <-egctx.Done():
+					// Balance the Add made when r was enqueued.
+					running.Done()
+					return egctx.Err()
+				}
+			}
+		}
+	})
+
+	// processWork fetches and parses one narinfo, records its NAR URL, and
+	// enqueues any references that are not in a public cache.
+	processWork := func(w string) error {
+		defer running.Done()
+
+		filename := fmt.Sprintf("%s.narinfo", w)
+		if !narInfoFilenames[filename] {
+			return fmt.Errorf("no known narinfo %v", filename)
+		}
+
+		r, err := g.bucket.NewReader(egctx, filename, nil)
+		if err != nil {
+			return fmt.Errorf("opening narinfo %v: %w", filename, err)
+		}
+		defer r.Close()
+
+		ni, err := narinfo.ParseNarInfo(r)
+		if err != nil {
+			return fmt.Errorf("parsing narinfo: %w", err)
+		}
+		// All channel sends are guarded by egctx.Done(): once the group is
+		// cancelled the receivers have exited, and an unguarded send would
+		// block forever and deadlock eg.Wait().
+		select {
+		case keepNarCh <- ni.URL:
+		case <-egctx.Done():
+			return egctx.Err()
+		}
+		for _, ref := range ni.References {
+			refHash := hashExtract(ref)
+			if refHash == "" {
+				return fmt.Errorf("reference %q in %v has no store hash", ref, filename)
+			}
+			if exists, err := g.isInPublicCache(egctx, refHash); err != nil {
+				return fmt.Errorf("checking if %v is in public cache: %w", refHash, err)
+			} else if !exists {
+				running.Add(1)
+				select {
+				case foundRootCh <- refHash:
+				case <-egctx.Done():
+					running.Done()
+					return egctx.Err()
+				}
+			}
+		}
+
+		return nil
+	}
+	for n := 0; n < workerCount; n++ {
+		eg.Go(func() error {
+			for {
+				select {
+				case <-egctx.Done():
+					return egctx.Err()
+				case w, ok := <-workCh:
+					if !ok {
+						return nil
+					}
+					if err := processWork(w); err != nil {
+						return fmt.Errorf("working on %q: %w", w, err)
+					}
+				}
+			}
+		})
+	}
+
+	// Enqueue the initial roots.
+	running.Add(1)
+	go func() {
+		defer running.Done()
+		for k := range roots {
+			running.Add(1)
+			select {
+			case foundRootCh <- k:
+			case <-egctx.Done():
+				running.Done()
+				log.Printf("aborting enqueueing %v", egctx.Err())
+				return
+			}
+		}
+		log.Printf("Enqueued %d roots", len(roots))
+	}()
+
+	// Once there's no work remaining to be done, close the foundRootCh.
+	go func() {
+		running.Wait()
+		log.Println("Work queues drained, shutting down")
+		close(foundRootCh)
+	}()
+
+	if err := eg.Wait(); err != nil {
+		return nil, nil, err
+	}
+	return keepNarInfos, keepNars, nil
+}
+
+// except returns the set difference src minus other.
+func except(src map[string]bool, other map[string]bool) map[string]bool {
+	out := make(map[string]bool, len(src))
+	for k := range src {
+		out[k] = true
+	}
+	for k := range other {
+		delete(out, k)
+	}
+	return out
+}
+
+// deleteFiles deletes every key in files from the bucket with a bounded pool
+// of workers, logging rough progress estimates as it goes.
+func (g *gcer) deleteFiles(ctx context.Context, files map[string]bool) error {
+	const workerCount = 64
+	deleteCh := make(chan string, workerCount)
+
+	eg, egctx := errgroup.WithContext(ctx)
+	for n := 0; n < workerCount; n++ {
+		eg.Go(func() error {
+			ctx := egctx
+			for f := range deleteCh {
+				if err := g.bucket.Delete(ctx, f); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+
+	start := time.Now()
+	n := len(files)
+queueLoop:
+	for f := range files {
+		// n counts files not yet enqueued; estimate ETA from throughput so far.
+		deletedSoFar := len(files) - n
+		if n%1000 == 0 && deletedSoFar > 0 {
+			deletedInSeconds := time.Since(start).Seconds()
+			secondsPerFile := deletedInSeconds / float64(deletedSoFar)
+			estimatedTimeRemaining := time.Duration(float64(n) * secondsPerFile * float64(time.Second))
+			log.Printf("%d files left (guesstimating %s for this batch of files)...", n, estimatedTimeRemaining)
+		}
+		select {
+		case <-egctx.Done():
+			// A worker failed; stop queueing and surface the error below.
+			break queueLoop
+		case deleteCh <- f:
+		}
+		n--
+	}
+	close(deleteCh)
+	log.Printf("Done enqueueing %d files for deleting in %s.", len(files), time.Since(start))
+
+	if err := eg.Wait(); err != nil {
+		return err
+	}
+	log.Printf("Done deleting %d files in %s.", len(files), time.Since(start))
+	return nil
+}
+
+// Run performs one full GC pass: enumerate the bucket, expand roots, compute
+// the unreachable set, and (unless --dry_run) delete narinfos then NARs.
+func (g *gcer) Run(ctx context.Context, roots map[string]bool) error {
+	log.Println("Iterating through bucket")
+	start := time.Now()
+	narInfoFilenames, narFilenames, err := g.enumerateBucket(ctx)
+	if err != nil {
+		return fmt.Errorf("enumerateBucket: %w", err)
+	}
+	log.Printf("Done loading bucket data; found %d narinfo files and %d nar files in %v", len(narInfoFilenames), len(narFilenames), time.Since(start))
+
+	// Expand gcroots to all covered NAR infos.
+	// Error if gcroots don't have a .narinfo in scope.
+	log.Println("Expanding gcroots")
+	roots, nars, err := g.expandRoots(ctx, roots, narInfoFilenames)
+	if err != nil {
+		return fmt.Errorf("expandRoots: %w", err)
+	}
+	log.Printf("Got %d narinfos to keep, with %d nars", len(roots), len(nars))
+
+	// Compute what we're _deleting_.
+	deleteNarInfos := map[string]bool{}
+	for k := range roots {
+		deleteNarInfos[k+".narinfo"] = true
+	}
+	deleteNarInfos = except(narInfoFilenames, deleteNarInfos)
+	deleteNars := except(narFilenames, nars)
+	log.Printf("Computed %d narinfos and %d nars to delete", len(deleteNarInfos), len(deleteNars))
+
+	if *dryRunFlag {
+		return nil
+	}
+
+	// Delete all the narinfos first, then the NARs. This avoids having
+	// narinfos with no actual NAR behind them.
+	log.Printf("Deleting %d narinfos...", len(deleteNarInfos))
+	if err := g.deleteFiles(ctx, deleteNarInfos); err != nil {
+		return fmt.Errorf("deleting narinfos: %w", err)
+	}
+
+	log.Printf("Deleting %d nars...", len(deleteNars))
+	if err := g.deleteFiles(ctx, deleteNars); err != nil {
+		return fmt.Errorf("deleting nars: %w", err)
+	}
+
+	return nil
+}
+
+func main() {
+	flag.Parse()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	log.Printf("Using cache URL %q", *blobURLFlag)
+	bucket, err := blob.OpenBucket(ctx, *blobURLFlag)
+	if err != nil {
+		log.Fatalf("opening bucket %q: %v", *blobURLFlag, err)
+	}
+	defer bucket.Close()
+
+	roots, err := loadGCRoots(ctx, *rootsFlag)
+	if err != nil {
+		log.Fatalf("loading GC roots from %q: %v", *rootsFlag, err)
+	}
+	log.Printf("Using %d roots from %v", len(roots), *rootsFlag)
+
+	g := &gcer{
+		bucket: bucket,
+	}
+	if *trustPublicCacheFlag {
+		g.publicCaches = publicCaches
+	}
+	if err := g.Run(ctx, roots); err != nil {
+		log.Fatalf("running GC: %v", err)
+	}
+}
diff --git a/go/nix/bcachegc/default.nix
b/go/nix/bcachegc/default.nix
new file mode 100644
index 0000000000..80dd8fdc87
--- /dev/null
+++ b/go/nix/bcachegc/default.nix
@@ -0,0 +1,18 @@
+# SPDX-FileCopyrightText: 2020 Luke Granger-Brown
+#
+# SPDX-License-Identifier: Apache-2.0
+
+{ depot, ... }:
+depot.third_party.buildGo.program {
+  name = "bcachegc";
+  srcs = [
+    ./bcachegc.go
+  ];
+  deps = with depot; [
+    third_party.gopkgs."gocloud.dev".blob
+    third_party.gopkgs."gocloud.dev".blob.gcsblob
+    third_party.gopkgs."golang.org".x.sync.errgroup
+    third_party.gopkgs."golang.org".x.sync.singleflight
+    go.nix.nar.narinfo
+  ];
+}
diff --git a/go/nix/default.nix b/go/nix/default.nix
index 7f4b56d657..86135fe7f3 100644
--- a/go/nix/default.nix
+++ b/go/nix/default.nix
@@ -5,4 +5,5 @@
 args: {
   nar = import ./nar args;
+  bcachegc = import ./bcachegc args;
 }
diff --git a/go/nix/nar/narinfo/default.nix b/go/nix/nar/narinfo/default.nix
index 9be17bf0f7..5f041d18a6 100644
--- a/go/nix/nar/narinfo/default.nix
+++ b/go/nix/nar/narinfo/default.nix
@@ -5,6 +5,7 @@
 { depot, ... }:
 depot.third_party.buildGo.package {
   name = "narinfo";
+  path = "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo";
   srcs = [
     ./narinfo.go
   ];
diff --git a/third_party/gopkgs/cloud.google.com/go/default.nix b/third_party/gopkgs/cloud.google.com/go/default.nix
index a0df7c76c1..ebdfcfae73 100644
--- a/third_party/gopkgs/cloud.google.com/go/default.nix
+++ b/third_party/gopkgs/cloud.google.com/go/default.nix
@@ -30,6 +30,7 @@ depot.third_party.buildGo.external {
     gopkgs."google.golang.org".api.transport.grpc
     gopkgs."google.golang.org".genproto.googleapis.firestore.v1
     gopkgs."google.golang.org".genproto.googleapis.iam.v1
+    gopkgs."google.golang.org".genproto.googleapis.iam.credentials.v1
     gopkgs."google.golang.org".genproto.googleapis.storage.v2
     gopkgs."google.golang.org".genproto.googleapis.type.expr
     gopkgs."google.golang.org".genproto.googleapis.rpc.code
diff --git a/third_party/gopkgs/gocloud.dev/default.nix b/third_party/gopkgs/gocloud.dev/default.nix
index 411d246dfd..eee6bb2f9d 100644
--- a/third_party/gopkgs/gocloud.dev/default.nix
+++ b/third_party/gopkgs/gocloud.dev/default.nix
@@ -12,12 +12,22 @@ depot.third_party.buildGo.external {
     hash = "sha256:13xzadkcw7zj0rj0zc2yalap27wcp7bxcgjhmqaclbcxv0h7ynmj";
   };
   deps = with depot.third_party; [
+    gopkgs."cloud.google.com".go.compute.metadata
+    gopkgs."cloud.google.com".go.iam.credentials.apiv1
+    gopkgs."cloud.google.com".go.storage
     gopkgs."go.opencensus.io".stats
     gopkgs."go.opencensus.io".stats.view
     gopkgs."go.opencensus.io".tag
     gopkgs."go.opencensus.io".trace
     gopkgs."go.opencensus.io".plugin.ocgrpc
     gopkgs."golang.org".x.xerrors
+    gopkgs."golang.org".x.oauth2
+    gopkgs."golang.org".x.oauth2.google
+    gopkgs."google.golang.org".api.googleapi
+    gopkgs."google.golang.org".api.iterator
+    gopkgs."google.golang.org".api.option
+    gopkgs."google.golang.org".genproto.googleapis.iam.credentials.v1
+    gopkgs."google.golang.org".grpc
     gopkgs."google.golang.org".grpc.codes
     gopkgs."google.golang.org".grpc.status
     gopkgs."github.com".googleapis.gax-go.v2
diff --git a/third_party/gopkgs/google.golang.org/genproto/default.nix b/third_party/gopkgs/google.golang.org/genproto/default.nix
index 0943418de1..c2ccfa52df 100644
--- a/third_party/gopkgs/google.golang.org/genproto/default.nix
+++ b/third_party/gopkgs/google.golang.org/genproto/default.nix
@@ -19,6 +19,7 @@ depot.third_party.buildGo.external {
     gopkgs."google.golang.org".protobuf.reflect.protoreflect
     gopkgs."google.golang.org".protobuf.runtime.protoimpl
     gopkgs."google.golang.org".protobuf.types.known.anypb
+    gopkgs."google.golang.org".protobuf.types.known.durationpb
     gopkgs."google.golang.org".protobuf.types.known.emptypb
     gopkgs."google.golang.org".protobuf.types.known.fieldmaskpb
     gopkgs."google.golang.org".protobuf.types.known.structpb