bcacheup: limit upload concurrency to avoid OOMing

This commit is contained in:
Luke Granger-Brown 2023-04-15 15:10:06 +00:00
parent 2d1bf2ffae
commit 642ad1c6b7

View file

@ -34,6 +34,7 @@ var (
blobURLFlag = flag.String("cache_url", "", "Cache URL") blobURLFlag = flag.String("cache_url", "", "Cache URL")
stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.") stateSummaryIntervalFlag = flag.Duration("state_summary_interval", 10*time.Second, "Time between state summary outputs.")
deepCheckGalacticFlag = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.") deepCheckGalacticFlag = flag.Bool("deep_check_galactic", false, "Ensure that all references are available in the cache before skipping, rather than just checking that the path itself is available.")
maximumConcurrentUploads = flag.Int("maximum_concurrent_uploads", 32, "Limit for the number of concurrent uploads.")
) )
var ( var (
@ -58,6 +59,7 @@ const (
stateUnknown state = iota stateUnknown state = iota
stateCheckingShouldUpload stateCheckingShouldUpload
stateUploadingReferences stateUploadingReferences
stateWaitingForUploadSlot
stateUploadingContent stateUploadingContent
stateCopyingContent stateCopyingContent
stateUploadingNarinfo stateUploadingNarinfo
@ -75,6 +77,7 @@ func (s state) String() string {
stateUnknown: "unknown", stateUnknown: "unknown",
stateCheckingShouldUpload: "determining if upload required", stateCheckingShouldUpload: "determining if upload required",
stateUploadingReferences: "uploading references", stateUploadingReferences: "uploading references",
stateWaitingForUploadSlot: "waiting for upload slot",
stateUploadingContent: "uploading content", stateUploadingContent: "uploading content",
stateCopyingContent: "copying content", stateCopyingContent: "copying content",
stateUploadingNarinfo: "uploading narinfo", stateUploadingNarinfo: "uploading narinfo",
@ -161,6 +164,10 @@ type uploader struct {
storePath string storePath string
st stateTracker st stateTracker
uploadInFlight int
uploadMu sync.Mutex
uploadCond *sync.Cond
uploadSF singleflight.Group uploadSF singleflight.Group
deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too. deepCheckGalactic bool // if true, don't skip if this item is already present; always check the references to make sure they exist too.
} }
@ -311,6 +318,22 @@ func (u *uploader) uploadRefs(ctx context.Context, current string, refs []string
return eg.Wait() return eg.Wait()
} }
func (u *uploader) getUploadSlot() struct{} {
u.uploadMu.Lock()
for u.uploadInFlight >= *maximumConcurrentUploads {
u.uploadCond.Wait()
}
u.uploadInFlight++
u.uploadMu.Unlock()
return struct{}{}
}
func (u *uploader) returnUploadSlot(slot struct{}) {
u.uploadMu.Lock()
u.uploadInFlight--
u.uploadMu.Unlock()
}
func (u *uploader) upload(ctx context.Context, path string) error { func (u *uploader) upload(ctx context.Context, path string) error {
u.st.SetState(path, stateCheckingShouldUpload) u.st.SetState(path, stateCheckingShouldUpload)
@ -345,6 +368,9 @@ func (u *uploader) upload(ctx context.Context, path string) error {
return nil return nil
} }
u.st.SetState(path, stateWaitingForUploadSlot)
defer u.returnUploadSlot(u.getUploadSlot())
u.st.SetState(path, stateUploadingContent) u.st.SetState(path, stateUploadingContent)
if !ni.NarHash.Valid() { if !ni.NarHash.Valid() {
u.st.SetState(path, stateFailed) u.st.SetState(path, stateFailed)
@ -442,6 +468,8 @@ func main() {
deepCheckGalactic: *deepCheckGalacticFlag, deepCheckGalactic: *deepCheckGalacticFlag,
} }
u.uploadCond = sync.NewCond(&u.uploadMu)
go func() { go func() {
t := time.NewTicker(*stateSummaryIntervalFlag) t := time.NewTicker(*stateSummaryIntervalFlag)
defer t.Stop() defer t.Stop()