diff --git a/go/nix/bnix/bnix.go b/go/nix/bnix/bnix.go index 7542daa5a5..4a33ddfe86 100644 --- a/go/nix/bnix/bnix.go +++ b/go/nix/bnix/bnix.go @@ -2,26 +2,356 @@ package main import ( "context" + "errors" "flag" "fmt" "io" "log" + "math/rand" "net/url" "os" + "sync" + "time" "golang.org/x/crypto/ssh" + "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" "hg.lukegb.com/lukegb/depot/go/nix/nixstore" ) var ( - remoteFlag = flag.String("remote", "unix://", "Remote.") + remoteFlag = flag.String("remote", "unix://", "Remote.") + remote2Flag = flag.String("remote2", "unix://", "Remote. But the second one. The destination, for copy.") + maxConnsFlag = flag.Int("max_conns", 10, "Max connections.") + + verboseFlag = flag.Bool("verbose", false, "Verbose.") ) func ensure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error { return d.EnsurePath(at, path) } +func narInfo(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error { + ni, err := d.NARInfo(path) + if err != nil { + return err + } + fmt.Printf("%s\n", ni) + return nil +} + +func tryEnsure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) (bool, error) { + err := d.EnsurePath(at, path) + if err == nil { + return true, nil + } + var ne nixstore.NixError + if errors.As(err, &ne) { + if ne.Code == 1 { + return false, nil + } + } + return false, err +} + +type connectionPool struct { + factory func() (*nixstore.Daemon, error) + + cond *sync.Cond + limit int + generated int + available []*nixstore.Daemon +} + +func newConnectionPool(ctx context.Context, dst string, limit int) *connectionPool { + return &connectionPool{ + factory: func() (*nixstore.Daemon, error) { + var d *nixstore.Daemon + var err error + retryInterval := 10 * time.Millisecond + maxRetry := 10 * time.Second + for n := 0; n < 20; n++ { + d, _, err = connectTo(ctx, dst) + if err == nil { + break + } + 
time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval))) + retryInterval = retryInterval * 2 + if retryInterval > maxRetry { + retryInterval = maxRetry + } + } + return d, err + }, + + cond: sync.NewCond(new(sync.Mutex)), + limit: limit, + } +} + +func (p *connectionPool) getFromAvailableLocked() *nixstore.Daemon { + end := len(p.available) - 1 + d := p.available[end] + p.available = p.available[:end] + return d +} + +func (p *connectionPool) Get() (*nixstore.Daemon, error) { + p.cond.L.Lock() + if len(p.available) > 0 { + d := p.getFromAvailableLocked() + p.cond.L.Unlock() + return d, nil + } + + if p.generated < p.limit { + p.generated++ + p.cond.L.Unlock() + d, err := p.factory() + if err != nil { + p.cond.L.Lock() + p.generated-- + p.cond.L.Unlock() + return nil, err + } + return d, nil + } + + for len(p.available) == 0 { + p.cond.Wait() + } + d := p.getFromAvailableLocked() + p.cond.L.Unlock() + return d, nil +} + +func (p *connectionPool) Put(d *nixstore.Daemon) { + p.cond.L.Lock() + p.available = append(p.available, d) + p.cond.Signal() + p.cond.L.Unlock() +} + +type workState int + +const ( + StateCheckingAlreadyExists workState = iota + StateFetchingPathInfo + StateEnqueueingReferences + StateAwaitingReferences + StateCopying + StateDone + StateFailed +) + +func (s workState) String() string { + return map[workState]string{ + StateCheckingAlreadyExists: "checking already exists", + StateFetchingPathInfo: "fetching path info", + StateEnqueueingReferences: "enqueueing references", + StateAwaitingReferences: "awaiting references", + StateCopying: "copying", + StateDone: "done!", + StateFailed: "failed :(", + }[s] +} + +func (s workState) Terminal() bool { return s == StateDone || s == StateFailed } + +type workItem struct { + doneCh chan struct{} + state workState + path string + + err error + narInfo *narinfo.NarInfo + childWork []*workItem + + srcPool *connectionPool + srcAT *nixstore.ActivityTracker + dstPool *connectionPool + 
dstAT *nixstore.ActivityTracker + + wh *workHolder +} + +func (i *workItem) checkAlreadyExists(ctx context.Context) (workState, error) { + // i.state == StateCheckingAlreadyExists + dst, err := i.dstPool.Get() + if err != nil { + return StateFailed, err + } + defer i.dstPool.Put(dst) + + ok, err := tryEnsure(ctx, dst, i.dstAT, i.path) + if err != nil { + return StateFailed, err + } + + if ok { + return StateDone, nil + } + + return StateFetchingPathInfo, nil +} + +func (i *workItem) fetchPathInfo(ctx context.Context) (workState, error) { + // i.state == StateFetchingPathInfo + src, err := i.srcPool.Get() + if err != nil { + return StateFailed, err + } + defer i.srcPool.Put(src) + + ni, err := src.NARInfo(i.path) + if err != nil { + return StateFailed, err + } + + i.narInfo = ni + return StateEnqueueingReferences, nil +} + +func (i *workItem) enqueueReferences(ctx context.Context) (workState, error) { + // i.state == StateEnqueueingReferences + const nixStorePrefix = "/nix/store/" + var childWork []*workItem + for _, ref := range i.narInfo.References { + refPath := nixStorePrefix + ref + if i.path == refPath { + continue + } + + childWork = append(childWork, i.wh.addWork(ctx, refPath)) + } + i.childWork = childWork + if len(i.childWork) == 0 { + return StateCopying, nil + } + return StateAwaitingReferences, nil +} + +func (i *workItem) awaitReferences(ctx context.Context) (workState, error) { + // i.state == StateAwaitingReferences + for _, ci := range i.childWork { + select { + case <-ctx.Done(): + return StateFailed, ctx.Err() + case <-ci.doneCh: + if ci.state == StateFailed { + return StateFailed, fmt.Errorf("reference %v failed", ci.path) + } + if !ci.state.Terminal() { + panic(fmt.Sprintf("%v: reference %v declared done in non-terminal state %v", i.path, ci.path, ci.state)) + } + } + } + return StateCopying, nil +} + +func (i *workItem) performCopy(ctx context.Context) (workState, error) { + // i.state == StateCopying + src, err := i.srcPool.Get() + if err != 
nil { + return StateFailed, err + } + defer i.srcPool.Put(src) + + dst, err := i.dstPool.Get() + if err != nil { + return StateFailed, err + } + defer i.dstPool.Put(dst) + + // TODO: the rest of the owl + + return StateDone, nil +} + +func (i *workItem) run(ctx context.Context) error { + defer close(i.doneCh) + i.state = StateCheckingAlreadyExists + for !i.state.Terminal() { + if i.err = ctx.Err(); i.err != nil { + i.state = StateFailed // doneCh is about to close: waiters require a terminal state and a recorded error + break + } + log.Printf("%s: running worker for %s", i.path, i.state) + var nextState workState + var err error + switch i.state { + case StateCheckingAlreadyExists: + nextState, err = i.checkAlreadyExists(ctx) + case StateFetchingPathInfo: + nextState, err = i.fetchPathInfo(ctx) + case StateEnqueueingReferences: + nextState, err = i.enqueueReferences(ctx) + case StateAwaitingReferences: + nextState, err = i.awaitReferences(ctx) + case StateCopying: + nextState, err = i.performCopy(ctx) + default: + log.Printf("%s: ended up in unimplemented state %s", i.path, i.state) + <-make(chan struct{}) + } + i.state = nextState + if err != nil { + log.Printf("%s: transitioning to %s: %s", i.path, nextState, err) + i.err = err + return fmt.Errorf("%s: %w", i.state, err) + } + log.Printf("%s: transitioning to %s", i.path, nextState) + } + return i.err +} + +type workHolder struct { + mu sync.Mutex + work map[string]*workItem + + srcPool *connectionPool + srcAT *nixstore.ActivityTracker + dstPool *connectionPool + dstAT *nixstore.ActivityTracker +} + +func (wh *workHolder) addWork(ctx context.Context, path string) *workItem { + wh.mu.Lock() + defer wh.mu.Unlock() + + if i := wh.work[path]; i != nil { + return i + } + + i := &workItem{ + doneCh: make(chan struct{}), + path: path, + + srcPool: wh.srcPool, + srcAT: wh.srcAT, + dstPool: wh.dstPool, + dstAT: wh.dstAT, + + wh: wh, + } + wh.work[path] = i + go i.run(ctx) + return i +} + +func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.ActivityTracker, poolTo *connectionPool, atTo 
*nixstore.ActivityTracker, path string) error { + wh := workHolder{ + work: make(map[string]*workItem), + srcPool: poolFrom, + srcAT: atFrom, + dstPool: poolTo, + dstAT: atTo, + } + i := wh.addWork(ctx, path) + <-i.doneCh + return i.err +} + var rs nixdrv.Resolver func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) { @@ -159,6 +489,8 @@ func connectTo(ctx context.Context, remote string) (*nixstore.Daemon, nixdrv.Res func main() { flag.Parse() + nixstore.VerboseActivities = *verboseFlag + badCall := func(f string, xs ...interface{}) { fmt.Fprintf(os.Stderr, f+"\n", xs...) flag.Usage() @@ -169,6 +501,7 @@ func main() { badCall("need a subcommand") } + ctx := context.Background() var cmd func(context.Context, *nixstore.Daemon, *nixstore.ActivityTracker) error switch flag.Arg(0) { case "ensure": @@ -197,13 +530,32 @@ func main() { cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error { return buildDerivation(ctx, d, at, flag.Arg(1)) } + case "nar-info": + if flag.NArg() != 2 { + badCall("`nar-info` needs a store path") + } + cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error { + return narInfo(ctx, d, at, flag.Arg(1)) + } + case "copy": + if flag.NArg() != 2 { + badCall("`copy` needs a store path") + } + poolSrc := newConnectionPool(ctx, *remoteFlag, *maxConnsFlag) + atSrc := nixstore.NewActivityTracker() + poolDst := newConnectionPool(ctx, *remote2Flag, *maxConnsFlag) + atDst := nixstore.NewActivityTracker() + + if err := copyPath(ctx, poolSrc, atSrc, poolDst, atDst, flag.Arg(1)); err != nil { + log.Fatalf("copy: %v", err) + } + return default: badCall("bad subcommand %s", flag.Arg(0)) } at := nixstore.NewActivityTracker() - ctx := context.Background() d, rss, err := connectTo(ctx, *remoteFlag) if err != nil { log.Fatalf("connectTo(%q): %v", *remoteFlag, err) diff --git a/go/nix/nixstore/activities.go b/go/nix/nixstore/activities.go index 9f1b35991a..efac722758 100644 
--- a/go/nix/nixstore/activities.go +++ b/go/nix/nixstore/activities.go @@ -10,6 +10,8 @@ const ( progressLogRateLimit = 1 * time.Second ) +var VerboseActivities = false + type ActivityType uint64 const ( @@ -194,6 +196,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { } else { llt = LogLinePostBuild } + if VerboseActivities { + log.Printf("%d> %s", am.ActivityID, fields[0].(string)) + } am.logs = append(am.logs, &ActivityLog{ Timestamp: now, LogLineType: llt, @@ -206,6 +211,10 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { NewPhase: phase, }) am.phase = phase + + if VerboseActivities { + log.Printf("%d [NEW PHASE] %v", am.ActivityID, phase) + } case ResProgress: am.progress = fields @@ -231,6 +240,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { am.lastProgressLog.Progress = fields if time.Since(am.lastProgressLogCreated) > progressLogRateLimit { // If our last progress log was more than rate limit ago, bump the log entry forwards and mark that we should create a new one. 
+ if VerboseActivities { + log.Printf("%d [PROGRESS] %v", am.ActivityID, fields) + } am.lastProgressLog = nil } } @@ -240,6 +252,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { Timestamp: now, Expected: fields, }) + if VerboseActivities { + log.Printf("%d [EXPECTED] %v", am.ActivityID, fields) + } } am.mu.Unlock() } @@ -401,6 +416,10 @@ func (al *ActivityLogger) StartActivity(at ActivityType, am ActivityMeta) Activi al.activities[am.ActivityID] = a al.mu.Unlock() + if VerboseActivities { + log.Printf("%d [START] %s", am.ActivityID, at) + } + return a } @@ -429,6 +448,10 @@ func (al *ActivityLogger) EndActivity(a Activity) { return } + if VerboseActivities { + log.Printf("%d [END] %s", a.Meta().ActivityID, a.ActivityType()) + } + al.mu.Lock() al.logs = append(al.logs, ActivityMetaLog{ Timestamp: time.Now(), diff --git a/go/nix/nixstore/remotestore.go b/go/nix/nixstore/remotestore.go index 46e42c406a..3eb6e74ae8 100644 --- a/go/nix/nixstore/remotestore.go +++ b/go/nix/nixstore/remotestore.go @@ -36,6 +36,12 @@ type Daemon struct { err error } +type PathNotValidError struct{ Path string } + +func (err PathNotValidError) Error() string { + return fmt.Sprintf("path %q is not valid", err.Path) +} + func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { d.mu.Lock() defer d.mu.Unlock() @@ -57,7 +63,7 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { } valid := validInt == uint64(1) if !valid { - return nil, fmt.Errorf("path %q is not valid", storePath) + return nil, PathNotValidError{storePath} } ni := &narinfo.NarInfo{ @@ -156,6 +162,15 @@ func (d *Daemon) readFields() ([]any, error) { return fields, nil } +type NixError struct { + Code uint64 + Message string +} + +func (ne NixError) Error() string { + return fmt.Sprintf("nix error %d: %v", ne.Code, ne.Message) +} + func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Reader) error { for { msg, err := d.r.ReadUint64() @@ -205,7 
+220,7 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re return fmt.Errorf("STDERR_ERROR reading uint64: %w", err) } al.AddError(status, errStr) - return fmt.Errorf("error code %d: %v", status, errStr) + return NixError{status, errStr} case 0x6f6c6d67: // STDERR_NEXT msg, err := d.r.ReadString() if err != nil {