// Binary bcacheup uploads Nix store paths to a binary cache.
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"time"
"github.com/jamespfennell/xz"
"github.com/numtide/go-nix/nixbase32"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
"git.lukegb.com/lukegb/depot/go/nix/nar"
"git.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"git.lukegb.com/lukegb/depot/go/nix/nixstore"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/gcsblob"
_ "git.lukegb.com/lukegb/depot/go/vault/vaultgcsblob"
)
var (
2022-10-09 17:10:46 +00:00
blobURLFlag = flag.String("cache_url", "", "Cache URL")
stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.")
deepCheckGalacticFlag = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.")
maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 64, "Limit for the number of concurrent uploads.")
)
var (
	// hashExtractRegexp matches the 32-character nixbase32 hash component of
	// a store path, either bare or as the leading part of a basename.
	hashExtractRegexp = regexp.MustCompile(`(^|/)([0-9a-df-np-sv-z]{32})([-.].*)?$`)

	// trustedCaches lists upstream caches whose contents we never re-upload.
	trustedCaches = []string{
		"https://cache.nixos.org",
	}
)

// hashExtract returns the nixbase32 hash embedded in a store path, or the
// empty string if none can be found.
func hashExtract(s string) string {
	if m := hashExtractRegexp.FindStringSubmatch(s); m != nil {
		return m[2]
	}
	return ""
}
type state int
const (
stateUnknown state = iota
stateCheckingShouldUpload
stateUploadingReferences
stateWaitingForUploadSlot
stateUploadingContent
stateCopyingContent
stateUploadingNarinfo
stateSkipped
stateFailed
stateUploaded
stateMax
)
func (s state) Terminal() bool {
return s == stateSkipped || s == stateUploaded || s == stateFailed
}
func (s state) String() string {
return map[state]string{
stateUnknown: "unknown",
stateCheckingShouldUpload: "determining if upload required",
stateUploadingReferences: "uploading references",
stateWaitingForUploadSlot: "waiting for upload slot",
stateUploadingContent: "uploading content",
stateCopyingContent: "copying content",
stateUploadingNarinfo: "uploading narinfo",
stateSkipped: "skipped",
stateFailed: "failed",
stateUploaded: "uploaded",
}[s]
}
type stateInfo struct {
Current state
Since time.Time
Path string
}
type stateTracker struct {
mu sync.Mutex
pathState map[string]stateInfo
}
func (t *stateTracker) SetState(p string, s state) {
si := stateInfo{
Current: s,
Since: time.Now(),
Path: p,
}
t.mu.Lock()
if t.pathState == nil {
t.pathState = make(map[string]stateInfo)
}
t.pathState[p] = si
t.mu.Unlock()
}
func (t *stateTracker) CurrentState() map[string]stateInfo {
out := make(map[string]stateInfo, len(t.pathState))
t.mu.Lock()
for k, v := range t.pathState {
out[k] = v
}
t.mu.Unlock()
return out
}
func (t *stateTracker) StateSummary() string {
states := t.CurrentState()
countByState := map[state]int{}
var oldestActive []stateInfo
for _, s := range states {
countByState[s.Current]++
if !s.Current.Terminal() && s.Current != stateUploadingReferences {
oldestActive = append(oldestActive, s)
}
}
sort.Slice(oldestActive, func(i, j int) bool {
a, b := oldestActive[i], oldestActive[j]
return a.Since.Before(b.Since)
})
var firstLineBits []string
for n := stateUnknown; n < stateMax; n++ {
c := countByState[n]
if c != 0 {
firstLineBits = append(firstLineBits, fmt.Sprintf("%d %s", c, n))
}
}
lines := []string{
strings.Join(firstLineBits, ", "),
}
for n := 0; n < len(oldestActive) && n < 20; n++ {
si := oldestActive[n]
lines = append(lines, fmt.Sprintf("\t%s: %s (for %s)", si.Path, si.Current, time.Since(si.Since).Truncate(time.Second)))
}
return strings.Join(lines, "\n")
}
type uploader struct {
bucket *blob.Bucket
2022-10-09 23:49:44 +00:00
store nixstore.Store
storePath string
st stateTracker
uploadLimiter chan int
uploadSF singleflight.Group
deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too.
}
// byteCounterWriter is an io.Writer that discards its input, keeping only a
// running total of bytes written.
type byteCounterWriter struct{ n uint64 }

