go/nix/bcachegc: init

This commit is contained in:
Luke Granger-Brown 2021-07-27 22:10:47 +01:00
parent e44e0f128e
commit d03cc487f5
7 changed files with 419 additions and 0 deletions

387
go/nix/bcachegc/bcachegc.go Normal file
View file

@ -0,0 +1,387 @@
// Binary bcachegc garbage collects a Nix binary cache.
package main
import (
"bufio"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"regexp"
"strings"
"sync"
"time"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
_ "gocloud.dev/blob/gcsblob"
)
// Command-line flags.
var (
	blobURLFlag = flag.String("cache_url", "", "Cache URL")
	rootsFlag   = flag.String("gc_roots_file", "", "Path to file containing GC roots")
	// Defaults to true so an accidental run without flags never deletes anything.
	dryRunFlag = flag.Bool("dry_run", true, "If true, don't actually delete anything")
	// When set, store paths that a public cache already serves are treated as
	// collectable even if reachable from a root (they can be re-fetched).
	trustPublicCacheFlag = flag.Bool("trust_public_cache", true, "If true, also remove things which are present in public caches")
)
var (
	// hashExtractRegexp matches a 32-character Nix base-32 store hash
	// (alphabet 0-9 a-z minus e, o, t, u), either at the start of the
	// string or after a '/', optionally followed by a '-name' or '.ext'
	// suffix running to the end of the string.
	hashExtractRegexp = regexp.MustCompile(`(^|/)([0-9a-df-np-sv-z]{32})([-.].*)?$`)

	// publicCaches are the base URLs consulted by isInPublicCache.
	publicCaches = []string{
		"https://cache.nixos.org",
	}
)

