From de8756c4e64901ebe104bc5f0c4cdc72d065aec1 Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Sun, 16 Apr 2023 00:36:34 +0000 Subject: [PATCH] bcacheup: use a limited channel rather than a sync.Cond --- go/nix/bcacheup/bcacheup.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/go/nix/bcacheup/bcacheup.go b/go/nix/bcacheup/bcacheup.go index 5c68e0e72b..cd60f9ff48 100644 --- a/go/nix/bcacheup/bcacheup.go +++ b/go/nix/bcacheup/bcacheup.go @@ -34,7 +34,7 @@ var ( blobURLFlag = flag.String("cache_url", "", "Cache URL") stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.") deepCheckGalacticFlag = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.") - maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 32, "Limit for the number of concurrent uploads.") + maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 64, "Limit for the number of concurrent uploads.") ) var ( @@ -164,9 +164,7 @@ type uploader struct { storePath string st stateTracker - uploadInFlight int - uploadMu sync.Mutex - uploadCond *sync.Cond + uploadLimiter chan int uploadSF singleflight.Group deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too. @@ -318,20 +316,11 @@ func (u *uploader) uploadRefs(ctx context.Context, current string, refs []string return eg.Wait() } -func (u *uploader) getUploadSlot() struct{} { - u.uploadMu.Lock() - for u.uploadInFlight >= *maximumConcurrentUploads { - u.uploadCond.Wait() +func (u *uploader) getUploadSlot() func() { + slot := <-u.uploadLimiter + return func() { + u.uploadLimiter <- slot } - u.uploadInFlight++ - u.uploadMu.Unlock() - return struct{}{} -} - -func (u *uploader) returnUploadSlot(slot struct{}) { - u.uploadMu.Lock() - u.uploadInFlight-- - u.uploadMu.Unlock() } func (u *uploader) upload(ctx context.Context, path string) error { @@ -369,7 +358,7 @@ func (u *uploader) upload(ctx context.Context, path string) error { } u.st.SetState(path, stateWaitingForUploadSlot) - defer u.returnUploadSlot(u.getUploadSlot()) + defer u.getUploadSlot()() u.st.SetState(path, stateUploadingContent) if !ni.NarHash.Valid() { @@ -466,9 +455,11 @@ func main() { store: store, storePath: "/nix/store", deepCheckGalactic: *deepCheckGalacticFlag, + uploadLimiter: make(chan int, *maximumConcurrentUploads), + } + for n := 0; n < *maximumConcurrentUploads; n++ { + u.uploadLimiter <- n } - - u.uploadCond = sync.NewCond(&u.uploadMu) go func() { t := time.NewTicker(*stateSummaryIntervalFlag)