// Write counts len(b) bytes and always reports full success.
func (w *byteCounterWriter) Write(b []byte) (int, error) {
	size := len(b)
	w.n += uint64(size)
	return size, nil
}
// inStore reports whether path's narinfo object already exists in our bucket.
func (u *uploader) inStore(ctx context.Context, path string) (bool, error) {
	// A path counts as present when its narinfo key exists.
	narinfoKey, keyErr := keyForPath(path)
	if keyErr != nil {
		return false, fmt.Errorf("computing narinfo key for %v: %w", path, keyErr)
	}
	return u.bucket.Exists(ctx, narinfoKey)
}
// inTrustedCaches reports whether path's narinfo is already served by any of
// the trusted upstream caches, in which case uploading it ourselves is
// unnecessary. A non-200 response is not an error; it just means "not here".
func (u *uploader) inTrustedCaches(ctx context.Context, path string) (bool, error) {
	key, err := keyForPath(path)
	if err != nil {
		return false, fmt.Errorf("computing narinfo key for %v: %w", path, err)
	}
	for _, c := range trustedCaches {
		req, err := http.NewRequestWithContext(ctx, http.MethodHead, fmt.Sprintf("%v/%v", c, key), nil)
		if err != nil {
			// Wrap with %w (was %v) so callers can unwrap the cause.
			return false, fmt.Errorf("constructing request for %v/%v: %w", c, key, err)
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return false, fmt.Errorf("making request for %v/%v: %w", c, key, err)
		}
		// HEAD responses carry no body, but Close releases the connection
		// back to the transport for reuse.
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return true, nil
		}
	}
	return false, nil
}
// shouldUpload reports whether path needs uploading: true only when it is
// absent from both our bucket and every trusted upstream cache.
func (u *uploader) shouldUpload(ctx context.Context, path string) (bool, error) {
	if present, err := u.inStore(ctx, path); err != nil || present {
		return false, err
	}
	if cached, err := u.inTrustedCaches(ctx, path); err != nil || cached {
		return false, err
	}
	return true, nil
}
// uploadContent packs the store path `path` as a NAR, xz-compresses it, and
// streams the result to dst, verifying the NAR size and hash against ni as
// it goes. On success it records the compressed artifact's details in ni
// (Compression, FileHash, FileSize).
func (u *uploader) uploadContent(ctx context.Context, ni *narinfo.NarInfo, path string, dst io.Writer) error {
	if !ni.NarHash.Valid() {
		return fmt.Errorf("nar hash for %v is not valid", path)
	}
	// narHasher sees the raw NAR bytes; fileHasher and fileByteCounter see
	// the xz-compressed bytes on their way to dst. Both use the algorithm
	// declared by ni.NarHash.
	narHasher := ni.NarHash.Algorithm.Hash()
	fileHasher := ni.NarHash.Algorithm.Hash()
	fileByteCounter := &byteCounterWriter{}
	xzWriter := xz.NewWriter(io.MultiWriter(fileHasher, fileByteCounter, dst))
	w := io.MultiWriter(narHasher, xzWriter)
	narSize, err := nar.Pack(w, nar.DirFS(u.storePath), filepath.Base(path))
	if err != nil {
		return fmt.Errorf("packing %v as NAR: %w", path, err)
	}
	// Close before reading fileHasher/fileByteCounter so the final xz
	// blocks are flushed and included in the file hash and size.
	if err := xzWriter.Close(); err != nil {
		return fmt.Errorf("compressing with xz: %w", err)
	}
	// Check the NAR hash is correct.
	if uint64(narSize) != ni.NarSize {
		return fmt.Errorf("uploaded nar was %d bytes; expected %d bytes", narSize, ni.NarSize)
	}
	narHash := narinfo.Hash{
		Hash:      narHasher.Sum(nil),
		Algorithm: ni.NarHash.Algorithm,
	}
	if len(narHash.Hash) != len(ni.NarHash.Hash) {
		return fmt.Errorf("uploaded nar hash length was %d bytes; expected %d bytes", len(narHash.Hash), len(ni.NarHash.Hash))
	}
	if got, want := narHash.String(), ni.NarHash.String(); got != want {
		return fmt.Errorf("uploaded nar hash was %v; wanted %v", got, want)
	}
	// Record how the content was actually stored: xz-compressed, with the
	// file hash computed using the same algorithm as the NAR hash.
	ni.Compression = narinfo.CompressionXz
	ni.FileHash = narinfo.Hash{
		Hash:      fileHasher.Sum(nil),
		Algorithm: ni.NarHash.Algorithm,
	}
	ni.FileSize = fileByteCounter.n
	return nil
}
// keyForPath derives the bucket key ("<hash>.narinfo") for a store path.
func keyForPath(storePath string) (string, error) {
	hash := hashExtract(storePath)
	if hash == "" {
		return "", fmt.Errorf("store path %v seems to be invalid: couldn't extract hash", storePath)
	}
	return hash + ".narinfo", nil
}
// uploadNARInfo writes the serialized narinfo for ni to its bucket key.
func (u *uploader) uploadNARInfo(ctx context.Context, ni *narinfo.NarInfo) error {
	key, keyErr := keyForPath(ni.StorePath)
	if keyErr != nil {
		return keyErr
	}
	body := []byte(ni.String())
	return u.bucket.WriteAll(ctx, key, body, nil)
}
// uploadRefs uploads all of current's references in parallel, skipping a
// self-reference, and returns the first error encountered.
func (u *uploader) uploadRefs(ctx context.Context, current string, refs []string) error {
	if len(refs) == 0 {
		return nil
	}
	eg, egctx := errgroup.WithContext(ctx)
	for _, ref := range refs {
		target := filepath.Join(u.storePath, ref)
		// We depend on ourselves, which is fine.
		if target == current {
			continue
		}
		eg.Go(func() error {
			return u.Upload(egctx, target)
		})
	}
	return eg.Wait()
}
// getUploadSlot blocks until a concurrent-upload slot is available and
// returns a function that releases the slot when called.
func (u *uploader) getUploadSlot() func() {
	token := <-u.uploadLimiter
	return func() { u.uploadLimiter <- token }
}
// upload performs the actual work for a single store path: decide whether an
// upload is needed, recursively upload references first, then write the
// compressed NAR to a temporary key, copy it into place, and upload the
// narinfo. Progress is reported to u.st at every step; each failure path
// records stateFailed and cleans up any partially-written objects.
func (u *uploader) upload(ctx context.Context, path string) error {
	u.st.SetState(path, stateCheckingShouldUpload)
	shouldUploadThis, err := u.shouldUpload(ctx, path)
	if err != nil {
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("determining if we should upload %v: %w", path, err)
	}
	// Unless deep checking is enabled, an already-present path is done.
	if !shouldUploadThis && !u.deepCheckGalactic {
		u.st.SetState(path, stateSkipped)
		return nil
	}
	if shouldUploadThis {
		log.Printf("Uploading %v", path)
	}
	ni, err := u.store.NARInfo(path)
	if err != nil {
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("getting narinfo for %v: %w", path, err)
	}
	// References go first so the cache never advertises a path whose
	// dependencies are missing.
	u.st.SetState(path, stateUploadingReferences)
	if err := u.uploadRefs(ctx, ni.StorePath, ni.References); err != nil {
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("uploading references for %v: %w", path, err)
	}
	// Under deep checking we may only have been verifying references; the
	// path itself still doesn't need uploading.
	if !shouldUploadThis {
		u.st.SetState(path, stateSkipped)
		return nil
	}
	u.st.SetState(path, stateWaitingForUploadSlot)
	// Acquire a concurrency slot now; the returned release function runs at
	// function exit.
	defer u.getUploadSlot()()
	u.st.SetState(path, stateUploadingContent)
	if !ni.NarHash.Valid() {
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("nar hash is invalid")
	}
	// Write to a temporary key first; only copy to the final, hash-derived
	// key once the content is complete and verified.
	tmpPath := fmt.Sprintf("tmp-uploading/%s", filepath.Base(path))
	dst, err := u.bucket.NewWriter(ctx, tmpPath, nil)
	if err != nil {
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("creating new writer for upload of %v: %w", path, err)
	}
	// NOTE(review): dst is also closed explicitly below, so this deferred
	// Close is a safety net for early returns and relies on double-Close
	// being harmless — confirm against the blob writer's contract.
	defer dst.Close()
	if err := u.uploadContent(ctx, ni, path, dst); err != nil {
		u.st.SetState(path, stateFailed)
		// If Close succeeds, the partial object was committed and must be
		// deleted; if Close fails, nothing was persisted to clean up.
		if err := dst.Close(); err == nil {
			u.bucket.Delete(ctx, tmpPath)
		}
		return err
	}
	if err := dst.Close(); err != nil {
		u.bucket.Delete(ctx, tmpPath)
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("completing tmp write of %v: %w", path, err)
	}
	// Copy to the "correct" place.
	u.st.SetState(path, stateCopyingContent)
	finalDstKey := fmt.Sprintf("nar/%s.nar.xz", nixbase32.EncodeToString(ni.FileHash.Hash))
	if err := u.bucket.Copy(ctx, finalDstKey, tmpPath, nil); err != nil {
		u.bucket.Delete(ctx, tmpPath)
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("copying tmp write of %v from %v to %v: %w", path, tmpPath, finalDstKey, err)
	}
	if err := u.bucket.Delete(ctx, tmpPath); err != nil {
		// Failure to clean up tmp is treated as fatal: remove the final
		// object too rather than leave a half-tracked upload behind.
		u.bucket.Delete(ctx, finalDstKey)
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("cleaning up tmp write of %v at %v: %w", path, tmpPath, err)
	}
	// The narinfo must point at the final NAR location before it is uploaded.
	ni.URL = finalDstKey
	u.st.SetState(path, stateUploadingNarinfo)
	if err := u.uploadNARInfo(ctx, ni); err != nil {
		u.bucket.Delete(ctx, finalDstKey)
		u.st.SetState(path, stateFailed)
		return fmt.Errorf("uploading narinfo for %v: %w", path, err)
	}
	u.st.SetState(path, stateUploaded)
	return nil
}
// Upload uploads path (and its references), deduplicating concurrent
// requests for the same path via singleflight: callers asking for a path
// that is already in flight share that one result.
func (u *uploader) Upload(ctx context.Context, path string) error {
	resCh := u.uploadSF.DoChan(path, func() (any, error) {
		// NOTE(review): the shared upload runs on the FIRST caller's ctx;
		// if that caller cancels, later joiners observe its error — confirm
		// this is the intended behaviour.
		err := u.upload(ctx, path)
		if err != nil {
			log.Printf("Uploading %v: %v", path, err)
		}
		return nil, err
	})
	select {
	case <-ctx.Done():
		// Abandoning the wait does not cancel the shared in-flight upload.
		return ctx.Err()
	case res := <-resCh:
		return res.Err
	}
}
func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Printf("Using cache URL %q", *blobURLFlag)
bucket, err := blob.OpenBucket(ctx, *blobURLFlag)
if err != nil {
log.Fatalf("opening bucket %q: %v", *blobURLFlag, err)
}
defer bucket.Close()
2022-10-09 23:49:44 +00:00
store, err := nixstore.Open()
if err != nil {
log.Fatalf("opening Nix store: %v", err)
}
defer store.Close()
u := &uploader{
bucket: bucket,
store: store,
storePath: "/nix/store",
deepCheckGalactic: *deepCheckGalacticFlag,
uploadLimiter: make(chan int, *maximumConcurrentUploads),
}
for n := 0; n < *maximumConcurrentUploads; n++ {
u.uploadLimiter <- n
}
go func() {
2022-10-09 17:10:46 +00:00
t := time.NewTicker(*stateSummaryIntervalFlag)
defer t.Stop()
for {
select {
case <-t.C:
log.Print(u.st.StateSummary())
}
}
}()
for _, p := range flag.Args() {
realPath, err := os.Readlink(p)
if err != nil {
log.Fatalf("Readlink(%q): %v", p, err)
}
if err := u.Upload(ctx, realPath); err != nil {
log.Fatalf("upload(%q): %v", p, err)
}
}
}