// hashExtract returns the store hash embedded in s, or "" if s does not
// contain one.
func hashExtract(s string) string {
	m := hashExtractRegexp.FindStringSubmatch(s)
	if m == nil {
		return ""
	}
	return m[2]
}
// loadGCRoots reads the file at rootsFilePath line by line and returns the
// set of store hashes extracted from it. Lines with no recognizable hash
// are silently skipped, so the file may contain full store paths, bare
// hashes, or narinfo filenames.
func loadGCRoots(ctx context.Context, rootsFilePath string) (map[string]bool, error) {
	f, err := os.Open(rootsFilePath)
	if err != nil {
		return nil, fmt.Errorf("opening file: %w", err)
	}
	defer f.Close()

	roots := make(map[string]bool)
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if h := hashExtract(line); h != "" {
			roots[h] = true
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanning: %w", err)
	}
	return roots, nil
}
// gcer holds the state for a single garbage-collection run over one bucket.
type gcer struct {
	// bucket is the binary cache being collected.
	bucket *blob.Bucket

	// publicCaches lists base URLs of public binary caches; empty unless
	// -trust_public_cache is set (see main).
	publicCaches []string

	// publicCacheSF collapses concurrent isInPublicCache lookups for the
	// same hash into one set of HTTP requests.
	publicCacheSF singleflight.Group
	// publicCacheMap memoizes isInPublicCache results (hash string -> bool).
	publicCacheMap sync.Map
}
// enumerateBucket lists every object in the cache bucket and partitions the
// keys into narinfo files (anything ending in ".narinfo") and NAR payloads
// (anything under "nar/"). Keys matching neither are ignored.
func (g *gcer) enumerateBucket(ctx context.Context) (narInfoFilenames, narFilenames map[string]bool, err error) {
	narInfoFilenames = make(map[string]bool)
	narFilenames = make(map[string]bool)

	iter := g.bucket.List(nil)
	for {
		obj, err := iter.Next(ctx)
		if err == io.EOF {
			return narInfoFilenames, narFilenames, nil
		}
		if err != nil {
			return nil, nil, fmt.Errorf("iterating files: %w", err)
		}
		if strings.HasSuffix(obj.Key, ".narinfo") {
			narInfoFilenames[obj.Key] = true
		}
		if strings.HasPrefix(obj.Key, "nar/") {
			narFilenames[obj.Key] = true
		}
	}
}
// isInPublicCache reports whether the narinfo for narinfoHash is served by
// any of the configured public caches (HEAD request returning 200).
// Results are memoized in g.publicCacheMap, and concurrent lookups for the
// same hash are collapsed via singleflight.
func (g *gcer) isInPublicCache(ctx context.Context, narinfoHash string) (bool, error) {
	if r, ok := g.publicCacheMap.Load(narinfoHash); ok {
		return r.(bool), nil
	}
	r, err, _ := g.publicCacheSF.Do(narinfoHash, func() (interface{}, error) {
		for _, c := range g.publicCaches {
			req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%v/%v.narinfo", c, narinfoHash), nil)
			if err != nil {
				return false, fmt.Errorf("constructing request for %v/%v.narinfo: %w", c, narinfoHash, err)
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return false, fmt.Errorf("making request for %v/%v.narinfo: %w", c, narinfoHash, err)
			}
			// Drain the (normally empty) body before closing so the
			// transport can reuse the keep-alive connection; best-effort.
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			// Any non-200 status (404, 5xx, ...) is treated as "absent",
			// which errs on the side of keeping the path locally.
			if resp.StatusCode == http.StatusOK {
				g.publicCacheMap.Store(narinfoHash, true)
				return true, nil
			}
		}
		g.publicCacheMap.Store(narinfoHash, false)
		return false, nil
	})
	return r.(bool), err
}
// expandRoots computes the transitive closure of the GC roots over the
// narinfo reference graph, in parallel.
//
// roots and the returned keepNarInfos are keyed by bare store hash; the
// returned keepNars is keyed by each narinfo's URL field (the "nar/..."
// object key). A root whose narinfo is missing from narInfoFilenames is an
// error. References that a public cache already serves (when enabled) are
// neither traversed nor kept.
func (g *gcer) expandRoots(ctx context.Context, roots, narInfoFilenames map[string]bool) (keepNarInfos map[string]bool, keepNars map[string]bool, err error) {
	const workerCount = 20
	keepNarInfos = map[string]bool{}
	keepNars = map[string]bool{}
	eg, egctx := errgroup.WithContext(ctx)
	// running counts hashes that have been (or are about to be) sent on
	// foundRootCh but not yet fully processed; when it reaches zero, all
	// work is done and foundRootCh can be closed.
	var running sync.WaitGroup
	// Deduplicating and reenqueueing worker
	foundRootCh := make(chan string) // candidate hashes; may contain duplicates
	keepNarCh := make(chan string)   // NAR object keys to record in keepNars
	// Buffered so the dedup goroutine can forward without blocking: at most
	// one entry per known narinfo can ever be enqueued.
	workCh := make(chan string, len(narInfoFilenames))
	// Collector: the only goroutine that writes keepNars, so no lock is
	// needed. Exits when keepNarCh is closed by the dedup goroutine.
	eg.Go(func() error {
		for {
			select {
			case <-egctx.Done():
				return egctx.Err()
			case f, ok := <-keepNarCh:
				if !ok {
					return nil
				}
				keepNars[f] = true
			}
		}
	})
	// Deduplicator: the only goroutine that writes keepNarInfos. Already
	// seen hashes are dropped (releasing their `running` count); fresh ones
	// are marked kept and forwarded to the workers.
	eg.Go(func() error {
		defer close(workCh)
		defer close(keepNarCh)
		for {
			select {
			case <-egctx.Done():
				return egctx.Err()
			case r, ok := <-foundRootCh:
				if !ok {
					return nil
				}
				if keepNarInfos[r] {
					running.Done()
					continue
				}
				keepNarInfos[r] = true
				workCh <- r
			}
		}
	})
	// Workers
	// processWork fetches and parses the narinfo for hash w, records its
	// NAR key, and enqueues any references not served by a public cache.
	// The deferred Done releases the `running` count taken when w was sent
	// on foundRootCh.
	processWork := func(w string) error {
		defer running.Done()
		// Check if it exists in the set of known narInfoFilenames.
		filename := fmt.Sprintf("%s.narinfo", w)
		if !narInfoFilenames[filename] {
			return fmt.Errorf("no known narinfo %v", filename)
		}
		// Great, parse it.
		r, err := g.bucket.NewReader(egctx, filename, nil)
		if err != nil {
			return fmt.Errorf("opening narinfo %v: %w", filename, err)
		}
		defer r.Close()
		ni, err := narinfo.ParseNarInfo(r)
		if err != nil {
			return fmt.Errorf("parsing narinfo: %w", err)
		}
		keepNarCh <- ni.URL
		for _, ref := range ni.References {
			refHash := hashExtract(ref)
			if exists, err := g.isInPublicCache(egctx, refHash); err != nil {
				return fmt.Errorf("checking if %v is in public cache: %v", refHash, err)
			} else if !exists {
				running.Add(1) // balanced by the dedup goroutine or processWork
				foundRootCh <- refHash
			}
		}
		return nil
	}
	for n := 0; n < workerCount; n++ {
		eg.Go(func() error {
			for {
				select {
				case <-egctx.Done():
					return egctx.Err()
				case w, ok := <-workCh:
					if !ok {
						return nil
					}
					if err := processWork(w); err != nil {
						return fmt.Errorf("working on %q: %w", w, err)
					}
				}
			}
		})
	}
	// Enqueue work!
	// The extra Add(1) keeps `running` above zero until every initial root
	// has been enqueued, so the closer below cannot fire early.
	running.Add(1)
	go func() {
		defer running.Done()
		for k := range roots {
			running.Add(1)
			select {
			case foundRootCh <- k:
			case <-egctx.Done():
				running.Done()
				log.Printf("aborting enqueueing %v", egctx.Err())
				return
			}
		}
		log.Printf("Enqueued %d roots", len(roots))
	}()
	// Once there's no work remaining to be done, close the foundRootCh.
	go func() {
		running.Wait()
		log.Println("Work queues drained, shutting down")
		close(foundRootCh)
	}()
	if err := eg.Wait(); err != nil {
		return nil, nil, err
	}
	return keepNarInfos, keepNars, nil
}
// except returns the set difference src \ other: a fresh set containing
// every key of src that does not appear as a key in other.
func except(src map[string]bool, other map[string]bool) map[string]bool {
	result := make(map[string]bool, len(src))
	for k := range src {
		// Membership in `other` is by key presence, regardless of value.
		if _, excluded := other[k]; !excluded {
			result[k] = true
		}
	}
	return result
}
// deleteFiles deletes every key in files from the bucket using a pool of
// concurrent workers, logging rough progress estimates along the way.
func (g *gcer) deleteFiles(ctx context.Context, files map[string]bool) error {
	const workerCount = 64
	deleteCh := make(chan string, workerCount)
	eg, egctx := errgroup.WithContext(ctx)
	// Deletion workers: drain deleteCh until it is closed. When one fails,
	// the errgroup cancels egctx, which stops the enqueue loop below and
	// makes the remaining Deletes fail fast on the cancelled context.
	for n := 0; n < workerCount; n++ {
		eg.Go(func() error {
			ctx := egctx // shadow the outer ctx with the group's context
			for f := range deleteCh {
				if err := g.bucket.Delete(ctx, f); err != nil {
					return err
				}
			}
			return nil
		})
	}
	start := time.Now()
	// n counts files not yet enqueued; used for the progress estimate.
	n := len(files)
queueLoop:
	for f := range files {
		deletedSoFar := len(files) - n
		// Every 1000 files, log a naive linear ETA from throughput so far.
		// NOTE(review): "deleted" here really means "enqueued"; actual
		// deletes may lag slightly behind the queue.
		if n%1000 == 0 && deletedSoFar > 0 {
			deletedInSeconds := time.Since(start).Seconds()
			secondsPerFile := deletedInSeconds / float64(deletedSoFar)
			estimatedTimeRemaining := time.Duration(float64(n) * secondsPerFile * float64(time.Second))
			log.Printf("%d files left (guesstimating %s for this batch of files)...", n, estimatedTimeRemaining)
		}
		select {
		case <-egctx.Done():
			// A worker failed: stop enqueueing; the close below lets the
			// surviving workers exit and eg.Wait reports the error.
			break queueLoop
		case deleteCh <- f:
		}
		n--
	}
	close(deleteCh)
	log.Printf("Done enqueueing %d files for deleting in %s.", len(files), time.Since(start))
	if err := eg.Wait(); err != nil {
		return err
	}
	log.Printf("Done deleting %d files in %s.", len(files), time.Since(start))
	return nil
}
// Run performs one full garbage-collection pass: enumerate the bucket,
// expand the roots to the reachable set of narinfos and NARs, compute the
// complement, and — unless -dry_run is set — delete it.
func (g *gcer) Run(ctx context.Context, roots map[string]bool) error {
	log.Println("Iterating through bucket")
	start := time.Now()
	narInfoFilenames, narFilenames, err := g.enumerateBucket(ctx)
	if err != nil {
		return fmt.Errorf("enumerateBucket: %w", err)
	}
	log.Printf("Done loading bucket data; found %d narinfo files and %d nar files in %v", len(narInfoFilenames), len(narFilenames), time.Since(start))

	// Expand gcroots to all covered NAR infos.
	// Error if gcroots don't have a .narinfo in scope.
	log.Println("Expanding gcroots")
	keepNarInfos, keepNars, err := g.expandRoots(ctx, roots, narInfoFilenames)
	if err != nil {
		return fmt.Errorf("expandRoots: %w", err)
	}
	log.Printf("Got %d narinfos to keep, with %d nars", len(keepNarInfos), len(keepNars))

	// Everything present in the bucket but not reachable is garbage.
	keepNarInfoFiles := make(map[string]bool, len(keepNarInfos))
	for h := range keepNarInfos {
		keepNarInfoFiles[h+".narinfo"] = true
	}
	deleteNarInfos := except(narInfoFilenames, keepNarInfoFiles)
	deleteNars := except(narFilenames, keepNars)
	log.Printf("Computed %d narinfos and %d nars to delete", len(deleteNarInfos), len(deleteNars))
	if *dryRunFlag {
		return nil
	}

	// Delete all the narinfos first, then the NARs. This avoids having narinfos with no actual NAR behind them.
	log.Printf("Deleting %d narinfos...", len(deleteNarInfos))
	if err := g.deleteFiles(ctx, deleteNarInfos); err != nil {
		return fmt.Errorf("deleting narinfos: %w", err)
	}
	log.Printf("Deleting %d nars...", len(deleteNars))
	if err := g.deleteFiles(ctx, deleteNars); err != nil {
		return fmt.Errorf("deleting nars: %w", err)
	}
	return nil
}
// main wires the flags together: open the bucket, load the roots file, and
// run the collector.
func main() {
	flag.Parse()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	log.Printf("Using cache URL %q", *blobURLFlag)
	bucket, err := blob.OpenBucket(ctx, *blobURLFlag)
	if err != nil {
		log.Fatalf("opening bucket %q: %v", *blobURLFlag, err)
	}
	defer bucket.Close()

	roots, err := loadGCRoots(ctx, *rootsFlag)
	if err != nil {
		log.Fatalf("loading GC roots from %q: %v", *rootsFlag, err)
	}
	log.Printf("Using %d roots from %v", len(roots), *rootsFlag)

	// With -trust_public_cache=false the cache list stays nil, which
	// disables the public-cache short-circuit entirely.
	var caches []string
	if *trustPublicCacheFlag {
		caches = publicCaches
	}
	g := &gcer{
		bucket:       bucket,
		publicCaches: caches,
	}
	if err := g.Run(ctx, roots); err != nil {
		log.Fatalf("running GC: %v", err)
	}
}

