bcacheup: use a limited channel rather than a sync.Cond

Luke Granger-Brown 2023-04-16 00:36:34 +00:00
parent 642ad1c6b7
commit de8756c4e6


@@ -34,7 +34,7 @@ var (
 	blobURLFlag              = flag.String("cache_url", "", "Cache URL")
 	stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.")
 	deepCheckGalacticFlag    = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.")
-	maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 32, "Limit for the number of concurrent uploads.")
+	maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 64, "Limit for the number of concurrent uploads.")
 )
 
 var (
@@ -164,9 +164,7 @@ type uploader struct {
 	storePath string
 	st        stateTracker
 
-	uploadInFlight int
-	uploadMu       sync.Mutex
-	uploadCond     *sync.Cond
+	uploadLimiter chan int
 	uploadSF      singleflight.Group
 
 	deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too.
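The mutex, condition variable, and in-flight counter that together formed the old counting semaphore are replaced by one buffered channel whose capacity is the concurrency limit. A minimal sketch of that idiom in its simplest form — the limiter name, the chan struct{} element type, and doUpload are illustrative only; the commit keeps chan int so each slot carries a distinct token, and acquires by receiving rather than sending:

	// Sketch (not from the commit): a buffered channel as a counting semaphore.
	// The send blocks once the buffer is full, so at most cap(limiter)
	// goroutines run the guarded section at a time.
	limiter := make(chan struct{}, 64)

	limiter <- struct{}{} // acquire a slot (blocks while 64 are in use)
	doUpload()            // placeholder for the bounded work
	<-limiter             // release the slot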
@@ -318,20 +316,11 @@ func (u *uploader) uploadRefs(ctx context.Context, current string, refs []string
 	return eg.Wait()
 }
 
-func (u *uploader) getUploadSlot() struct{} {
-	u.uploadMu.Lock()
-	for u.uploadInFlight >= *maximumConcurrentUploads {
-		u.uploadCond.Wait()
+func (u *uploader) getUploadSlot() func() {
+	slot := <-u.uploadLimiter
+	return func() {
+		u.uploadLimiter <- slot
 	}
-	u.uploadInFlight++
-	u.uploadMu.Unlock()
-	return struct{}{}
-}
-
-func (u *uploader) returnUploadSlot(slot struct{}) {
-	u.uploadMu.Lock()
-	u.uploadInFlight--
-	u.uploadMu.Unlock()
 }
 
 func (u *uploader) upload(ctx context.Context, path string) error {
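The new getUploadSlot receives a token from the buffered channel, blocking while all slots are taken, and returns a closure that puts the token back, so acquire and release pair up at a single call site. A self-contained sketch of the same pattern with hypothetical names (maxConcurrent, limiter, getSlot, worker); only the idiom is taken from the commit:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	const maxConcurrent = 4 // stand-in for *maximumConcurrentUploads

	// limiter holds one token per allowed concurrent worker.
	var limiter = make(chan int, maxConcurrent)

	// getSlot blocks until a token is available and returns a function
	// that puts the token back, mirroring getUploadSlot in the commit.
	func getSlot() func() {
		slot := <-limiter
		return func() {
			limiter <- slot
		}
	}

	func worker(id int, wg *sync.WaitGroup) {
		defer wg.Done()
		defer getSlot()() // acquire a slot now, release when worker returns
		fmt.Printf("worker %d running\n", id)
		time.Sleep(100 * time.Millisecond) // placeholder for an upload
	}

	func main() {
		// Seed the channel with tokens, as main does in the commit.
		for n := 0; n < maxConcurrent; n++ {
			limiter <- n
		}

		var wg sync.WaitGroup
		for i := 0; i < 16; i++ {
			wg.Add(1)
			go worker(i, &wg)
		}
		wg.Wait()
	}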
@@ -369,7 +358,7 @@ func (u *uploader) upload(ctx context.Context, path string) error {
 	}
 
 	u.st.SetState(path, stateWaitingForUploadSlot)
-	defer u.returnUploadSlot(u.getUploadSlot())
+	defer u.getUploadSlot()()
 
 	u.st.SetState(path, stateUploadingContent)
 	if !ni.NarHash.Valid() {
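The call site relies on Go's defer evaluation order: u.getUploadSlot() runs immediately, blocking until a slot is free, and only the closure it returns is deferred until upload returns. Spelled out, the one-liner is equivalent to:

	// Equivalent form: the slot is taken here, not at function exit.
	release := u.getUploadSlot() // blocks while all slots are in use
	defer release()              // returns the slot when upload() exits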
@@ -466,9 +455,11 @@ func main() {
 		store:             store,
 		storePath:         "/nix/store",
 		deepCheckGalactic: *deepCheckGalacticFlag,
+		uploadLimiter:     make(chan int, *maximumConcurrentUploads),
+	}
+	for n := 0; n < *maximumConcurrentUploads; n++ {
+		u.uploadLimiter <- n
 	}
-	u.uploadCond = sync.NewCond(&u.uploadMu)
 
 	go func() {
 		t := time.NewTicker(*stateSummaryIntervalFlag)
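main now allocates the buffered channel and deposits one token per permitted upload, so the first *maximumConcurrentUploads receives in getUploadSlot succeed immediately and later ones block. One consequence of this shape is that the plain receive cannot be interrupted; if waiting for a slot ever needed to honour cancellation, the same channel would compose with a select. A hedged sketch of that variant, not something the commit does:

	// Hypothetical variant: give up on the slot if ctx is cancelled first.
	select {
	case slot := <-u.uploadLimiter:
		defer func() { u.uploadLimiter <- slot }()
	case <-ctx.Done():
		return ctx.Err()
	}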