diff --git a/go/nix/bcacheup/bcacheup.go b/go/nix/bcacheup/bcacheup.go index 8adbafe7d4..5c68e0e72b 100644 --- a/go/nix/bcacheup/bcacheup.go +++ b/go/nix/bcacheup/bcacheup.go @@ -34,6 +34,7 @@ var ( blobURLFlag = flag.String("cache_url", "", "Cache URL") stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.") deepCheckGalacticFlag = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.") + maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 32, "Limit for the number of concurrent uploads.") ) var ( @@ -58,6 +59,7 @@ const ( stateUnknown state = iota stateCheckingShouldUpload stateUploadingReferences + stateWaitingForUploadSlot stateUploadingContent stateCopyingContent stateUploadingNarinfo @@ -75,6 +77,7 @@ func (s state) String() string { stateUnknown: "unknown", stateCheckingShouldUpload: "determining if upload required", stateUploadingReferences: "uploading references", + stateWaitingForUploadSlot: "waiting for upload slot", stateUploadingContent: "uploading content", stateCopyingContent: "copying content", stateUploadingNarinfo: "uploading narinfo", @@ -161,6 +164,10 @@ type uploader struct { storePath string st stateTracker + uploadInFlight int + uploadMu sync.Mutex + uploadCond *sync.Cond + uploadSF singleflight.Group deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too. } @@ -311,6 +318,22 @@ func (u *uploader) uploadRefs(ctx context.Context, current string, refs []string return eg.Wait() } +func (u *uploader) getUploadSlot() struct{} { + u.uploadMu.Lock() + for u.uploadInFlight >= *maximumConcurrentUploads { + u.uploadCond.Wait() + } + u.uploadInFlight++ + u.uploadMu.Unlock() + return struct{}{} +} + +func (u *uploader) returnUploadSlot(slot struct{}) { + u.uploadMu.Lock() + u.uploadInFlight-- + u.uploadMu.Unlock() +} + func (u *uploader) upload(ctx context.Context, path string) error { u.st.SetState(path, stateCheckingShouldUpload) @@ -345,6 +368,9 @@ func (u *uploader) upload(ctx context.Context, path string) error { return nil } + u.st.SetState(path, stateWaitingForUploadSlot) + defer u.returnUploadSlot(u.getUploadSlot()) + u.st.SetState(path, stateUploadingContent) if !ni.NarHash.Valid() { u.st.SetState(path, stateFailed) @@ -442,6 +468,8 @@ func main() { deepCheckGalactic: *deepCheckGalacticFlag, } + u.uploadCond = sync.NewCond(&u.uploadMu) + go func() { t := time.NewTicker(*stateSummaryIntervalFlag) defer t.Stop()