View file

@ -0,0 +1,18 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0

# Builds the bcachegc binary, which garbage collects a Nix binary cache
# stored in a gocloud.dev-compatible blob bucket.
{ depot, ... }:
depot.third_party.buildGo.program {
  name = "bcachegc";
  srcs = [
    ./bcachegc.go
  ];
  deps = with depot; [
    third_party.gopkgs."gocloud.dev".blob
    third_party.gopkgs."gocloud.dev".blob.gcsblob
    third_party.gopkgs."golang.org".x.sync.errgroup
    third_party.gopkgs."golang.org".x.sync.singleflight
    go.nix.nar.narinfo
  ];
}

View file

@ -5,4 +5,5 @@
args:
{
nar = import ./nar args;
bcachegc = import ./bcachegc args;
}

View file

@ -5,6 +5,7 @@
{ depot, ... }:
depot.third_party.buildGo.package {
name = "narinfo";
path = "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo";
srcs = [
./narinfo.go
];

View file

@ -30,6 +30,7 @@ depot.third_party.buildGo.external {
gopkgs."google.golang.org".api.transport.grpc
gopkgs."google.golang.org".genproto.googleapis.firestore.v1
gopkgs."google.golang.org".genproto.googleapis.iam.v1
gopkgs."google.golang.org".genproto.googleapis.iam.credentials.v1
gopkgs."google.golang.org".genproto.googleapis.storage.v2
gopkgs."google.golang.org".genproto.googleapis.type.expr
gopkgs."google.golang.org".genproto.googleapis.rpc.code

View file

@ -12,12 +12,22 @@ depot.third_party.buildGo.external {
hash = "sha256:13xzadkcw7zj0rj0zc2yalap27wcp7bxcgjhmqaclbcxv0h7ynmj";
};
deps = with depot.third_party; [
gopkgs."cloud.google.com".go.compute.metadata
gopkgs."cloud.google.com".go.iam.credentials.apiv1
gopkgs."cloud.google.com".go.storage
gopkgs."go.opencensus.io".stats
gopkgs."go.opencensus.io".stats.view
gopkgs."go.opencensus.io".tag
gopkgs."go.opencensus.io".trace
gopkgs."go.opencensus.io".plugin.ocgrpc
gopkgs."golang.org".x.xerrors
gopkgs."golang.org".x.oauth2
gopkgs."golang.org".x.oauth2.google
gopkgs."google.golang.org".api.googleapi
gopkgs."google.golang.org".api.iterator
gopkgs."google.golang.org".api.option
gopkgs."google.golang.org".genproto.googleapis.iam.credentials.v1
gopkgs."google.golang.org".grpc
gopkgs."google.golang.org".grpc.codes
gopkgs."google.golang.org".grpc.status
gopkgs."github.com".googleapis.gax-go.v2

View file

@ -19,6 +19,7 @@ depot.third_party.buildGo.external {
gopkgs."google.golang.org".protobuf.reflect.protoreflect
gopkgs."google.golang.org".protobuf.runtime.protoimpl
gopkgs."google.golang.org".protobuf.types.known.anypb
gopkgs."google.golang.org".protobuf.types.known.durationpb
gopkgs."google.golang.org".protobuf.types.known.emptypb
gopkgs."google.golang.org".protobuf.types.known.fieldmaskpb
gopkgs."google.golang.org".protobuf.types.known.structpb