From 3ab9b43f72a89c2951e809e6760603aba86823af Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Wed, 23 Aug 2023 23:00:44 +0000 Subject: [PATCH] go/nix: implement more of nixbuild --- go.work | 3 + go.work.sum | 4 + go/go.mod | 8 +- go/go.sum | 9 + go/nix/bnix/bnix.go | 230 +------- go/nix/bnix/default.nix | 3 + go/nix/bnixbuild/bnixbuild.go | 170 ++++++ go/nix/bnixbuild/default.nix | 16 + go/nix/default.nix | 6 +- go/nix/nar/default.nix | 4 +- go/nix/nar/inmemoryfs.go | 59 ++ go/nix/nar/narinfo/narinfo.go | 41 +- go/nix/nar/{nar.go => narpacker.go} | 3 +- go/nix/nar/{nar_test.go => narpacker_test.go} | 3 +- go/nix/nar/narunpacker.go | 181 ++++++ go/nix/nixbuild/config.go | 72 +++ go/nix/nixbuild/coordinator.go | 78 +++ go/nix/nixbuild/default.nix | 23 + go/nix/nixbuild/httpresolver.go | 106 ++++ go/nix/nixbuild/nixbuild.go | 5 + go/nix/nixbuild/peerresolver.go | 179 ++++++ go/nix/nixbuild/peerresolver_test.go | 62 ++ go/nix/nixbuild/state.go | 67 +++ go/nix/nixbuild/workitem.go | 554 ++++++++++++++++++ go/nix/nixdrv/default.nix | 1 + go/nix/nixdrv/localfs.go | 3 +- go/nix/nixdrv/nixdrv.go | 131 ++++- go/nix/nixdrv/nixdrvhash_test.go | 27 + go/nix/nixhash/default.nix | 15 + go/nix/nixhash/nixhash.go | 32 + go/nix/nixpool/default.nix | 19 + go/nix/nixpool/dialer.go | 134 +++++ go/nix/nixpool/nixpool.go | 119 ++++ go/nix/nixstore/activities.go | 2 +- go/nix/nixstore/default.nix | 2 + go/nix/nixstore/remotestore.go | 458 ++++++++++++++- .../gopkgs/golang.org/x/crypto/default.nix | 4 +- 37 files changed, 2581 insertions(+), 252 deletions(-) create mode 100644 go.work create mode 100644 go.work.sum create mode 100644 go/nix/bnixbuild/bnixbuild.go create mode 100644 go/nix/bnixbuild/default.nix create mode 100644 go/nix/nar/inmemoryfs.go rename go/nix/nar/{nar.go => narpacker.go} (98%) rename go/nix/nar/{nar_test.go => narpacker_test.go} (95%) create mode 100644 go/nix/nar/narunpacker.go create mode 100644 go/nix/nixbuild/config.go create mode 100644 go/nix/nixbuild/coordinator.go create mode 100644 go/nix/nixbuild/default.nix create mode 100644 go/nix/nixbuild/httpresolver.go create mode 100644 go/nix/nixbuild/nixbuild.go create mode 100644 go/nix/nixbuild/peerresolver.go create mode 100644 go/nix/nixbuild/peerresolver_test.go create mode 100644 go/nix/nixbuild/state.go create mode 100644 go/nix/nixbuild/workitem.go create mode 100644 go/nix/nixdrv/nixdrvhash_test.go create mode 100644 go/nix/nixhash/default.nix create mode 100644 go/nix/nixhash/nixhash.go create mode 100644 go/nix/nixpool/default.nix create mode 100644 go/nix/nixpool/dialer.go create mode 100644 go/nix/nixpool/nixpool.go diff --git a/go.work b/go.work new file mode 100644 index 0000000000..09dd41b74a --- /dev/null +++ b/go.work @@ -0,0 +1,3 @@ +go 1.20 + +use ./go diff --git a/go.work.sum b/go.work.sum new file mode 100644 index 0000000000..4efa1e92da --- /dev/null +++ b/go.work.sum @@ -0,0 +1,4 @@ +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= diff --git a/go/go.mod b/go/go.mod index 6e2f06320d..060bb4b641 100644 --- a/go/go.mod +++ b/go/go.mod @@ -23,10 +23,10 @@ require ( github.com/pomerium/sdk-go v0.0.5 github.com/prometheus/client_golang v1.12.1 gocloud.dev v0.24.0 - golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 + golang.org/x/crypto v0.12.0 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 + golang.org/x/sys v0.11.0 ) require ( @@ -96,8 +96,8 @@ require ( github.com/ryanuber/go-glob v1.0.0 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/api v0.56.0 // indirect diff --git a/go/go.sum b/go/go.sum index 7700f5a14b..00d0121a28 100644 --- a/go/go.sum +++ b/go/go.sum @@ -651,6 +651,8 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -733,6 +735,8 @@ golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -831,9 +835,12 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -844,6 +851,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/go/nix/bnix/bnix.go b/go/nix/bnix/bnix.go index 4a33ddfe86..15afa68765 100644 --- a/go/nix/bnix/bnix.go +++ b/go/nix/bnix/bnix.go @@ -5,17 +5,13 @@ import ( "errors" "flag" "fmt" - "io" "log" - "math/rand" - "net/url" "os" "sync" - "time" - "golang.org/x/crypto/ssh" "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixpool" "hg.lukegb.com/lukegb/depot/go/nix/nixstore" ) @@ -54,84 +50,6 @@ func tryEnsure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTra return false, err } -type connectionPool struct { - factory func() (*nixstore.Daemon, error) - - cond *sync.Cond - limit int - generated int - available []*nixstore.Daemon -} - -func newConnectionPool(ctx context.Context, dst string, limit int) *connectionPool { - return &connectionPool{ - factory: func() (*nixstore.Daemon, error) { - var d *nixstore.Daemon - var err error - retryInterval := 10 * time.Millisecond - maxRetry := 10 * time.Second - for n := 0; n < 20; n++ { - d, _, err = connectTo(ctx, dst) - if err == nil { - break - } - time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval))) - retryInterval = retryInterval * 2 - if retryInterval > maxRetry { - retryInterval = maxRetry - } - } - return d, err - }, - - cond: sync.NewCond(new(sync.Mutex)), - limit: limit, - } -} - -func (p *connectionPool) getFromAvailableLocked() *nixstore.Daemon { - end := len(p.available) - 1 - d := p.available[end] - p.available = p.available[:end] - return d -} - -func (p *connectionPool) Get() (*nixstore.Daemon, error) { - p.cond.L.Lock() - if len(p.available) > 0 { - d := p.getFromAvailableLocked() - p.cond.L.Unlock() - return d, nil - } - - if p.generated < p.limit { - p.generated++ - p.cond.L.Unlock() - d, err := p.factory() - if err != nil { - p.cond.L.Lock() - p.generated-- - p.cond.L.Unlock() - return nil, err - } - return d, nil - } - - for len(p.available) == 0 { - p.cond.Wait() - } - d := p.getFromAvailableLocked() - p.cond.L.Unlock() - return d, nil -} - -func (p *connectionPool) Put(d *nixstore.Daemon) { - p.cond.L.Lock() - p.available = append(p.available, d) - p.cond.Signal() - p.cond.L.Unlock() -} - type workState int const ( @@ -167,9 +85,9 @@ type workItem struct { narInfo *narinfo.NarInfo childWork []*workItem - srcPool *connectionPool + srcPool *nixpool.Pool srcAT *nixstore.ActivityTracker - dstPool *connectionPool + dstPool *nixpool.Pool dstAT *nixstore.ActivityTracker wh *workHolder @@ -309,9 +227,9 @@ type workHolder struct { mu sync.Mutex work map[string]*workItem - srcPool *connectionPool + srcPool *nixpool.Pool srcAT *nixstore.ActivityTracker - dstPool *connectionPool + dstPool *nixpool.Pool dstAT *nixstore.ActivityTracker } @@ -339,7 +257,7 @@ func (wh *workHolder) addWork(ctx context.Context, path string) *workItem { return i } -func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.ActivityTracker, poolTo *connectionPool, atTo *nixstore.ActivityTracker, path string) error { +func copyPath(ctx context.Context, poolFrom *nixpool.Pool, atFrom *nixstore.ActivityTracker, poolTo *nixpool.Pool, atTo *nixstore.ActivityTracker, path string) error { wh := workHolder{ work: make(map[string]*workItem), srcPool: poolFrom, @@ -355,7 +273,7 @@ func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.Ac var rs nixdrv.Resolver func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) { - return rs.LoadDerivation(path) + return rs.LoadDerivation(ctx, path) } func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error { @@ -364,7 +282,7 @@ func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.Activ return fmt.Errorf("loading derivation: %w", err) } - basicDrv, err := drv.ToBasicDerivation(rs) + basicDrv, err := drv.ToBasicDerivation(ctx, rs) if err != nil { return fmt.Errorf("resolving: %w", err) } @@ -377,115 +295,6 @@ func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.Activ return nil } -func connectTo(ctx context.Context, remote string) (*nixstore.Daemon, nixdrv.Resolver, error) { - u, err := url.Parse(remote) - if err != nil { - return nil, nil, fmt.Errorf("parsing remote %q as URL: %w", remote, err) - } - - switch u.Scheme { - case "unix": - if u.Path == "" { - u.Path = nixstore.DaemonSock - } - d, err := nixstore.OpenDaemon(u.Path) - if err != nil { - return nil, nil, err - } - return d, nixdrv.LocalFSResolver{}, nil - case "ssh-ng": - // Construct a ClientConfig from the URL. - cfg := &ssh.ClientConfig{} - if u.Query().Has("privkey") { - var keys []ssh.Signer - for _, privkeyPath := range u.Query()["privkey"] { - privkeyF, err := os.Open(privkeyPath) - if err != nil { - return nil, nil, fmt.Errorf("opening privkey %q: %w", privkeyPath, err) - } - defer privkeyF.Close() - privkeyB, err := io.ReadAll(privkeyF) - if err != nil { - return nil, nil, fmt.Errorf("reading privkey %q: %w", privkeyPath, err) - } - - privkey, err := ssh.ParsePrivateKey(privkeyB) - if err != nil { - return nil, nil, fmt.Errorf("parsing privkey %q: %w", privkeyPath, err) - } - keys = append(keys, privkey) - } - cfg.Auth = append(cfg.Auth, ssh.PublicKeys(keys...)) - } - if u.User != nil { - cfg.User = u.User.Username() - if pw, ok := u.User.Password(); ok { - cfg.Auth = append(cfg.Auth, ssh.Password(pw)) - } - } - switch { - case u.Query().Has("host-key"): - hkStr := u.Query().Get("host-key") - _, _, hk, _, _, err := ssh.ParseKnownHosts(append([]byte("x "), []byte(hkStr)...)) - if err != nil { - return nil, nil, fmt.Errorf("parsing host-key %q: %w", hkStr, err) - } - cfg.HostKeyCallback = ssh.FixedHostKey(hk) - case u.Query().Has("insecure-allow-any-ssh-host-key"): - cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey() - default: - return nil, nil, fmt.Errorf("some SSH host key configuration is required (?host-key=; ?insecure-allow-any-ssh-host-key)") - } - - // Work out other misc parameters. - // ...remote command. - remoteCmd := "nix-daemon --stdio" - if u.Query().Has("remote-cmd") { - remoteCmd = u.Query().Get("remote-cmd") - } - - // Work out the host:port to connect to. - remote := u.Hostname() - if portStr := u.Port(); portStr != "" { - remote = remote + ":" + portStr - } else { - remote = remote + ":22" - } - - conn, err := ssh.Dial("tcp", remote, cfg) - if err != nil { - return nil, nil, fmt.Errorf("dialing %v via SSH: %w", remote, err) - } - sess, err := conn.NewSession() - if err != nil { - conn.Close() - return nil, nil, fmt.Errorf("opening SSH session to %v: %w", remote, err) - } - stdin, err := sess.StdinPipe() - if err != nil { - conn.Close() - return nil, nil, fmt.Errorf("opening stdin pipe: %w", err) - } - stdout, err := sess.StdoutPipe() - if err != nil { - conn.Close() - return nil, nil, fmt.Errorf("opening stdout pipe: %w", err) - } - if err := sess.Start(remoteCmd); err != nil { - conn.Close() - return nil, nil, fmt.Errorf("starting %q: %w", remoteCmd, err) - } - d, err := nixstore.OpenDaemonWithIOs(stdout, stdin, conn) - if err != nil { - conn.Close() - return nil, nil, fmt.Errorf("establishing connection to daemon: %w", err) - } - return d, nixdrv.LocalFSResolver{}, nil // TODO: change from LocalFSResolver? - default: - return nil, nil, fmt.Errorf("unknown remote %q", remote) - } -} - func main() { flag.Parse() @@ -541,9 +350,18 @@ func main() { if flag.NArg() != 2 { badCall("`copy` needs a store path") } - poolSrc := newConnectionPool(ctx, *remoteFlag, *maxConnsFlag) + factorySrc, err := nixpool.DaemonDialer(ctx, *remoteFlag) + if err != nil { + log.Fatalf("creating dialer for src: %v", err) + } + poolSrc := nixpool.New(ctx, factorySrc, *maxConnsFlag) atSrc := nixstore.NewActivityTracker() - poolDst := newConnectionPool(ctx, *remote2Flag, *maxConnsFlag) + + factoryDst, err := nixpool.DaemonDialer(ctx, *remote2Flag) + if err != nil { + log.Fatalf("creating dialer for dst: %v", err) + } + poolDst := nixpool.New(ctx, factoryDst, *maxConnsFlag) atDst := nixstore.NewActivityTracker() if err := copyPath(ctx, poolSrc, atSrc, poolDst, atDst, flag.Arg(1)); err != nil { @@ -556,12 +374,16 @@ func main() { at := nixstore.NewActivityTracker() - d, rss, err := connectTo(ctx, *remoteFlag) + factory, err := nixpool.DaemonDialer(ctx, *remoteFlag) if err != nil { - log.Fatalf("connectTo(%q): %v", *remoteFlag, err) + log.Fatalf("DaemonDialer(%q): %v", *remoteFlag, err) + } + d, err := factory() + if err != nil { + log.Fatalf("connecting to %q: %v", *remoteFlag, err) } defer d.Close() - rs = rss + rs = nixdrv.LocalFSResolver{} if err := cmd(ctx, d, at); err != nil { log.Fatalf("%s: %s", flag.Arg(0), err) diff --git a/go/nix/bnix/default.nix b/go/nix/bnix/default.nix index d9272a05ad..0f1d130b3e 100644 --- a/go/nix/bnix/default.nix +++ b/go/nix/bnix/default.nix @@ -9,6 +9,9 @@ depot.third_party.buildGo.program { ./bnix.go ]; deps = with depot; [ + go.nix.nar.narinfo + go.nix.nixdrv go.nix.nixstore + go.nix.nixpool ]; } diff --git a/go/nix/bnixbuild/bnixbuild.go b/go/nix/bnixbuild/bnixbuild.go new file mode 100644 index 0000000000..fc0540e920 --- /dev/null +++ b/go/nix/bnixbuild/bnixbuild.go @@ -0,0 +1,170 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "flag" + "log" + "strings" + "sync" + + "hg.lukegb.com/lukegb/depot/go/nix/nixbuild" + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixpool" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +type remoteDefinition struct { + URL string + PermittedResolutions int + PermittedBuilds int + SupportedPlatforms map[string]bool +} + +var ( + debugFlag = flag.Bool("b", false, "debug") +) + +func bmain() { + ctx := context.Background() + remote := remoteDefinition{ + URL: "ssh-ng://lukegb@eu.nixbuild.net?insecure-allow-any-ssh-host-key=true&privkey=/var/lib/secrets/id_ed25519_nixbuild/secret", + PermittedResolutions: 100, + PermittedBuilds: 100, + SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true}, + } + local := remoteDefinition{ + URL: "unix://", + PermittedResolutions: 100, + PermittedBuilds: 100, + } + // d = remoteDefinition{ + // URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519", + // PermittedResolutions: 64, + // PermittedBuilds: 64, + // SupportedPlatforms: map[string]bool{"x86_64-linux": true}, + // } + remoteFactory, err := nixpool.DaemonDialer(ctx, remote.URL) + if err != nil { + log.Fatalf("creating dialer for %v: %v", remote.URL, err) + } + remoteConn, err := remoteFactory() + if err != nil { + log.Fatalf("dialing %v: %v", remote.URL, err) + } + defer remoteConn.Close() + + localFactory, err := nixpool.DaemonDialer(ctx, local.URL) + if err != nil { + log.Fatalf("creating dialer for %v: %v", local.URL, err) + } + localConn, err := localFactory() + if err != nil { + log.Fatalf("dialing %v: %v", local.URL, err) + } + defer localConn.Close() + + const storePath = "/nix/store/pl5izkcgln1kvy8hv1850888my0b80qs-golib-golang.org_x_crypto_ed25519" + ni, err := localConn.NARInfo(storePath) + if err != nil { + log.Fatalf("localConn.NARInfo(%q): %v", storePath, err) + } + log.Println(ni, err) + + rc, err := localConn.NARFromPath(storePath) + if err != nil { + log.Fatalf("localConn.NARFromPath(%q): %v", storePath, err) + } + + if err := remoteConn.AddToStoreNar(ni, rc); err != nil { + log.Fatalf("remoteConn.AddToStoreNar: %v", err) + } + rc.Close() +} + +func main() { + flag.Parse() + if *debugFlag { + bmain() + } + + ctx := context.Background() + + remoteDefs := []remoteDefinition{{ + URL: "unix://", + PermittedResolutions: 64, + PermittedBuilds: 0, + SupportedPlatforms: map[string]bool{"x86_64-linux": true}, + }, { + // URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519", + // PermittedResolutions: 64, + // PermittedBuilds: 64, + // SupportedPlatforms: map[string]bool{"x86_64-linux": true}, + // }, { + URL: "ssh-ng://lukegb@eu.nixbuild.net?insecure-allow-any-ssh-host-key=true&privkey=/var/lib/secrets/id_ed25519_nixbuild/secret", + PermittedResolutions: 100, + PermittedBuilds: 100, + SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true}, + }} + var builders []*nixbuild.Builder + var resolvers []nixbuild.Resolver + for _, d := range remoteDefs { + factory, err := nixpool.DaemonDialer(ctx, d.URL) + if err != nil { + log.Fatalf("creating dialer for %v: %w", d.URL, err) + } + if d.PermittedResolutions > 0 { + resolverPool := nixpool.New(ctx, factory, d.PermittedResolutions) + resolvers = append(resolvers, nixbuild.PeerResolver{PeerFetcher: nixbuild.PeerFetcher{Pool: resolverPool}}) + } + if d.PermittedBuilds > 0 { + builderPool := nixpool.New(ctx, factory, d.PermittedBuilds) + builders = append(builders, &nixbuild.Builder{Pool: builderPool, SupportedPlatforms: d.SupportedPlatforms}) + } + } + target := &nixbuild.PeerTarget{ + PeerFetcher: resolvers[0].(nixbuild.PeerResolver).PeerFetcher, + ActivityTracker: nixstore.NewActivityTracker(), + } + + cfg := nixbuild.Config{ + Target: target, + ResolveOnTarget: true, + Resolvers: resolvers, + Builders: builders, + ResolveDependenciesOnBuilders: false, + } + at := nixstore.NewActivityTracker() + coord := nixbuild.NewCoordinator(cfg, at) + + localResolver := nixdrv.LocalFSResolver{} + + var wg sync.WaitGroup + for _, arg := range flag.Args() { + arg := arg + wg.Add(1) + go func() { + defer wg.Done() + var wi *nixbuild.WorkItem + if strings.HasSuffix(arg, ".drv") { + drv, err := localResolver.LoadDerivation(ctx, arg) + if err != nil { + log.Fatalf("LoadDerivation(%q): %v", arg, err) + } + bd, err := drv.ToBasicDerivation(ctx, localResolver) + if err != nil { + log.Fatalf("drv.ToBasicDerivation(): %v", err) + } + + wi = coord.AddDerivationWork(ctx, bd, arg) + } else { + wi = coord.AddWork(ctx, arg) + } + <-wi.Done() + }() + } + wg.Wait() +} diff --git a/go/nix/bnixbuild/default.nix b/go/nix/bnixbuild/default.nix new file mode 100644 index 0000000000..1605335dbb --- /dev/null +++ b/go/nix/bnixbuild/default.nix @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.program { + name = "bnixbuild"; + srcs = [ + ./bnixbuild.go + ]; + deps = with depot; [ + go.nix.nixbuild + go.nix.nixpool + go.nix.nixstore + ]; +} diff --git a/go/nix/default.nix b/go/nix/default.nix index dc2a79a382..494bbb66da 100644 --- a/go/nix/default.nix +++ b/go/nix/default.nix @@ -1,14 +1,18 @@ -# SPDX-FileCopyrightText: 2020 Luke Granger-Brown +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown # # SPDX-License-Identifier: Apache-2.0 args: { nar = import ./nar args; + nixbuild = import ./nixbuild args; nixdrv = import ./nixdrv args; + nixhash = import ./nixhash args; + nixpool = import ./nixpool args; nixstore = import ./nixstore args; nixwire = import ./nixwire args; bcachegc = import ./bcachegc args; bcacheup = import ./bcacheup args; bnix = import ./bnix args; + bnixbuild = import ./bnixbuild args; } diff --git a/go/nix/nar/default.nix b/go/nix/nar/default.nix index b022b5871f..70bf02d05e 100644 --- a/go/nix/nar/default.nix +++ b/go/nix/nar/default.nix @@ -7,8 +7,10 @@ name = "nar"; path = "hg.lukegb.com/lukegb/depot/go/nix/nar"; srcs = [ - ./nar.go ./dirfs.go + ./inmemoryfs.go + ./narpacker.go + ./narunpacker.go ]; deps = with depot; [ go.nix.nixwire diff --git a/go/nix/nar/inmemoryfs.go b/go/nix/nar/inmemoryfs.go new file mode 100644 index 0000000000..23fc6c146f --- /dev/null +++ b/go/nix/nar/inmemoryfs.go @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nar + +import "os" + +type InmemoryDirent struct { + SubFS *InmemoryFS + + Content []byte + Executable bool + + Target string +} + +func (dent *InmemoryDirent) Close() error { return nil } +func (dent *InmemoryDirent) Write(bs []byte) (int, error) { + dent.Content = append(dent.Content, bs...) + return len(bs), nil +} +func (dent *InmemoryDirent) MakeExecutable() error { + dent.Executable = true + return nil +} + +type InmemoryFS struct { + Dirent map[string]*InmemoryDirent +} + +func (fs *InmemoryFS) Create(name string) (WriteFile, error) { + if _, ok := fs.Dirent[name]; ok { + return nil, os.ErrExist + } + dent := &InmemoryDirent{} + fs.Dirent[name] = dent + return dent, nil +} +func (fs *InmemoryFS) Symlink(name, target string) error { + if _, ok := fs.Dirent[name]; ok { + return os.ErrExist + } + dent := &InmemoryDirent{Target: target} + fs.Dirent[name] = dent + return nil +} +func (fs *InmemoryFS) Mkdir(name string) (WriteFS, error) { + if _, ok := fs.Dirent[name]; ok { + return nil, os.ErrExist + } + dent := &InmemoryDirent{SubFS: NewInmemoryFS()} + fs.Dirent[name] = dent + return dent.SubFS, nil +} + +func NewInmemoryFS() *InmemoryFS { + return &InmemoryFS{Dirent: make(map[string]*InmemoryDirent)} +} diff --git a/go/nix/nar/narinfo/narinfo.go b/go/nix/nar/narinfo/narinfo.go index e780398885..2c41578eab 100644 --- a/go/nix/nar/narinfo/narinfo.go +++ b/go/nix/nar/narinfo/narinfo.go @@ -52,6 +52,22 @@ func (c CompressionMethod) String() string { return "!!unknown!!" } +func (c CompressionMethod) FileSuffix() (string, error) { + switch c { + case CompressionNone: + return "", nil + case CompressionXz: + return ".xz", nil + case CompressionBzip2: + return ".bz2", nil + case CompressionGzip: + return ".gz", nil + case CompressionBrotli: + return ".br", nil + } + return "", fmt.Errorf("bad compression method %d", int(c)) +} + func compressionFromString(s string) CompressionMethod { switch s { case "none", "": @@ -210,6 +226,20 @@ type NarInfo struct { CA string } +func (ni NarInfo) Sigs() []string { + sigKeys := make([]string, 0, len(ni.Sig)) + for k := range ni.Sig { + sigKeys = append(sigKeys, k) + } + sort.Strings(sigKeys) + sigs := make([]string, 0, len(sigKeys)) + for _, k := range sigKeys { + v := ni.Sig[k] + sigs = append(sigs, fmt.Sprintf("%s:%s", k, base64.StdEncoding.EncodeToString(v))) + } + return sigs +} + func (ni NarInfo) WriteTo(w io.Writer) error { if ni.StorePath != "" { if _, err := fmt.Fprintf(w, "StorePath: %s\n", ni.StorePath); err != nil { @@ -262,14 +292,9 @@ func (ni NarInfo) WriteTo(w io.Writer) error { } } if len(ni.Sig) != 0 { - sigKeys := make([]string, 0, len(ni.Sig)) - for k := range ni.Sig { - sigKeys = append(sigKeys, k) - } - sort.Strings(sigKeys) - for _, k := range sigKeys { - v := ni.Sig[k] - if _, err := fmt.Fprintf(w, "Sig: %s:%s\n", k, base64.StdEncoding.EncodeToString(v)); err != nil { + sigs := ni.Sigs() + for _, sig := range sigs { + if _, err := fmt.Fprintf(w, "Sig: %s\n", sig); err != nil { return err } } diff --git a/go/nix/nar/nar.go b/go/nix/nar/narpacker.go similarity index 98% rename from go/nix/nar/nar.go rename to go/nix/nar/narpacker.go index 8a35fdb0ff..5e0dd1c923 100644 --- a/go/nix/nar/nar.go +++ b/go/nix/nar/narpacker.go @@ -18,7 +18,6 @@ type FS interface { } func packFile(sw nixwire.Serializer, root FS, fn string, stat fs.FileInfo) (int64, error) { - var nSoFar int64 write := func(data ...any) (int64, error) { for _, datum := range data { @@ -146,7 +145,7 @@ func packFile(sw nixwire.Serializer, root FS, fn string, stat fs.FileInfo) (int6 } func Pack(w io.Writer, fs FS, fn string) (int64, error) { - sw := nixwire.Serializer{w} + sw := nixwire.Serializer{Writer: w} n, err := sw.WriteString("nix-archive-1") if err != nil { diff --git a/go/nix/nar/nar_test.go b/go/nix/nar/narpacker_test.go similarity index 95% rename from go/nix/nar/nar_test.go rename to go/nix/nar/narpacker_test.go index d3d8c39dc7..e08f14f666 100644 --- a/go/nix/nar/nar_test.go +++ b/go/nix/nar/narpacker_test.go @@ -7,12 +7,13 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "hg.lukegb.com/lukegb/depot/go/nix/nixwire" ) func TestHeader(t *testing.T) { var b bytes.Buffer - sw := serializeWriter{&b} + sw := nixwire.Serializer{Writer: &b} wrote, err := sw.WriteString("nix-archive-1") if err != nil { t.Fatalf("WriteString: %v", err) diff --git a/go/nix/nar/narunpacker.go b/go/nix/nar/narunpacker.go new file mode 100644 index 0000000000..4caa2d30bc --- /dev/null +++ b/go/nix/nar/narunpacker.go @@ -0,0 +1,181 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nar + +import ( + "bytes" + "fmt" + "io" + "strings" + + "hg.lukegb.com/lukegb/depot/go/nix/nixwire" +) + +type WriteFile interface { + io.WriteCloser + MakeExecutable() error +} + +type WriteFS interface { + Create(name string) (WriteFile, error) + Mkdir(name string) (WriteFS, error) + Symlink(name, target string) error +} + +type NullFS struct{} + +func (NullFS) Close() error { return nil } +func (NullFS) MakeExecutable() error { return nil } +func (NullFS) Write(p []byte) (n int, err error) { return len(p), nil } +func (f NullFS) Create(name string) (WriteFile, error) { return f, nil } +func (f NullFS) Mkdir(name string) (WriteFS, error) { return f, nil } +func (NullFS) Symlink(name string, target string) error { return nil } + +func unpackFile(dw nixwire.Deserializer, fs WriteFS, fn string) error { + s, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading file tuple open: %w", err) + } + if s != "(" { + return fmt.Errorf("expected file tuple open '('; got %q", s) + } + + var fileType string + var regularFile WriteFile + var directoryFS WriteFS + var directoryLastName string +parseLoop: + for { + s, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading next file tuple part: %w", err) + } + switch { + case s == ")": + break parseLoop + case s == "type": + if fileType != "" { + return fmt.Errorf("multiple file types for %s", fileType) + } + + t, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading file type: %w", err) + } + switch t { + case "regular": + regularFile, err = fs.Create(fn) + if err != nil { + return fmt.Errorf("creating file %v: %w", fn, err) + } + case "directory": + directoryFS, err = fs.Mkdir(fn) + if err != nil { + return fmt.Errorf("creating directory %v: %w", fn, err) + } + case "symlink": + // Do nothing until we have a target. + default: + return fmt.Errorf("bad file type %q; want regular/directory/symlink", t) + } + fileType = t + case s == "contents" && fileType == "regular": + size, err := dw.ReadUint64() + if err != nil { + return fmt.Errorf("reading file size: %w", err) + } + if _, err := io.CopyN(regularFile, dw.Reader, int64(size)); err != nil { + return fmt.Errorf("reading file content: %w", err) + } + if err := dw.ReadPadding(size); err != nil { + return fmt.Errorf("reading file padding: %w", err) + } + case s == "executable" && fileType == "regular": + v, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading executable emptystring: %w", err) + } + if v != "" { + return fmt.Errorf("executable emptystring is not empty: %v", v) + } + if err := regularFile.MakeExecutable(); err != nil { + return fmt.Errorf("making target file %v executable: %w", fn, err) + } + case s == "entry" && fileType == "directory": + // TODO + dirOpen, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading dirent open: %w", err) + } + if dirOpen != "(" { + return fmt.Errorf("dirent open was %q; wanted (", dirOpen) + } + var name string + direntLoop: + for { + s, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading next dirent tuple part: %w", err) + } + switch s { + case ")": + break direntLoop + case "name": + name, err = dw.ReadString() + if err != nil { + return fmt.Errorf("reading dirent name: %w", err) + } + switch { + case name == "", name == ".", name == "..", strings.Contains(name, "/"), bytes.Contains([]byte(name), []byte{0}): + return fmt.Errorf("invalid file name %q", name) + case name <= directoryLastName: + return fmt.Errorf("NAR not sorted (%q came after %q)", name, directoryLastName) + } + directoryLastName = name + case "node": + if name == "" { + return fmt.Errorf("entry name must come before node") + } + if err := unpackFile(dw, directoryFS, name); err != nil { + return fmt.Errorf("%s: %w", fn, err) + } + default: + return fmt.Errorf("%s: unknown field %s", fn, s) + } + } + case s == "target" && fileType == "symlink": + target, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading symlink target: %w", err) + } + if err := fs.Symlink(fn, target); err != nil { + return fmt.Errorf("creating symlink %q to %q: %w", fn, target, err) + } + default: + return fmt.Errorf("unknown field %v (filetype %q, filename %q)", s, fileType, fn) + } + } + + if regularFile != nil { + if err := regularFile.Close(); err != nil { + return fmt.Errorf("closing file: %w", err) + } + } + return nil +} + +func Unpack(r io.Reader, fs WriteFS, fn string) error { + dw := nixwire.Deserializer{Reader: r} + + s, err := dw.ReadString() + if err != nil { + return fmt.Errorf("reading magic: %w", err) + } + if want := "nix-archive-1"; s != want { + return fmt.Errorf("invalid NAR magic %q (wanted %q)", s, want) + } + + return unpackFile(dw, fs, fn) +} diff --git a/go/nix/nixbuild/config.go b/go/nix/nixbuild/config.go new file mode 100644 index 0000000000..8843087a4c --- /dev/null +++ b/go/nix/nixbuild/config.go @@ -0,0 +1,72 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +import ( + "context" + "io" + "strings" + + "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixpool" +) + +type Fetcher interface { + FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error) + FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error) +} + +type DerivationFetcher interface { + nixdrv.Resolver + FetchDerivationForPath(ctx context.Context, path string) (*nixdrv.BasicDerivation, string, error) +} + +type Resolver interface { + Fetcher + RelativePriority() int +} + +type Target interface { + ValidPaths(ctx context.Context, paths []string) ([]string, error) + EnsurePaths(ctx context.Context, paths []string) ([]string, error) + PutNar(ctx context.Context, ni *narinfo.NarInfo, w io.Reader) error + Fetcher +} + +type Builder struct { + Pool *nixpool.Pool + SupportedPlatforms map[string]bool +} + +type Config struct { + // Target is the endpoint which should end up having all the built packages. + // If Target is nil, we will build things if necessary and leave them on the builders. Things may end up in a weird state if Target is nil and the Builders are not present in Resolvers, since built dependencies may not be available. + Target Target + + // Allow target to resolve packages on its own, from its locally configured caches? + ResolveOnTarget bool + + // Things to resolve packages from. + // You should probably put Target and Builders in this list. + Resolvers []Resolver + + // Peers to build packages on. + // If empty, building will be disabled. + Builders []*Builder + + // Allow builders to resolve dependencies on their own, from their locally configured caches? + ResolveDependenciesOnBuilders bool + + // Store path. Usually /nix/store, which is the default if empty. + StorePath string +} + +func (c Config) storePath() string { + if c.StorePath == "" { + return "/nix/store" + } + return strings.TrimSuffix(c.StorePath, "/") +} diff --git a/go/nix/nixbuild/coordinator.go b/go/nix/nixbuild/coordinator.go new file mode 100644 index 0000000000..256d603442 --- /dev/null +++ b/go/nix/nixbuild/coordinator.go @@ -0,0 +1,78 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +import ( + "context" + "sort" + "sync" + + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +type Coordinator struct { + cfg Config + + mu sync.Mutex + work map[string]*WorkItem + + at *nixstore.ActivityTracker +} + +func NewCoordinator(cfg Config, at *nixstore.ActivityTracker) *Coordinator { + return &Coordinator{ + cfg: cfg, + work: make(map[string]*WorkItem), + at: at, + } +} + +func (c *Coordinator) AddDerivationWork(ctx context.Context, drv *nixdrv.BasicDerivation, drvPath string) *WorkItem { + var outputNames []string + for o := range drv.Outputs { + outputNames = append(outputNames, o) + } + sort.Strings(outputNames) + + path := drv.Outputs[outputNames[0]].Path + + c.mu.Lock() + defer c.mu.Unlock() + if i := c.work[path]; i != nil { + return i + } + + i := &WorkItem{ + doneCh: make(chan struct{}), + path: path, + + coord: c, + at: c.at, + } + c.work[path] = i + go i.run(ctx) + return i +} + +func (c *Coordinator) AddWork(ctx context.Context, path string) *WorkItem { + c.mu.Lock() + defer c.mu.Unlock() + + if i := c.work[path]; i != nil { + return i + } + + i := &WorkItem{ + doneCh: make(chan struct{}), + path: path, + + coord: c, + at: c.at, + } + c.work[path] = i + go i.run(ctx) + return i +} diff --git a/go/nix/nixbuild/default.nix b/go/nix/nixbuild/default.nix new file mode 100644 index 0000000000..822e8190c6 --- /dev/null +++ b/go/nix/nixbuild/default.nix @@ -0,0 +1,23 @@ +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.package { + name = "nixbuild"; + path = "hg.lukegb.com/lukegb/depot/go/nix/nixbuild"; + srcs = [ + ./config.go + ./coordinator.go + ./httpresolver.go + ./nixbuild.go + ./peerresolver.go + ./state.go + ./workitem.go + ]; + deps = with depot; [ + go.nix.nixstore + go.nix.nixpool + go.nix.nar + ]; +} diff --git a/go/nix/nixbuild/httpresolver.go b/go/nix/nixbuild/httpresolver.go new file mode 100644 index 0000000000..793b3aad0d --- /dev/null +++ b/go/nix/nixbuild/httpresolver.go @@ -0,0 +1,106 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "regexp" + + "github.com/numtide/go-nix/nixbase32" + "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" +) + +var ( + hashExtractRegexp = regexp.MustCompile(`(^|/)([0-9a-df-np-sv-z]{32})([-.].*)?$`) +) + +func hashExtract(s string) string { + res := hashExtractRegexp.FindStringSubmatch(s) + if len(res) == 0 { + return "" + } + return res[2] +} + +func keyForPath(storePath string) (string, error) { + fileHash := hashExtract(storePath) + if fileHash == "" { + return "", fmt.Errorf("store path %v seems to be invalid: couldn't extract hash", storePath) + } + + return fmt.Sprintf("%s.narinfo", fileHash), nil +} + +type HTTPCacheResolver struct { + Endpoint string + HTTPClient *http.Client + Priority int +} + +var _ Resolver = ((*HTTPCacheResolver)(nil)) + +func (r *HTTPCacheResolver) RelativePriority() int { return r.Priority } + +func (r *HTTPCacheResolver) FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error) { + key, err := keyForPath(path) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%v/%v", r.Endpoint, key), nil) + if err != nil { + return nil, fmt.Errorf("constructing request for %v/%v: %v", r.Endpoint, key, err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("making request for %v/%v: %v", r.Endpoint, key, err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusNotFound: + return nil, os.ErrNotExist + case http.StatusOK: + // continue with function + default: + return nil, fmt.Errorf("%v/%v: HTTP %v", r.Endpoint, key, resp.Status) + } + + return narinfo.ParseNarInfo(resp.Body) +} + +func (r *HTTPCacheResolver) FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error) { + compressionSuffix, err := ni.Compression.FileSuffix() + if err != nil { + return nil, fmt.Errorf("determining filename: %w", err) + } + key := fmt.Sprintf("nar/%s.nar%s", nixbase32.EncodeToString(ni.FileHash.Hash), compressionSuffix) + + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%v/%v", r.Endpoint, key), nil) + if err != nil { + return nil, fmt.Errorf("constructing request for %v/%v: %v", r.Endpoint, key, err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("making request for %v/%v: %v", r.Endpoint, key, err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + resp.Body.Close() + return nil, os.ErrNotExist + case http.StatusOK: + return resp.Body, nil + default: + resp.Body.Close() + return nil, fmt.Errorf("downloading %v/%v: HTTP status code %v", r.Endpoint, key, resp.Status) + } +} diff --git a/go/nix/nixbuild/nixbuild.go b/go/nix/nixbuild/nixbuild.go new file mode 100644 index 0000000000..db326ecce4 --- /dev/null +++ b/go/nix/nixbuild/nixbuild.go @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild diff --git a/go/nix/nixbuild/peerresolver.go b/go/nix/nixbuild/peerresolver.go new file mode 100644 index 0000000000..93071e0014 --- /dev/null +++ b/go/nix/nixbuild/peerresolver.go @@ -0,0 +1,179 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "path" + + "hg.lukegb.com/lukegb/depot/go/nix/nar" + "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixpool" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +type PeerFetcher struct { + Pool *nixpool.Pool +} + +var _ Fetcher = ((*PeerFetcher)(nil)) +var _ DerivationFetcher = ((*PeerFetcher)(nil)) + +func (r PeerFetcher) FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error) { + d, err := r.Pool.Get() + if err != nil { + return nil, err + } + defer r.Pool.Put(d) + + return d.NARInfo(path) +} + +type poolReturningReadCloser struct { + io.ReadCloser + pool *nixpool.Pool + d *nixstore.Daemon +} + +func (prc *poolReturningReadCloser) Close() error { + err := prc.ReadCloser.Close() + prc.pool.Put(prc.d) + return err +} + +func (r PeerFetcher) FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error) { + d, err := r.Pool.Get() + if err != nil { + return nil, err + } + + rc, err := d.NARFromPath(ni.StorePath) + if err != nil { + return nil, err + } + return &poolReturningReadCloser{ + ReadCloser: rc, + pool: r.Pool, + d: d, + }, nil +} + +func (r PeerFetcher) FetchDerivationForPath(ctx context.Context, path string) (*nixdrv.BasicDerivation, string, error) { + d, err := r.Pool.Get() + if err != nil { + return nil, "", err + } + + derivers, err := d.QueryValidDerivers(path) + r.Pool.Put(d) + if err != nil { + return nil, "", err + } + if len(derivers) == 0 { + return nil, "", fmt.Errorf("don't know how to build %v", path) + } + + // Do we need to do something smarter than take the first derivation? + drvPath := derivers[0] + drv, err := r.LoadDerivation(ctx, drvPath) + if err != nil { + return nil, "", fmt.Errorf("fetching derivation %v for %v: %w", drvPath, path, err) + } + bd, err := drv.ToBasicDerivation(ctx, r) + if err != nil { + return nil, "", fmt.Errorf("converting %v to a basic derivation: %w", drvPath, err) + } + //drvBase := strings.TrimSuffix(drvPath, filepath.Ext(drvPath)) + return bd, drvPath, nil +} + +func (r PeerFetcher) LoadDerivation(ctx context.Context, drvPath string) (*nixdrv.Derivation, error) { + ni, err := r.FetchNarInfo(ctx, drvPath) + if err != nil { + return nil, fmt.Errorf("fetching narinfo for %v: %w", drvPath, err) + } + rc, err := r.FetchNar(ctx, ni) + if err != nil { + return nil, fmt.Errorf("fetching nar for %v: %w", drvPath, err) + } + defer rc.Close() + + imfs := nar.NewInmemoryFS() + fn := path.Base(ni.StorePath) + if err := nar.Unpack(rc, imfs, fn); err != nil { + return nil, fmt.Errorf("unpacking nar for %v: %w", drvPath, err) + } + dent, ok := imfs.Dirent[fn] + if !ok { + return nil, fmt.Errorf("unpacking nar for %v yielded no file", drvPath) + } else if dent.Content == nil { + return nil, fmt.Errorf("unpacking nar for %v yielded non-file", drvPath) + } + + return nixdrv.Load(bufio.NewReader(bytes.NewReader(dent.Content))) +} + +type PeerResolver struct { + PeerFetcher + Priority int +} + +var _ Resolver = ((*PeerResolver)(nil)) + +func (r PeerResolver) RelativePriority() int { return r.Priority } + +type PeerTarget struct { + PeerFetcher + ActivityTracker *nixstore.ActivityTracker +} + +var _ Target = ((*PeerTarget)(nil)) + +func (t *PeerTarget) EnsurePaths(ctx context.Context, paths []string) ([]string, error) { + d, err := t.Pool.Get() + if err != nil { + return nil, err + } + defer t.Pool.Put(d) + + var ensuredPaths []string + for _, p := range paths { + if err := d.EnsurePath(t.ActivityTracker, p); err != nil { + var nv nixstore.PathNotValidError + if !errors.As(err, &nv) { + return nil, fmt.Errorf("ensuring %v: %w", p, err) + } + } else { + ensuredPaths = append(ensuredPaths, p) + } + } + return ensuredPaths, nil +} + +func (t *PeerTarget) PutNar(ctx context.Context, ni *narinfo.NarInfo, w io.Reader) error { + d, err := t.Pool.Get() + if err != nil { + return err + } + defer t.Pool.Put(d) + + return d.AddToStoreNar(ni, w) +} + +func (t *PeerTarget) ValidPaths(ctx context.Context, paths []string) ([]string, error) { + d, err := t.Pool.Get() + if err != nil { + return nil, err + } + defer t.Pool.Put(d) + + return d.ValidPaths(paths) +} diff --git a/go/nix/nixbuild/peerresolver_test.go b/go/nix/nixbuild/peerresolver_test.go new file mode 100644 index 0000000000..4eeb6fdf09 --- /dev/null +++ b/go/nix/nixbuild/peerresolver_test.go @@ -0,0 +1,62 @@ +package nixbuild + +import ( + "context" + "testing" + + "hg.lukegb.com/lukegb/depot/go/nix/nixpool" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +// DELETE ME +const remote = "ssh-ng://lukegb@localhost?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519" + +// const storePath = "/nix/store/hx2qbqfjpv08w2aw6d5md6vnjwppzccb-golib-nixbuild" +const drvPath = "/nix/store/bi4dbkxdy94wk5qbqcq54rby0y0dql0j-ci.drv" + +func TestPeerResolver(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + factory, err := nixpool.DaemonDialer(ctx, remote) + if err != nil { + t.Fatalf("DaemonDialer(%q): %v", remote, err) + } + pool := nixpool.New(ctx, factory, 1) + builderPool := nixpool.New(ctx, factory, 1) + d, err := factory() + if err != nil { + t.Fatalf("factory: %v", err) + } + d.Close() + + res := PeerResolver{PeerFetcher: PeerFetcher{pool}} + drv, err := res.LoadDerivation(ctx, drvPath) + if err != nil { + t.Fatalf("LoadDerivation(%q): %v", drvPath, err) + } + bd, err := drv.ToBasicDerivation(ctx, res) + if err != nil { + t.Fatalf("drv.ToBasicDerivation(): %v", err) + } + + at := nixstore.NewActivityTracker() + cfg := Config{ + Target: &PeerTarget{ + PeerFetcher: PeerFetcher{ + Pool: pool, + }, + ActivityTracker: at, + }, + ResolveOnTarget: true, + Resolvers: []Resolver{ + PeerResolver{PeerFetcher: PeerFetcher{pool}}, + }, + Builders: []*nixpool.Pool{builderPool}, + ResolveDependenciesOnBuilders: false, + } + coord := NewCoordinator(cfg, at) + //wi := coord.AddWork(ctx, storePath) + wi := coord.AddDerivationWork(ctx, bd, drvPath) + <-wi.doneCh +} diff --git a/go/nix/nixbuild/state.go b/go/nix/nixbuild/state.go new file mode 100644 index 0000000000..fbdcf3ecb7 --- /dev/null +++ b/go/nix/nixbuild/state.go @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +type State int + +const ( + // Does this path (or path(s)) exist on the target already? + StateCheckingOnTarget State = iota + StateDoneAlreadyExisted + + StateResolvingOnTarget + StateDoneResolvedOnTarget + + // Does this path exist in any configured resolvers? + StateCheckingResolvers + StateFetchingReferencesFromResolvers + StateFetchingFromResolver + StateDoneFromResolver + + // If we don't have the derivation for this path, fetch it. + // This state only matters if we only have the path and not the derivation. + StateLookingUpDerivation + StateChoosingBuilder + StateResolvingReferencesOnBuilder + StateCopyingReferencesToBuilder + StateBuildingDerivation + StateCopying + StateDoneBuilt + + StateFailed +) + +func (s State) String() string { + return map[State]string{ + StateCheckingOnTarget: "checking if already on target", + StateDoneAlreadyExisted: "done (already on target)", + + StateResolvingOnTarget: "resolving on target", + StateDoneResolvedOnTarget: "done (resolved on target)", + + StateCheckingResolvers: "checking if present on any resolvers", + StateFetchingReferencesFromResolvers: "copying references from resolvers", + StateFetchingFromResolver: "copying from resolver", + StateDoneFromResolver: "done (copied from resolver)", + + StateLookingUpDerivation: "looking up derivation", + StateChoosingBuilder: "picking a builder", + StateResolvingReferencesOnBuilder: "resolving build-time references on builder", + StateCopyingReferencesToBuilder: "copying build-time references to builder", + StateBuildingDerivation: "building", + StateCopying: "copying from builder", + StateDoneBuilt: "done (built)", + + StateFailed: "failed :(", + }[s] +} + +func (s State) Terminal() bool { + switch s { + case StateDoneAlreadyExisted, StateDoneResolvedOnTarget, StateDoneFromResolver, StateDoneBuilt, StateFailed: + return true + } + return false +} diff --git a/go/nix/nixbuild/workitem.go b/go/nix/nixbuild/workitem.go new file mode 100644 index 0000000000..1e8a32d2af --- /dev/null +++ b/go/nix/nixbuild/workitem.go @@ -0,0 +1,554 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixbuild + +import ( + "context" + "errors" + "fmt" + "log" + "math/rand" + "path" + "sort" + "sync" + + "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +type WorkItem struct { + doneCh chan struct{} + state State + + // Either (path) or (drv, drvPath) must be set. + path string + drv *nixdrv.BasicDerivation + drvPath string + + err error + cleanup []func() + + // StateFetchingReferencesFromResolvers, StateFetchingFromResolver, StateDoneFromResolver + resolver Resolver + narInfos []*narinfo.NarInfo + + // StateFetchingReferencesFromResolvers, StateCopyingReferencesToBuilder + childWork []*WorkItem + + // StateChoosingBuilder, StateResolvingReferencesOnBuilder, StateCopyingReferencesToBuilder, StateBuildingDerivation, StateCopying, StateDoneBuilt + builder *Builder + builderDaemon *nixstore.Daemon + builderNeededReferences []string + + coord *Coordinator + at *nixstore.ActivityTracker +} + +func (i *WorkItem) paths() []string { + if i.path != "" { + return []string{i.path} + } + os := make([]string, 0, len(i.drv.Outputs)) + for _, o := range i.drv.Outputs { + os = append(os, o.Path) + } + return os +} + +func (i *WorkItem) checkOnTarget(ctx context.Context) (State, error) { + // i.state == StateCheckingOnTarget + if i.coord.cfg.Target == nil { + return StateResolvingOnTarget, nil + } + + wantPaths := i.paths() + gotPaths, err := i.coord.cfg.Target.ValidPaths(ctx, wantPaths) + if err != nil { + return StateFailed, err + } + if len(wantPaths) == len(gotPaths) { + return StateDoneAlreadyExisted, nil + } + return StateResolvingOnTarget, nil +} + +func (i *WorkItem) resolveOnTarget(ctx context.Context) (State, error) { + // i.state == StateResolvingOnTarget + if i.coord.cfg.Target == nil || !i.coord.cfg.ResolveOnTarget { + return StateCheckingResolvers, nil + } + + wantPaths := i.paths() + resolvedPaths, err := i.coord.cfg.Target.EnsurePaths(ctx, wantPaths) + if err != nil { + var ne nixstore.NixError + if !errors.As(err, &ne) { + return StateFailed, err + } + } + if len(wantPaths) == len(resolvedPaths) { + return StateDoneResolvedOnTarget, nil + } + return StateCheckingResolvers, nil +} + +func (i *WorkItem) checkResolvers(ctx context.Context) (State, error) { + // i.state == StateCheckingResolvers + if len(i.coord.cfg.Resolvers) == 0 { + return StateLookingUpDerivation, nil + } + + type result struct { + r Resolver + ni []*narinfo.NarInfo + } + validResolversCh := make(chan result, len(i.coord.cfg.Resolvers)) + var wg sync.WaitGroup + for _, r := range i.coord.cfg.Resolvers { + r := r + wg.Add(1) + go func() { + defer wg.Done() + res := result{r: r} + for _, p := range i.paths() { + ni, err := r.FetchNarInfo(ctx, p) + if err != nil { + // Nope. + return + } + res.ni = append(res.ni, ni) + } + validResolversCh <- res + }() + } + wg.Wait() + close(validResolversCh) + + validResolvers := make([]result, 0, len(i.coord.cfg.Resolvers)) + for r := range validResolversCh { + validResolvers = append(validResolvers, r) + } + if len(validResolvers) == 0 { + return StateLookingUpDerivation, nil + } + sort.Slice(validResolvers, func(i, j int) bool { + return validResolvers[i].r.RelativePriority() < validResolvers[j].r.RelativePriority() + }) + i.resolver = validResolvers[0].r + i.narInfos = validResolvers[0].ni + return StateFetchingReferencesFromResolvers, nil +} + +func (i *WorkItem) pathSet() map[string]bool { + s := make(map[string]bool) + for _, p := range i.paths() { + s[p] = true + } + return s +} + +func (i *WorkItem) fetchReferencesFromResolvers(ctx context.Context) (State, error) { + // i.state == StateFetchingReferencesFromResolvers + // i.narInfos populated + if i.coord.cfg.Target == nil { + // Nowhere to fetch to. + return StateFetchingFromResolver, nil + } + + ownPaths := i.pathSet() + refs := make(map[string]bool) + var wis []*WorkItem + sp := i.coord.cfg.storePath() + for _, ni := range i.narInfos { + for _, ref := range ni.References { + refPath := path.Join(sp, ref) + if refs[refPath] || ownPaths[refPath] { + continue + } + refs[refPath] = true + wis = append(wis, i.coord.AddWork(ctx, refPath)) + } + } + if len(refs) == 0 { + // No references to fetch. + return StateFetchingFromResolver, nil + } + i.childWork = wis + + if err := i.awaitChildWorkItems(ctx); err != nil { + return StateFailed, err + } + return StateFetchingFromResolver, nil +} + +func (i *WorkItem) awaitChildWorkItems(ctx context.Context) error { + // i.childWork populated + for _, wi := range i.childWork { + select { + case <-ctx.Done(): + return ctx.Err() + case <-wi.doneCh: + if wi.err != nil { + return fmt.Errorf("dependency %v failed: %w", wi.paths(), wi.err) + } + } + } + return nil +} + +func (i *WorkItem) fetchFromResolver(ctx context.Context) (State, error) { + // i.state == StateFetchingFromResolver + // i.resolver populated + // i.narInfos populated + if i.coord.cfg.Target == nil { + // Present on a resolver and nowhere to copy to; we're done here. + return StateDoneFromResolver, nil + } + + type result struct { + err error + } + resCh := make(chan result, len(i.narInfos)) + + var wg sync.WaitGroup + for _, ni := range i.narInfos { + ni := ni + wg.Add(1) + go func() { + defer wg.Done() + rc, err := i.resolver.FetchNar(ctx, ni) + if err != nil { + resCh <- result{err} + return + } + if err := i.coord.cfg.Target.PutNar(ctx, ni, rc); err != nil { + resCh <- result{err} + return + } + rc.Close() + resCh <- result{} + }() + } + wg.Wait() + close(resCh) + for res := range resCh { + if res.err != nil { + return StateFailed, res.err + } + } + return StateDoneFromResolver, nil +} + +func (i *WorkItem) lookUpDerivation(ctx context.Context) (State, error) { + // i.state == StateLookingUpDerivation + if i.drv != nil { + return StateChoosingBuilder, nil + } + type result struct { + drv *nixdrv.BasicDerivation + drvPath string + } + var wg sync.WaitGroup + resCh := make(chan result, len(i.coord.cfg.Resolvers)) + for _, res := range i.coord.cfg.Resolvers { + res, ok := res.(DerivationFetcher) + if !ok { + continue + } + wg.Add(1) + go func() { + defer wg.Done() + drv, drvPath, err := res.FetchDerivationForPath(ctx, i.path) + if err != nil { + return + } + resCh <- result{drv, drvPath} + }() + } + wg.Wait() + close(resCh) + + for res := range resCh { + i.drv = res.drv + i.drvPath = res.drvPath + return StateChoosingBuilder, nil + } + return StateFailed, fmt.Errorf("couldn't find someone who knows how to build %v", i.path) +} + +func (i *WorkItem) chooseBuilder(ctx context.Context) (State, error) { + // i.state == StateChoosingBuilder + // i.drv non-empty + if len(i.coord.cfg.Builders) == 0 { + return StateFailed, fmt.Errorf("no builders configured") + } + var err error + // TODO: a better algorithm for picking a builder than "the first entry in the list" :) + type builderCandidate struct { + builder *Builder + busyness float64 + } + var candidates []builderCandidate + for _, b := range i.coord.cfg.Builders { + if !b.SupportedPlatforms[i.drv.Platform] { + continue + } + candidates = append(candidates, builderCandidate{ + builder: b, + busyness: b.Pool.Busyness(), + }) + } + if len(candidates) == 0 { + return StateFailed, fmt.Errorf("no builders support platform %q", i.drv.Platform) + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].busyness < candidates[j].busyness + }) + minBusyness := candidates[0].busyness + sameBusynessCandidates := candidates + for n, c := range candidates { + if c.busyness != minBusyness { + sameBusynessCandidates = candidates[:n] + break + } + } + rand.Shuffle(len(sameBusynessCandidates), func(i, j int) { + sameBusynessCandidates[i], sameBusynessCandidates[j] = sameBusynessCandidates[j], sameBusynessCandidates[i] + }) + i.builder = sameBusynessCandidates[0].builder + i.builderDaemon, err = sameBusynessCandidates[0].builder.Pool.Get() + if err != nil { + return StateFailed, err + } + i.cleanup = append(i.cleanup, func() { + if i.builderDaemon != nil { + i.builder.Pool.Put(i.builderDaemon) + } + i.builderDaemon = nil + }) + return StateResolvingReferencesOnBuilder, nil +} + +func (i *WorkItem) ensureReferencesOnBuilder(ctx context.Context) (State, error) { + // i.state == StateEnsuringReferencesOnBuilder + // i.builder non-empty + // i.drv non-empty + for path := range i.drv.InputSrcs { + if !i.coord.cfg.ResolveDependenciesOnBuilders { + i.builderNeededReferences = append(i.builderNeededReferences, path) + continue + } + + if err := i.builderDaemon.EnsurePath(i.at, path); err != nil { + var pnv nixstore.PathNotValidError + var ne nixstore.NixError + if errors.As(err, &pnv) || errors.As(err, &ne) { + i.builderNeededReferences = append(i.builderNeededReferences, path) + } else { + return StateFailed, fmt.Errorf("path %v: %w", path, err) + } + } + } + return StateCopyingReferencesToBuilder, nil +} + +func (i *WorkItem) copyReferencesToBuilder(ctx context.Context) (State, error) { + // i.state == StateCopyingReferencesToBuilder + // i.builder, i.builderPool non-empty + if len(i.builderNeededReferences) == 0 { + return StateBuildingDerivation, nil + } + + i.builder.Pool.Put(i.builderDaemon) + i.builderDaemon = nil + var wis []*WorkItem + for _, p := range i.builderNeededReferences { + // Must be in same order in wis/i.childWork as i.builderNeededReferences! + wis = append(wis, i.coord.AddWork(ctx, p)) + } + i.childWork = wis + if err := i.awaitChildWorkItems(ctx); err != nil { + return StateFailed, err + } + var err error + i.builderDaemon, err = i.builder.Pool.Get() + if err != nil { + return StateFailed, fmt.Errorf("reacquiring builder connection: %w", err) + } + + // Recheck which things we need to fetch; maybe we built or resolved some of them on the box. + validPaths, err := i.builderDaemon.ValidPaths(i.builderNeededReferences) + if err != nil { + return StateFailed, fmt.Errorf("checking valid paths on builder: %w", err) + } + validOnBuilder := make(map[string]bool) + for _, vp := range validPaths { + validOnBuilder[vp] = true + } + + for n, wi := range i.childWork { + path := i.builderNeededReferences[n] + if validOnBuilder[path] { + continue + } + + var f Fetcher + var ni *narinfo.NarInfo + switch wi.state { + case StateDoneAlreadyExisted, StateDoneResolvedOnTarget: + // copy from Target + f = i.coord.cfg.Target + case StateDoneFromResolver: + // copy from Resolver (wi.resolver) + f = wi.resolver + // We should already have the NAR info, so skip a step. + for _, wini := range wi.narInfos { + if wini.StorePath == path { + ni = wini + } + } + case StateDoneBuilt: + // copy from Builder (wi.builderPool) or Target + if wi.builder == i.builder { + // Nothing to do, we built the dep on the same builder. + continue + } + + if i.coord.cfg.Target != nil { + f = i.coord.cfg.Target + } else { + f = PeerFetcher{wi.builder.Pool} + } + default: + return StateFailed, fmt.Errorf("don't know where the target of %s is (for path %s)", wi.state, wi.paths()) + } + + if ni == nil { + var err error + ni, err = f.FetchNarInfo(ctx, path) + if err != nil { + return StateFailed, fmt.Errorf("looking up dependency NAR info for %v: %w", path, err) + } + } + rc, err := f.FetchNar(ctx, ni) + if err != nil { + return StateFailed, fmt.Errorf("fetching NAR for dependency %v: %w", wi.paths(), err) + } + if err := i.builderDaemon.AddToStoreNar(ni, rc); err != nil { + return StateFailed, fmt.Errorf("copying NAR to builder for dependency %v: %w", wi.paths(), err) + } + rc.Close() + } + + return StateBuildingDerivation, nil +} + +func (i *WorkItem) build(ctx context.Context) (State, error) { + // i.state == StateBuildingDerivation + // i.builder non-empty + + brs, err := i.builderDaemon.BuildDerivation(i.at, i.drvPath, i.drv, nixstore.BMNormal) + if err != nil { + return StateFailed, err + } + if !brs.IsSuccess() { + return StateFailed, fmt.Errorf("build failed: %v", brs) + } + return StateCopying, nil +} + +func (i *WorkItem) copyFromBuilder(ctx context.Context) (State, error) { + // i.state == StateCopying + // i.builder non-empty + if i.coord.cfg.Target == nil { + return StateDoneBuilt, nil + } + + // Check if these paths are already valid on target. + validPaths, err := i.coord.cfg.Target.ValidPaths(ctx, i.paths()) + if err != nil { + return StateFailed, fmt.Errorf("checking valid paths on target: %w", err) + } + validPathsSet := make(map[string]bool) + for _, p := range validPaths { + validPathsSet[p] = true + } + + for _, path := range i.paths() { + if validPathsSet[path] { + continue + } + ni, err := i.builderDaemon.NARInfo(path) + if err != nil { + return StateFailed, fmt.Errorf("fetching narinfo for %v: %w", path, err) + } + rc, err := i.builderDaemon.NARFromPath(path) + if err != nil { + return StateFailed, fmt.Errorf("fetching nar for %v: %w", path, err) + } + if err := i.coord.cfg.Target.PutNar(ctx, ni, rc); err != nil { + return StateFailed, fmt.Errorf("putting nar for %v to target: %w", path, err) + } + } + return StateDoneBuilt, nil +} + +func (i *WorkItem) run(ctx context.Context) error { + defer close(i.doneCh) + defer func() { + for _, cleanup := range i.cleanup { + cleanup() + } + }() + i.state = StateCheckingOnTarget + for !i.state.Terminal() { + if ctx.Err() != nil { + return ctx.Err() + } + + log.Printf("%s: running worker for %s", i.path, i.state) + var nextstate State + var err error + switch i.state { + case StateCheckingOnTarget: + nextstate, err = i.checkOnTarget(ctx) + case StateResolvingOnTarget: + nextstate, err = i.resolveOnTarget(ctx) + case StateCheckingResolvers: + nextstate, err = i.checkResolvers(ctx) + case StateFetchingReferencesFromResolvers: + nextstate, err = i.fetchReferencesFromResolvers(ctx) + case StateFetchingFromResolver: + nextstate, err = i.fetchFromResolver(ctx) + case StateLookingUpDerivation: + nextstate, err = i.lookUpDerivation(ctx) + case StateChoosingBuilder: + nextstate, err = i.chooseBuilder(ctx) + case StateResolvingReferencesOnBuilder: + nextstate, err = i.ensureReferencesOnBuilder(ctx) + case StateCopyingReferencesToBuilder: + nextstate, err = i.copyReferencesToBuilder(ctx) + case StateBuildingDerivation: + nextstate, err = i.build(ctx) + case StateCopying: + nextstate, err = i.copyFromBuilder(ctx) + default: + log.Printf("%s: ended up in unimplemented state %s", i.path, i.state) + <-make(chan struct{}) + } + prevstate := i.state + i.state = nextstate + if err != nil { + log.Printf("%s: transitioning to %s: %s", i.path, nextstate, err) + i.err = fmt.Errorf("%s: %w", prevstate, err) + return i.err + } + log.Printf("%s: transitioning to %s", i.path, nextstate) + } + return nil +} + +func (wi *WorkItem) Done() <-chan struct{} { return wi.doneCh } diff --git a/go/nix/nixdrv/default.nix b/go/nix/nixdrv/default.nix index eb4cfb4f99..c339619b1f 100644 --- a/go/nix/nixdrv/default.nix +++ b/go/nix/nixdrv/default.nix @@ -11,5 +11,6 @@ depot.third_party.buildGo.package { ./localfs.go ]; deps = with depot; [ + go.nix.nixhash ]; } diff --git a/go/nix/nixdrv/localfs.go b/go/nix/nixdrv/localfs.go index 61a09a3588..54ed2dc29a 100644 --- a/go/nix/nixdrv/localfs.go +++ b/go/nix/nixdrv/localfs.go @@ -2,12 +2,13 @@ package nixdrv import ( "bufio" + "context" "os" ) type LocalFSResolver struct{} -func (LocalFSResolver) LoadDerivation(path string) (*Derivation, error) { +func (LocalFSResolver) LoadDerivation(ctx context.Context, path string) (*Derivation, error) { f, err := os.Open(path) if err != nil { return nil, err diff --git a/go/nix/nixdrv/nixdrv.go b/go/nix/nixdrv/nixdrv.go index 277de6a895..fc9f13b504 100644 --- a/go/nix/nixdrv/nixdrv.go +++ b/go/nix/nixdrv/nixdrv.go @@ -8,8 +8,13 @@ package nixdrv import ( "bufio" "bytes" + "context" "fmt" + "path" + "sort" "strings" + + "hg.lukegb.com/lukegb/depot/go/nix/nixhash" ) type Output struct { @@ -30,8 +35,6 @@ type BasicDerivation struct { Builder string Args []string Env map[string]string - - InputDerivations []InputDerivation } type Derivation struct { @@ -69,13 +72,13 @@ func (drv *Derivation) Clone() *Derivation { } type Resolver interface { - LoadDerivation(path string) (*Derivation, error) + LoadDerivation(ctx context.Context, path string) (*Derivation, error) } -func (drv *Derivation) ToBasicDerivation(r Resolver) (*BasicDerivation, error) { +func (drv *Derivation) ToBasicDerivation(ctx context.Context, r Resolver) (*BasicDerivation, error) { o := drv.BasicDerivation.Clone() for _, inp := range drv.InputDerivations { - inpDrv, err := r.LoadDerivation(inp.Path) + inpDrv, err := r.LoadDerivation(ctx, inp.Path) if err != nil { return nil, fmt.Errorf("resolving %v: %w", inp.Path, err) } @@ -92,6 +95,21 @@ func (drv *Derivation) ToBasicDerivation(r Resolver) (*BasicDerivation, error) { return o, nil } +func (drv *Derivation) StoreHash(name, storeRoot string) string { + var references []string + for ref := range drv.InputSrcs { + references = append(references, ref) + } + for _, ref := range drv.InputDerivations { + references = append(references, ref.Path) + } + return nixhash.StorePathForText(name+".drv", drv.String(), storeRoot, references) +} + +func (drv *Derivation) StorePath(name, storeRoot string) string { + return path.Join(storeRoot, fmt.Sprintf("%s-%s.drv", drv.StoreHash(name, storeRoot), name)) +} + func expect(f *bufio.Reader, s string) error { buf := make([]byte, len(s)) _, err := f.Read(buf) @@ -194,6 +212,103 @@ loop: return s.String(), nil } +func writeTuple(sb *strings.Builder, ss ...string) { + sb.WriteByte('(') + if len(ss) >= 1 { + writeString(sb, ss[0]) + } + for _, s := range ss[1:] { + sb.WriteByte(',') + writeString(sb, s) + } + sb.WriteByte(')') +} + +func writeList[T any](sb *strings.Builder, elements []T, writeElement func(*strings.Builder, T)) { + sb.WriteByte('[') + if len(elements) >= 1 { + writeElement(sb, elements[0]) + } + for _, e := range elements[1:] { + sb.WriteByte(',') + writeElement(sb, e) + } + sb.WriteByte(']') +} + +var ( + escaperReplacer = strings.NewReplacer("\"", "\\\"", + "\\", "\\\\", + "\n", "\\n", + "\r", "\\r", + "\t", "\\t") +) + +func escapeString(s string) string { + return "\"" + escaperReplacer.Replace(s) + "\"" +} + +func writeUnescapedString(sb *strings.Builder, s string) { + sb.WriteString(escapeString(s)) +} + +func writeString(sb *strings.Builder, s string) { + sb.WriteString(s) +} + +func (d *Derivation) String() string { + var sb strings.Builder + sb.WriteString("Derive(") + + var outputKeys []string + for key := range d.Outputs { + outputKeys = append(outputKeys, key) + } + sort.Strings(outputKeys) + writeList(&sb, outputKeys, func(sb *strings.Builder, k string) { + v := d.Outputs[k] + writeTuple(sb, escapeString(k), escapeString(v.Path), escapeString(v.HashAlgorithm), escapeString(v.Hash)) + }) + sb.WriteByte(',') + + writeList(&sb, d.InputDerivations, func(sb *strings.Builder, drv InputDerivation) { + var outputsBuilder strings.Builder + writeList(&outputsBuilder, drv.Outputs, writeUnescapedString) + writeTuple(sb, escapeString(drv.Path), outputsBuilder.String()) + }) + sb.WriteByte(',') + + var inputSrcs []string + for is := range d.InputSrcs { + inputSrcs = append(inputSrcs, is) + } + sort.Strings(inputSrcs) + writeList(&sb, inputSrcs, writeUnescapedString) + sb.WriteByte(',') + + writeUnescapedString(&sb, d.Platform) + sb.WriteByte(',') + + writeUnescapedString(&sb, d.Builder) + sb.WriteByte(',') + + writeList(&sb, d.Args, writeUnescapedString) + sb.WriteByte(',') + + var env []string + for k := range d.Env { + env = append(env, k) + } + sort.Strings(env) + writeList(&sb, env, func(sb *strings.Builder, k string) { + v := d.Env[k] + writeTuple(sb, escapeString(k), escapeString(v)) + }) + + sb.WriteByte(')') // Derive( + return sb.String() +} + func Load(f *bufio.Reader) (*Derivation, error) { drv := &Derivation{} @@ -278,6 +393,9 @@ func Load(f *bufio.Reader) (*Derivation, error) { } return id, nil }) + if err != nil { + return nil, err + } if err := expectChar(f, ','); err != nil { return nil, fmt.Errorf(", after input derivations list: %w", err) } @@ -342,6 +460,9 @@ func Load(f *bufio.Reader) (*Derivation, error) { } return kv, nil }) + if err != nil { + return nil, err + } drv.Env = make(map[string]string) for _, kv := range envList { drv.Env[kv.key] = kv.value diff --git a/go/nix/nixdrv/nixdrvhash_test.go b/go/nix/nixdrv/nixdrvhash_test.go new file mode 100644 index 0000000000..bf74caa246 --- /dev/null +++ b/go/nix/nixdrv/nixdrvhash_test.go @@ -0,0 +1,27 @@ +package nixdrv + +import ( + "bufio" + "path" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" +) + +const testData = `Derive([("out","/nix/store/prcng4aaimpjmnn7w0r8p4yfsqknmd64-golib-nixbuild","","")],[("/nix/store/0d0g9c5311rsll1vbw1nckf8xci5px78-golib-golang.org_x_crypto_ssh.drv",["out"]),("/nix/store/2yf7xmgj0pdx1mxk8dxaf2nxkw5gb0h0-golib-github.com_numtide_go-nix_nixbase32.drv",["out"]),("/nix/store/a5r8sb3xk6p0is4z6pw7jflc97v623k0-golib-golang.org_x_crypto_chacha20.drv",["out"]),("/nix/store/dv6n6shvw8qdk7bc00ygyjqwk26jv7lh-golib-golang.org_x_crypto_blowfish.drv",["out"]),("/nix/store/dwfjpdgw8by8f4aidinqaxfkxwg7cz02-golib-golang.org_x_crypto_curve25519.drv",["out"]),("/nix/store/flrv8lx2775bgdmxhbspbzc7jh3fg2x7-golib-golang.org_x_crypto_ed25519.drv",["out"]),("/nix/store/ghdqkh5fgfscd4n2yzsv2z03v23vymxa-golib-github.com_mattn_go-sqlite3.drv",["out"]),("/nix/store/ix0pi11lhgm9m8nqi99gzfn07vjnhk6l-golib-golang.org_x_crypto_poly1305.drv",["out"]),("/nix/store/jfi4am6v5qhqk0gjq5ycrpb6y8754qxf-bash-5.2-p15.drv",["out"]),("/nix/store/jwz0c3yq7rwn8xjq5hb19awach94d559-golib-narinfo.drv",["out"]),("/nix/store/nvf2z167xbgr57jmyi4ywnd3pn73wsli-golib-nixstore.drv",["out"]),("/nix/store/q2kpxbf93znb9raraxj5kjfa20amfry0-golib-nixdrv.drv",["out"]),("/nix/store/q653yq6f7dxwir77q9ryi1gvbcdcs9b0-golib-golang.org_x_crypto_ssh_internal_bcrypt_pbkdf.drv",["out"]),("/nix/store/qym7bn0pqh3381c079dr5yv4fi6nlmkj-go-1.19.12.drv",["out"]),("/nix/store/s7y5gb05ka3rcpazvkax3hm1bz64d8f8-golib-nixwire.drv",["out"]),("/nix/store/sw37l040k8zh7v02xvfc3db6yyim8wcy-stdenv-linux.drv",["out"]),("/nix/store/walxgj9piwz01f03ldk6x4fn1l5hmx7f-golib-golang.org_x_crypto_internal_subtle.drv",["out"]),("/nix/store/zb4czqkdd83asjdmf2984djz47n2wph6-golib-nixpool.drv",["out"])],["/nix/store/4n7mpxfsgjv0mjssiagldv26m7255d5z-nixbuild.go","/nix/store/6xg259477c90a229xwmb53pdfkn6ig3g-default-builder.sh","/nix/store/96bcb71plkmp73gwinl9awb4kx0kx41b-peerresolver.go","/nix/store/m3x8b8mqliy6gw1ka4gcg83qgbcl0irz-state.go","/nix/store/mmnbiyvnd1bvxw4p1vdhlvkph4q28q29-workitem.go","/nix/store/qs17yzkmv8imp1h93723b4j79j043l18-httpresolver.go","/nix/store/s22qhmbxm275lxbpxxxk1i4s2nan27pl-config.go","/nix/store/zms41ag0wqm1x40g6z740c5w3pgpc0a4-coordinator.go"],"x86_64-linux","/nix/store/ir0j7zqlw9dc49grmwplppc7gh0s40yf-bash-5.2-p15/bin/bash",["-e","/nix/store/6xg259477c90a229xwmb53pdfkn6ig3g-default-builder.sh"],[("__structuredAttrs",""),("buildCommand","mkdir -p $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild\nEXTRAGO=\"\"\ncp /nix/store/s22qhmbxm275lxbpxxxk1i4s2nan27pl-config.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/config.go\ncp /nix/store/zms41ag0wqm1x40g6z740c5w3pgpc0a4-coordinator.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/coordinator.go\ncp /nix/store/qs17yzkmv8imp1h93723b4j79j043l18-httpresolver.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/httpresolver.go\ncp /nix/store/4n7mpxfsgjv0mjssiagldv26m7255d5z-nixbuild.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/nixbuild.go\ncp /nix/store/96bcb71plkmp73gwinl9awb4kx0kx41b-peerresolver.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/peerresolver.go\ncp /nix/store/m3x8b8mqliy6gw1ka4gcg83qgbcl0irz-state.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/state.go\ncp /nix/store/mmnbiyvnd1bvxw4p1vdhlvkph4q28q29-workitem.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/workitem.go\n\n\n/nix/store/71wgdw4snmvcbqkvly4lb5kham3nk5l9-go-1.19.12/bin/go tool compile -pack -o $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild.a -trimpath=$PWD -trimpath=/nix/store/71wgdw4snmvcbqkvly4lb5kham3nk5l9-go-1.19.12 -trimpath=$out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild -p hg.lukegb.com/lukegb/depot/go/nix/nixbuild -I /nix/store/rkna1n7ky45szab5lypfdmk0sjb8msqs-golib-nixstore -I /nix/store/d2w6m90zp0lpfd1h6mry95260ilgfs8h-golib-nixpool -I /nix/store/4r1c7qrgbj4h62srinpz7aps02amz7jd-golib-narinfo -I /nix/store/2icy6ahd2mbfim6s4r6yxpq95i4rlvgg-golib-nixwire -I /nix/store/0crllg0pzndv8p74scdvv41jy182b7mj-golib-github.com_mattn_go-sqlite3 -I /nix/store/wzb7f00nwkjwf1bya6wq7zswji3vcyis-golib-golang.org_x_crypto_ssh -I /nix/store/130v52aarlilgxdyjp6mmlpjiq4gw7df-golib-github.com_numtide_go-nix_nixbase32 -I /nix/store/7yhdfd5x9w1b3f6nb99kfb87myy58bk4-golib-nixdrv -I /nix/store/5ccrqcj75fhwzd5kdwa23pr0wj1h77cl-golib-golang.org_x_crypto_chacha20 -I /nix/store/7glvixy5lxmwdk2av9jymccdqrfq3v40-golib-golang.org_x_crypto_curve25519 -I /nix/store/7rfgqvkj8qxhl69wv2g6nfh9grcdvn4w-golib-golang.org_x_crypto_ed25519 -I /nix/store/1153gzqv9xqrhj4y1fdsxgl5zyjpkipl-golib-golang.org_x_crypto_poly1305 -I /nix/store/ddli4283d0r26m4wd16z6pcdrwplq4p5-golib-golang.org_x_crypto_ssh_internal_bcrypt_pbkdf -I /nix/store/1rykqga28bpn2cz2n1zygibrqbxms11j-golib-golang.org_x_crypto_internal_subtle -I /nix/store/cz6qi20aqqyzvdazy9xfnqz855zx3wj1-golib-golang.org_x_crypto_blowfish $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/config.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/coordinator.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/httpresolver.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/nixbuild.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/peerresolver.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/state.go $out/hg.lukegb.com/lukegb/depot/go/nix/nixbuild/workitem.go $EXTRAGO\n\n\n"),("buildInputs",""),("builder","/nix/store/ir0j7zqlw9dc49grmwplppc7gh0s40yf-bash-5.2-p15/bin/bash"),("cmakeFlags",""),("configureFlags",""),("depsBuildBuild",""),("depsBuildBuildPropagated",""),("depsBuildTarget",""),("depsBuildTargetPropagated",""),("depsHostHost",""),("depsHostHostPropagated",""),("depsTargetTarget",""),("depsTargetTargetPropagated",""),("doCheck",""),("doInstallCheck",""),("enableParallelBuilding","1"),("enableParallelChecking","1"),("enableParallelInstalling","1"),("mesonFlags",""),("name","golib-nixbuild"),("nativeBuildInputs",""),("out","/nix/store/prcng4aaimpjmnn7w0r8p4yfsqknmd64-golib-nixbuild"),("outputs","out"),("passAsFile","buildCommand"),("patches",""),("propagatedBuildInputs",""),("propagatedNativeBuildInputs",""),("stdenv","/nix/store/nfcsib9abh3qf6k30401sw7aw5vhi06v-stdenv-linux"),("strictDeps",""),("system","x86_64-linux")])` +const testPath = "/nix/store/c8xiwx5mlflhs0gxqa8v335w99izsbbj-golib-nixbuild.drv" + +func TestHash(t *testing.T) { + drv, err := Load(bufio.NewReader(strings.NewReader(testData))) + if err != nil { + t.Fatalf("Load: %v", err) + } + if diff := cmp.Diff(testData, drv.String()); diff != "" { + t.Fatal(diff) + } + + if got := drv.StorePath("golib-nixbuild", path.Dir(testPath)); got != testPath { + t.Fatalf("%q != %q", got, testPath) + } +} diff --git a/go/nix/nixhash/default.nix b/go/nix/nixhash/default.nix new file mode 100644 index 0000000000..45e3df0062 --- /dev/null +++ b/go/nix/nixhash/default.nix @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.package { + name = "nixhash"; + path = "hg.lukegb.com/lukegb/depot/go/nix/nixhash"; + srcs = [ + ./nixhash.go + ]; + deps = with depot; [ + third_party.gopkgs."github.com".numtide.go-nix.nixbase32 + ]; +} diff --git a/go/nix/nixhash/nixhash.go b/go/nix/nixhash/nixhash.go new file mode 100644 index 0000000000..826a7298b3 --- /dev/null +++ b/go/nix/nixhash/nixhash.go @@ -0,0 +1,32 @@ +package nixhash + +import ( + "crypto/sha256" + "encoding/hex" + "sort" + "strings" + + "github.com/numtide/go-nix/nixbase32" +) + +func StorePathForText(name, s, storePath string, references []string) string { + strHash := sha256.Sum256([]byte(s)) + sort.Strings(references) + + var typeStrBuilder strings.Builder + typeStrBuilder.WriteString("text") + for _, ref := range references { + typeStrBuilder.WriteString(":") + typeStrBuilder.WriteString(ref) + } + typeStr := typeStrBuilder.String() + sStr := typeStr + ":sha256:" + hex.EncodeToString(strHash[:]) + ":" + storePath + ":" + name + + sHash := sha256.Sum256([]byte(sStr)) + sCompressedHash := make([]byte, 20) + for i := range sHash { + sCompressedHash[i%len(sCompressedHash)] ^= sHash[i] + } + + return nixbase32.EncodeToString(sCompressedHash[:]) +} diff --git a/go/nix/nixpool/default.nix b/go/nix/nixpool/default.nix new file mode 100644 index 0000000000..e96615bc31 --- /dev/null +++ b/go/nix/nixpool/default.nix @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.package { + name = "nixpool"; + path = "hg.lukegb.com/lukegb/depot/go/nix/nixpool"; + srcs = [ + ./dialer.go + ./nixpool.go + ]; + deps = with depot; [ + go.nix.nixdrv + go.nix.nixstore + + third_party.gopkgs."golang.org".x.crypto.ssh + ]; +} diff --git a/go/nix/nixpool/dialer.go b/go/nix/nixpool/dialer.go new file mode 100644 index 0000000000..46b2edd874 --- /dev/null +++ b/go/nix/nixpool/dialer.go @@ -0,0 +1,134 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixpool + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + + "golang.org/x/crypto/ssh" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +// DaemonFactory is the shape of a factory function. +type DaemonFactory func() (*nixstore.Daemon, error) + +// DaemonDialer creates a factory function from the provided remote. +func DaemonDialer(ctx context.Context, remote string) (DaemonFactory, error) { + u, err := url.Parse(remote) + if err != nil { + return nil, fmt.Errorf("parsing remote %q as URL: %w", remote, err) + } + + switch u.Scheme { + case "unix": + if u.Path == "" { + u.Path = nixstore.DaemonSock + } + return func() (*nixstore.Daemon, error) { + d, err := nixstore.OpenDaemon(u.Path) + if err != nil { + return nil, err + } + return d, nil + }, nil + case "ssh-ng": + // Construct a ClientConfig from the URL. + cfg := &ssh.ClientConfig{} + cfg.Ciphers = []string{"chacha20-poly1305@openssh.com"} + if u.Query().Has("privkey") { + var keys []ssh.Signer + for _, privkeyPath := range u.Query()["privkey"] { + privkeyF, err := os.Open(privkeyPath) + if err != nil { + return nil, fmt.Errorf("opening privkey %q: %w", privkeyPath, err) + } + defer privkeyF.Close() + privkeyB, err := io.ReadAll(privkeyF) + if err != nil { + return nil, fmt.Errorf("reading privkey %q: %w", privkeyPath, err) + } + + privkey, err := ssh.ParsePrivateKey(privkeyB) + if err != nil { + return nil, fmt.Errorf("parsing privkey %q: %w", privkeyPath, err) + } + keys = append(keys, privkey) + } + cfg.Auth = append(cfg.Auth, ssh.PublicKeys(keys...)) + } + if u.User != nil { + cfg.User = u.User.Username() + if pw, ok := u.User.Password(); ok { + cfg.Auth = append(cfg.Auth, ssh.Password(pw)) + } + } + switch { + case u.Query().Has("host-key"): + hkStr := u.Query().Get("host-key") + _, _, hk, _, _, err := ssh.ParseKnownHosts(append([]byte("x "), []byte(hkStr)...)) + if err != nil { + return nil, fmt.Errorf("parsing host-key %q: %w", hkStr, err) + } + cfg.HostKeyCallback = ssh.FixedHostKey(hk) + case u.Query().Has("insecure-allow-any-ssh-host-key"): + cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey() + default: + return nil, fmt.Errorf("some SSH host key configuration is required (?host-key=; ?insecure-allow-any-ssh-host-key)") + } + + // Work out other misc parameters. + // ...remote command. + remoteCmd := "nix-daemon --stdio" + if u.Query().Has("remote-cmd") { + remoteCmd = u.Query().Get("remote-cmd") + } + + // Work out the host:port to connect to. + remote := u.Hostname() + if portStr := u.Port(); portStr != "" { + remote = remote + ":" + portStr + } else { + remote = remote + ":22" + } + + return func() (*nixstore.Daemon, error) { + conn, err := ssh.Dial("tcp", remote, cfg) + if err != nil { + return nil, fmt.Errorf("dialing %v via SSH: %w", remote, err) + } + sess, err := conn.NewSession() + if err != nil { + conn.Close() + return nil, fmt.Errorf("opening SSH session to %v: %w", remote, err) + } + stdin, err := sess.StdinPipe() + if err != nil { + conn.Close() + return nil, fmt.Errorf("opening stdin pipe: %w", err) + } + stdout, err := sess.StdoutPipe() + if err != nil { + conn.Close() + return nil, fmt.Errorf("opening stdout pipe: %w", err) + } + if err := sess.Start(remoteCmd); err != nil { + conn.Close() + return nil, fmt.Errorf("starting %q: %w", remoteCmd, err) + } + d, err := nixstore.OpenDaemonWithIOs(stdout, stdin, conn) + if err != nil { + conn.Close() + return nil, fmt.Errorf("establishing connection to daemon: %w", err) + } + return d, nil + }, nil + default: + return nil, fmt.Errorf("unknown remote %q", remote) + } +} diff --git a/go/nix/nixpool/nixpool.go b/go/nix/nixpool/nixpool.go new file mode 100644 index 0000000000..ad9b85499f --- /dev/null +++ b/go/nix/nixpool/nixpool.go @@ -0,0 +1,119 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +package nixpool + +import ( + "context" + "log" + "math/rand" + "runtime" + "sync" + "time" + + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +type Pool struct { + factory DaemonFactory + + cond *sync.Cond + limit int + generated int + available []*nixstore.Daemon +} + +func New(ctx context.Context, factory DaemonFactory, limit int) *Pool { + return &Pool{ + factory: func() (*nixstore.Daemon, error) { + var d *nixstore.Daemon + var err error + retryInterval := 10 * time.Millisecond + maxRetry := 10 * time.Second + for n := 0; n < 20; n++ { + d, err = factory() + if err == nil { + break + } + log.Printf("failed to connect: %v", err) + time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval))) + retryInterval = retryInterval * 2 + if retryInterval > maxRetry { + retryInterval = maxRetry + } + } + return d, err + }, + + cond: sync.NewCond(new(sync.Mutex)), + limit: limit, + } +} + +func (p *Pool) getFromAvailableLocked() *nixstore.Daemon { + end := len(p.available) - 1 + d := p.available[end] + p.available = p.available[:end] + return d +} + +func (p *Pool) log(ln string) { + if false { + _, file, line, _ := runtime.Caller(2) + log.Printf("%p (%s:%d): %s", p, file, line, ln) + } +} + +func (p *Pool) Get() (*nixstore.Daemon, error) { + p.cond.L.Lock() + + for { + if len(p.available) > 0 { + d := p.getFromAvailableLocked() + p.log("pool.Get: from available") + p.cond.L.Unlock() + return d, nil + } + + if p.generated < p.limit { + p.generated++ + p.cond.L.Unlock() + d, err := p.factory() + if err != nil { + p.cond.L.Lock() + p.generated-- + p.cond.L.Unlock() + return nil, err + } + p.log("pool.Get: generated new") + return d, nil + } + + p.cond.Wait() + } +} + +func (p *Pool) Put(d *nixstore.Daemon) { + p.cond.L.Lock() + if d.Err() != nil { + d.Close() + p.log("pool.Put: broken") + p.generated-- + } else { + p.log("pool.Put: returned to pool") + p.available = append(p.available, d) + } + p.cond.Signal() + p.cond.L.Unlock() +} + +func (p *Pool) Busyness() float64 { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + currentlyAvailable := len(p.available) + currentlyCheckedOut := p.generated - currentlyAvailable + proportionCheckedOut := float64(currentlyCheckedOut) / float64(p.limit) + return proportionCheckedOut +} diff --git a/go/nix/nixstore/activities.go b/go/nix/nixstore/activities.go index efac722758..7d3199c5de 100644 --- a/go/nix/nixstore/activities.go +++ b/go/nix/nixstore/activities.go @@ -197,7 +197,7 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { llt = LogLinePostBuild } if VerboseActivities { - log.Printf("%s:%d> %s", am.ActivityID, fields[0].(string)) + log.Printf("%s:%d> %s", am.String, am.ActivityID, fields[0].(string)) } am.logs = append(am.logs, &ActivityLog{ Timestamp: now, diff --git a/go/nix/nixstore/default.nix b/go/nix/nixstore/default.nix index 29882edc28..b3b264cf79 100644 --- a/go/nix/nixstore/default.nix +++ b/go/nix/nixstore/default.nix @@ -13,7 +13,9 @@ depot.third_party.buildGo.package { ./sqlitestore.go ]; deps = with depot; [ + go.nix.nar go.nix.nar.narinfo + go.nix.nixdrv go.nix.nixwire third_party.gopkgs."github.com".mattn.go-sqlite3 third_party.gopkgs."golang.org".x.crypto.ssh diff --git a/go/nix/nixstore/remotestore.go b/go/nix/nixstore/remotestore.go index 3eb6e74ae8..8b5fcedda3 100644 --- a/go/nix/nixstore/remotestore.go +++ b/go/nix/nixstore/remotestore.go @@ -1,14 +1,19 @@ package nixstore import ( + "bytes" "encoding/base64" + "encoding/hex" + "errors" "fmt" "io" + "log" "net" "path" "strings" "sync" + "hg.lukegb.com/lukegb/depot/go/nix/nar" "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" "hg.lukegb.com/lukegb/depot/go/nix/nixwire" @@ -18,12 +23,18 @@ const ( DaemonSock = "/nix/var/nix/daemon-socket/socket" WorkerMagic1 = 0x6e697863 WorkerMagic2 = 0x6478696f - ProtocolVersion = 0x115 + ProtocolVersion = 0x120 // 0x115 + WopAddMultipleToStore = 44 + WopAddToStoreNar = 39 WopBuildDerivation = 36 WopEnsurePath = 10 + WopIsValidPath = 1 + WopNarFromPath = 38 WopQueryPathInfo = 26 WopQuerySubstitutablePathInfo = 21 + WopQueryValidDerivers = 33 + WopQueryValidPaths = 31 MaxBuf = 1024 * 1024 // 1 MB ) @@ -34,6 +45,8 @@ type Daemon struct { r *nixwire.Deserializer mu sync.Mutex err error + + peerMinorVersion int } type PathNotValidError struct{ Path string } @@ -42,23 +55,31 @@ func (err PathNotValidError) Error() string { return fmt.Sprintf("path %q is not valid", err.Path) } +func (d *Daemon) Err() error { + return d.err +} + func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { d.mu.Lock() defer d.mu.Unlock() - if _, err := d.w.WriteUint64(WopQueryPathInfo); err != nil { + if _, err := d.writeOp(WopQueryPathInfo); err != nil { + d.err = err return nil, fmt.Errorf("writing worker op WopQueryPathInfo: %w", err) } if _, err := d.w.WriteString(storePath); err != nil { + d.err = err return nil, fmt.Errorf("writing store path query %v: %w", storePath, err) } if err := d.processStderr(nil, nil, nil); err != nil { + d.err = err return nil, fmt.Errorf("reading stderr from WopQueryPathInfo: %w", err) } validInt, err := d.r.ReadUint64() if err != nil { + d.err = err return nil, fmt.Errorf("reading path validity: %w", err) } valid := validInt == uint64(1) @@ -70,6 +91,7 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { StorePath: storePath, } if ni.Deriver, err = d.r.ReadString(); err != nil { + d.err = err return nil, fmt.Errorf("reading deriver: %w", err) } ni.Deriver = path.Base(ni.Deriver) @@ -79,14 +101,17 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { hashStr, err := d.r.ReadString() if err != nil { + d.err = err return nil, fmt.Errorf("reading NAR hash: %w", err) } if ni.NarHash, err = narinfo.HashFromString("sha256:" + hashStr); err != nil { + d.err = err return nil, fmt.Errorf("parsing NAR hash %q: %w", hashStr, err) } refs, err := d.r.ReadStrings() if err != nil { + d.err = err return nil, fmt.Errorf("reading referrers: %w", err) } for n, ref := range refs { @@ -95,19 +120,25 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { ni.References = refs if _, err := d.r.ReadUint64(); err != nil { + d.err = err return nil, fmt.Errorf("reading registration time: %w", err) } if ni.NarSize, err = d.r.ReadUint64(); err != nil { + d.err = err return nil, fmt.Errorf("reading narsize: %w", err) } if _, err := d.r.ReadUint64(); err != nil { + d.err = err return nil, fmt.Errorf("reading ultimate: %w", err) } sigs, err := d.r.ReadStrings() if err != nil { + d.err = err return nil, fmt.Errorf("reading sigs: %w", err) } - if _, err := d.r.ReadUint64(); err != nil { + ni.CA, err = d.r.ReadString() + if err != nil { + d.err = err return nil, fmt.Errorf("reading CA: %w", err) } @@ -124,6 +155,71 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { return ni, nil } +func (d *Daemon) NARFromPath(storePath string) (io.ReadCloser, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.writeOp(WopNarFromPath); err != nil { + d.err = err + return nil, fmt.Errorf("writing worker op WopNarFromPath: %w", err) + } + if _, err := d.w.WriteString(storePath); err != nil { + d.err = err + return nil, fmt.Errorf("writing store path query %v: %w", storePath, err) + } + if err := d.processStderr(nil, nil, nil); err != nil { + return nil, fmt.Errorf("waiting for NAR packing: %w", err) + } + + pr, pw := io.Pipe() + go func() { + err := nar.Unpack(io.TeeReader(d.r.Reader, pw), nar.NullFS{}, "irrelevant") + if err != nil { + pw.CloseWithError(err) + } else { + pw.Close() + } + }() + + return io.NopCloser(pr), nil +} + +func (d *Daemon) writeOp(op uint64) (int64, error) { + return d.w.WriteUint64(op) +} + +func (d *Daemon) ValidPaths(storePaths []string) ([]string, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.writeOp(WopQueryValidPaths); err != nil { + d.err = err + return nil, fmt.Errorf("writing worker op WopQueryValidPaths: %w", err) + } + if _, err := d.w.WriteStrings(storePaths); err != nil { + d.err = err + return nil, fmt.Errorf("writing store path query %v: %w", storePaths, err) + } + if d.peerMinorVersion >= 27 { + // Substitute flag + if _, err := d.w.WriteUint64(0); err != nil { + d.err = err + return nil, fmt.Errorf("writing substitute flag: %w", err) + } + } + + if err := d.processStderr(nil, nil, nil); err != nil { + return nil, fmt.Errorf("reading stderr from WopQueryValidPaths: %w", err) + } + + validPaths, err := d.r.ReadStrings() + if err != nil { + d.err = err + return nil, fmt.Errorf("reading valid paths: %w", err) + } + return validPaths, nil +} + func (d *Daemon) Close() error { d.mu.Lock() defer d.mu.Unlock() @@ -175,27 +271,33 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re for { msg, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("read stderr msg code: %w", err) } switch msg { case 0x64617416: // STDERR_WRITE if stdout == nil { + d.err = err return fmt.Errorf("STDERR_WRITE requested") } bs, err := d.r.ReadBytes() if err != nil { + d.err = err return fmt.Errorf("read bytes: %w", err) } _, err = stdout.Write(bs) if err != nil { + d.err = err return fmt.Errorf("write bytes into stdout: %w", err) } case 0x64617461: // STDERR_READ if stdin == nil { + d.err = err return fmt.Errorf("STDERR_READ requested") } readSizeU64, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_READ reading uint64: %w", err) } readSize := int(readSizeU64) @@ -205,52 +307,111 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re buf := make([]byte, readSize) readSize, err = stdin.Read(buf) if err != nil { + d.err = err return fmt.Errorf("STDERR_READ reading from stdin: %w", err) } if _, err := d.w.WriteBytes(buf[:readSize]); err != nil { + d.err = err return fmt.Errorf("STDERR_READ writing stdin to socket: %w", err) } case 0x63787470: // STDERR_ERROR - errStr, err := d.r.ReadString() - if err != nil { - return fmt.Errorf("STDERR_ERROR reading error string: %w", err) + if d.peerMinorVersion >= 26 { + errStr, err := d.r.ReadString() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading 'Error' string: %w", err) + } + errLevel, err := d.r.ReadUint64() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading error level: %w", err) + } + errName, err := d.r.ReadString() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading error name: %w", err) + } + errMsg, err := d.r.ReadString() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading error message: %w", err) + } + errPos, err := d.r.ReadUint64() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading error position: %w", err) + } + log.Printf("error? Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos) + errTraceNum, err := d.r.ReadUint64() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading trace count: %w", err) + } + for n := 0; n < int(errTraceNum); n++ { + tracePos, err := d.r.ReadUint64() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading trace position: %w", err) + } + traceHint, err := d.r.ReadString() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading trace hint: %w", err) + } + log.Printf("error? trace %d/%d: Pos=%d Hint=%s", n+1, errTraceNum, tracePos, traceHint) + } + return fmt.Errorf("nix Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos) + } else { + errStr, err := d.r.ReadString() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading error string: %w", err) + } + status, err := d.r.ReadUint64() + if err != nil { + d.err = err + return fmt.Errorf("STDERR_ERROR reading uint64: %w", err) + } + al.AddError(status, errStr) + return NixError{status, errStr} } - status, err := d.r.ReadUint64() - if err != nil { - return fmt.Errorf("STDERR_ERROR reading uint64: %w", err) - } - al.AddError(status, errStr) - return NixError{status, errStr} case 0x6f6c6d67: // STDERR_NEXT msg, err := d.r.ReadString() if err != nil { + d.err = err return fmt.Errorf("STDERR_NEXT reading log string: %w", err) } al.AddLog(msg) case 0x53545254: // STDERR_START_ACTIVITY activity, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY reading ActivityId: %w", err) } lvl, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY reading level: %w", err) } actTypeU64, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY reading activity type: %w", err) } actType := ActivityType(actTypeU64) s, err := d.r.ReadString() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY reading s: %w", err) } fields, err := d.readFields() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY fields: %w", err) } parentActivity, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_START_ACTIVITY reading parent activity: %w", err) } al.StartActivity(actType, ActivityMeta{ @@ -263,21 +424,25 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re case 0x53544f50: // STDERR_STOP_ACTIVITY activity, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_STOP_ACTIVITY reading ActivityId: %w", err) } al.EndActivity(al.Activity(activity)) case 0x52534c54: // STDERR_RESULT activity, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_RESULT reading ActivityId: %w", err) } resultTypU64, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("STDERR_RESULT reading result: %w", err) } resultTyp := ResultType(resultTypU64) fields, err := d.readFields() if err != nil { + d.err = err return fmt.Errorf("STDERR_RESULT fields: %w", err) } al.ActivityResult(al.Activity(activity), resultTyp, fields) @@ -290,44 +455,83 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re func (d *Daemon) hello() error { _, err := d.w.WriteUint64(WorkerMagic1) if err != nil { + d.err = err return fmt.Errorf("writing magic 1: %w", err) } magic2, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("reading magic 2: %w", err) } if magic2 != WorkerMagic2 { + d.err = err return fmt.Errorf("magic 2 mismatch: got %x, wanted %x", magic2, WorkerMagic2) } daemonVersion, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("reading daemon version: %w", err) } majorVersion := int((daemonVersion >> 8) & 0xff) minorVersion := int(daemonVersion & 0xff) if majorVersion != 1 { + d.err = err return fmt.Errorf("daemon major version mismatch: got %d, want 1", majorVersion) } if minorVersion < 21 { + d.err = err return fmt.Errorf("daemon minor version too old: got %d, want at least 21", minorVersion) } + d.peerMinorVersion = minorVersion if _, err := d.w.WriteUint64(ProtocolVersion); err != nil { + d.err = err return fmt.Errorf("writing protocol version: %w", err) } if _, err := d.w.WriteUint64(0); err != nil { + d.err = err return fmt.Errorf("writing obsolete CPU affinity: %w", err) } if _, err := d.w.WriteUint64(0); err != nil { + d.err = err return fmt.Errorf("writing obsolete reserveSpace: %w", err) } return d.processStderr(nil, nil, nil) } +func (d *Daemon) IsValidPath(at *ActivityTracker, storePath string) (bool, error) { + al := at.StartAction(fmt.Sprintf("IsValidPath(%q)", storePath)) + defer al.Close() + + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.writeOp(WopIsValidPath); err != nil { + d.err = err + return false, fmt.Errorf("writing worker op WopIsValidPath: %w", err) + } + if _, err := d.w.WriteString(storePath); err != nil { + d.err = err + return false, fmt.Errorf("writing store path %v: %w", storePath, err) + } + + if err := d.processStderr(al, nil, nil); err != nil { + return false, fmt.Errorf("reading stderr from WopIsValidPath: %w", err) + } + + validInt, err := d.r.ReadUint64() + if err != nil { + d.err = err + return false, fmt.Errorf("reading path validity: %w", err) + } + return validInt != 0, nil + +} + func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error { al := at.StartAction(fmt.Sprintf("EnsurePath(%q)", storePath)) defer al.Close() @@ -335,10 +539,12 @@ func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error { d.mu.Lock() defer d.mu.Unlock() - if _, err := d.w.WriteUint64(WopEnsurePath); err != nil { + if _, err := d.writeOp(WopEnsurePath); err != nil { + d.err = err return fmt.Errorf("writing worker op WopEnsurePath: %w", err) } if _, err := d.w.WriteString(storePath); err != nil { + d.err = err return fmt.Errorf("writing store path %v: %w", storePath, err) } @@ -348,10 +554,11 @@ func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error { validInt, err := d.r.ReadUint64() if err != nil { + d.err = err return fmt.Errorf("reading path validity: %w", err) } if validInt != 1 { - return fmt.Errorf("path validity was %d; wanted 1", validInt) + return PathNotValidError{storePath} } return nil } @@ -422,16 +629,20 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der d.mu.Lock() defer d.mu.Unlock() - if _, err := d.w.WriteUint64(WopBuildDerivation); err != nil { + if _, err := d.writeOp(WopBuildDerivation); err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("writing worker op WopBuildDerivation: %w", err) } if _, err := d.w.WriteString(derivationPath); err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("writing derivation store path %v: %w", derivationPath, err) } if _, err := d.w.WriteDerivation(derivation); err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("writing derivation content of %v: %w", derivationPath, err) } if _, err := d.w.WriteUint64(uint64(buildMode)); err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("writing build mode %v: %w", buildMode, err) } @@ -441,11 +652,13 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der buildStatusU64, err := d.r.ReadUint64() if err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("reading build status code: %w", err) } buildStatus := BuildResultStatus(buildStatusU64) buildErrorMsg, err := d.r.ReadString() if err != nil { + d.err = err return BRSMiscFailure, fmt.Errorf("reading build error message: %w", err) } @@ -456,23 +669,224 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der return buildStatus, nil } +func (d *Daemon) writeNarInfo(ni *narinfo.NarInfo, w *nixwire.Serializer) error { + + storeRoot := path.Dir(ni.StorePath) + + if _, err := w.WriteString(ni.StorePath); err != nil { + d.err = err + return fmt.Errorf("writing store path: %w", err) + } + var deriverPath string + if ni.Deriver != "" { + deriverPath = path.Join(storeRoot, ni.Deriver) + } + if _, err := w.WriteString(deriverPath); err != nil { + d.err = err + return fmt.Errorf("writing deriver: %w", err) + } + if _, err := w.WriteString(hex.EncodeToString(ni.NarHash.Hash)); err != nil { + d.err = err + return fmt.Errorf("writing NAR hash: %w", err) + } + rootedRefs := make([]string, len(ni.References)) + for n, ref := range ni.References { + rootedRefs[n] = path.Join(storeRoot, ref) + } + if _, err := w.WriteStrings(rootedRefs); err != nil { + d.err = err + return fmt.Errorf("writing references: %w", err) + } + if _, err := w.WriteUint64(0); err != nil { + d.err = err + return fmt.Errorf("writing dummy registration time: %w", err) + } + if _, err := w.WriteUint64(ni.NarSize); err != nil { + d.err = err + return fmt.Errorf("writing NAR size: %w", err) + } + if _, err := w.WriteUint64(1); err != nil { + d.err = err + return fmt.Errorf("writing ultimate (true): %w", err) + } + if _, err := w.WriteStrings(ni.Sigs()); err != nil { + d.err = err + return fmt.Errorf("writing signatures: %w", err) + } + if _, err := w.WriteString(ni.CA); err != nil { + d.err = err + return fmt.Errorf("writing content-addressable: %w", err) + } + return nil +} + +type bufferingReader struct { + r io.Reader +} + +func (r *bufferingReader) Read(p []byte) (int, error) { + pos := 0 + for { + n, err := r.r.Read(p[pos:]) + pos += n + if err != nil { + if pos > 0 && errors.Is(err, io.EOF) { + return pos, nil + } + return pos, err + } else if pos >= len(p) { + break + } + } + return pos, nil +} + +func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error { + d.mu.Lock() + defer d.mu.Unlock() + + var preBuffer []byte + + if d.peerMinorVersion >= 32 { + if _, err := d.writeOp(WopAddMultipleToStore); err != nil { + d.err = err + return fmt.Errorf("writing worker op WopAddMultipleToStore: %w", err) + } + if _, err := d.w.WriteUint64(0); err != nil { + d.err = err + return fmt.Errorf("writing repair (false): %w", err) + } + if _, err := d.w.WriteUint64(1); err != nil { + d.err = err + return fmt.Errorf("writing dont-check-sigs (true): %w", err) + } + buf := new(bytes.Buffer) + bufs := &nixwire.Serializer{Writer: buf} + if _, err := bufs.WriteUint64(1); err != nil { + d.err = err + return fmt.Errorf("writing count (1): %w", err) + } + if err := d.writeNarInfo(ni, bufs); err != nil { + return fmt.Errorf("buffering NAR info: %w", err) + } + preBuffer = buf.Bytes() + } else { + if _, err := d.writeOp(WopAddToStoreNar); err != nil { + d.err = err + return fmt.Errorf("writing worker op WopAddToStoreNar: %w", err) + } + if err := d.writeNarInfo(ni, d.w); err != nil { + return fmt.Errorf("writing NAR info: %w", err) + } + if _, err := d.w.WriteUint64(0); err != nil { + d.err = err + return fmt.Errorf("writing repair (false): %w", err) + } + if _, err := d.w.WriteUint64(1); err != nil { + d.err = err + return fmt.Errorf("writing dont-check-sigs (true): %w", err) + } + } + + if d.peerMinorVersion >= 23 { + // FramedSink + errCh := make(chan error) + mr := &bufferingReader{io.MultiReader(bytes.NewReader(preBuffer), r)} + go func() { + defer close(errCh) + buf := make([]byte, MaxBuf) + for { + n, err := mr.Read(buf) + if errors.Is(err, io.EOF) { + break + } else if err != nil { + err = fmt.Errorf("reading: %w", err) + d.err = err + errCh <- err + return + } + if _, err := d.w.WriteBytes(buf[:n]); err != nil { + err = fmt.Errorf("writing payload: %w", err) + d.err = err + errCh <- err + return + } + } + if _, err := d.w.WriteUint64(0); err != nil { + err = fmt.Errorf("sending framed EOF: %w", err) + d.err = err + errCh <- err + return + } + }() + if err := d.processStderr(nil, nil, nil); err != nil { + return fmt.Errorf("reading (framed) stderr from WopAddToStoreNar: %w", err) + } + return <-errCh + } else { + if err := d.processStderr(nil, nil, r); err != nil { + return fmt.Errorf("reading stderr from WopAddToStoreNar: %w", err) + } + } + return nil +} + +func (d *Daemon) QueryValidDerivers(path string) ([]string, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.writeOp(WopQueryValidDerivers); err != nil { + d.err = err + return nil, fmt.Errorf("writing worker op WopQueryValidDerivers: %w", err) + } + if _, err := d.w.WriteString(path); err != nil { + d.err = err + return nil, fmt.Errorf("writing store path: %w", err) + } + + if err := d.processStderr(nil, nil, nil); err != nil { + return nil, fmt.Errorf("reading stderr from WopQueryValidDerivers: %w", err) + } + + drvPaths, err := d.r.ReadStrings() + if err != nil { + d.err = err + return nil, fmt.Errorf("reading deriver paths: %w", err) + } + return drvPaths, nil +} + func OpenDaemon(path string) (*Daemon, error) { conn, err := net.Dial("unix", path) if err != nil { return nil, fmt.Errorf("dialing %v: %w", path, err) } - d := &Daemon{conn: conn, w: &nixwire.Serializer{conn}, r: &nixwire.Deserializer{conn}} - if err := d.hello(); err != nil { - d.Close() - return nil, fmt.Errorf("sending hello to %v: %w", path, err) - } + return OpenDaemonWithIOs(conn, conn, conn) +} - return d, nil +type hexDumpingWriter struct { + w io.Writer + enabled bool +} + +func (w *hexDumpingWriter) Write(p []byte) (int, error) { + n, err := w.w.Write(p) + if err != nil { + return 0, err + } + if w.enabled { + log.Printf(">>\n>> %s", strings.TrimSuffix(strings.ReplaceAll(hex.Dump(p), "\n", "\n>> "), "\n>> ")) + } + return n, nil } func OpenDaemonWithIOs(r io.Reader, w io.Writer, c io.Closer) (*Daemon, error) { - d := &Daemon{conn: c, w: &nixwire.Serializer{w}, r: &nixwire.Deserializer{r}} + d := &Daemon{ + conn: c, + w: &nixwire.Serializer{Writer: &hexDumpingWriter{w: w}}, + r: &nixwire.Deserializer{Reader: r}, + } if err := d.hello(); err != nil { d.Close() return nil, fmt.Errorf("sending hello: %w", err) diff --git a/third_party/gopkgs/golang.org/x/crypto/default.nix b/third_party/gopkgs/golang.org/x/crypto/default.nix index 3029e66d5a..1f734b9b84 100644 --- a/third_party/gopkgs/golang.org/x/crypto/default.nix +++ b/third_party/gopkgs/golang.org/x/crypto/default.nix @@ -8,8 +8,8 @@ depot.third_party.buildGo.external { src = depot.third_party.nixpkgs.fetchFromGitHub { owner = "golang"; repo = "crypto"; - rev = "7f63de1d35b0f77fa2b9faea3e7deb402a2383c8"; - sha256 = "1dr89jfs4dmpr3jqfshqqvfpzzdx4r76nkzhrvmixfrmn6wxrnd1"; + rev = "b4ddeeda5bc71549846db71ba23e83ecb26f36ed"; + sha256 = "00cg67w0n01a64fc4kqg5j7r47fx5y9vyqlanwb60513dv6lzacs"; }; deps = with depot.third_party; [ gopkgs."golang.org".x.sys.cpu