go/nix: implement more of nixbuild

This commit is contained in:
Luke Granger-Brown 2023-08-23 23:00:44 +00:00
parent bb084d5aab
commit 3ab9b43f72
37 changed files with 2581 additions and 252 deletions

3
go.work Normal file
View file

@ -0,0 +1,3 @@
go 1.20
use ./go

4
go.work.sum Normal file
View file

@ -0,0 +1,4 @@
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=

View file

@ -23,10 +23,10 @@ require (
github.com/pomerium/sdk-go v0.0.5
github.com/prometheus/client_golang v1.12.1
gocloud.dev v0.24.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5
golang.org/x/crypto v0.12.0
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
golang.org/x/sys v0.11.0
)
require (
@ -96,8 +96,8 @@ require (
github.com/ryanuber/go-glob v1.0.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.56.0 // indirect

View file

@ -651,6 +651,8 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -733,6 +735,8 @@ golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -831,9 +835,12 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -844,6 +851,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

View file

@ -5,17 +5,13 @@ import (
"errors"
"flag"
"fmt"
"io"
"log"
"math/rand"
"net/url"
"os"
"sync"
"time"
"golang.org/x/crypto/ssh"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixpool"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
@ -54,84 +50,6 @@ func tryEnsure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTra
return false, err
}
type connectionPool struct {
factory func() (*nixstore.Daemon, error)
cond *sync.Cond
limit int
generated int
available []*nixstore.Daemon
}
func newConnectionPool(ctx context.Context, dst string, limit int) *connectionPool {
return &connectionPool{
factory: func() (*nixstore.Daemon, error) {
var d *nixstore.Daemon
var err error
retryInterval := 10 * time.Millisecond
maxRetry := 10 * time.Second
for n := 0; n < 20; n++ {
d, _, err = connectTo(ctx, dst)
if err == nil {
break
}
time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval)))
retryInterval = retryInterval * 2
if retryInterval > maxRetry {
retryInterval = maxRetry
}
}
return d, err
},
cond: sync.NewCond(new(sync.Mutex)),
limit: limit,
}
}
func (p *connectionPool) getFromAvailableLocked() *nixstore.Daemon {
end := len(p.available) - 1
d := p.available[end]
p.available = p.available[:end]
return d
}
func (p *connectionPool) Get() (*nixstore.Daemon, error) {
p.cond.L.Lock()
if len(p.available) > 0 {
d := p.getFromAvailableLocked()
p.cond.L.Unlock()
return d, nil
}
if p.generated < p.limit {
p.generated++
p.cond.L.Unlock()
d, err := p.factory()
if err != nil {
p.cond.L.Lock()
p.generated--
p.cond.L.Unlock()
return nil, err
}
return d, nil
}
for len(p.available) == 0 {
p.cond.Wait()
}
d := p.getFromAvailableLocked()
p.cond.L.Unlock()
return d, nil
}
func (p *connectionPool) Put(d *nixstore.Daemon) {
p.cond.L.Lock()
p.available = append(p.available, d)
p.cond.Signal()
p.cond.L.Unlock()
}
type workState int
const (
@ -167,9 +85,9 @@ type workItem struct {
narInfo *narinfo.NarInfo
childWork []*workItem
srcPool *connectionPool
srcPool *nixpool.Pool
srcAT *nixstore.ActivityTracker
dstPool *connectionPool
dstPool *nixpool.Pool
dstAT *nixstore.ActivityTracker
wh *workHolder
@ -309,9 +227,9 @@ type workHolder struct {
mu sync.Mutex
work map[string]*workItem
srcPool *connectionPool
srcPool *nixpool.Pool
srcAT *nixstore.ActivityTracker
dstPool *connectionPool
dstPool *nixpool.Pool
dstAT *nixstore.ActivityTracker
}
@ -339,7 +257,7 @@ func (wh *workHolder) addWork(ctx context.Context, path string) *workItem {
return i
}
func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.ActivityTracker, poolTo *connectionPool, atTo *nixstore.ActivityTracker, path string) error {
func copyPath(ctx context.Context, poolFrom *nixpool.Pool, atFrom *nixstore.ActivityTracker, poolTo *nixpool.Pool, atTo *nixstore.ActivityTracker, path string) error {
wh := workHolder{
work: make(map[string]*workItem),
srcPool: poolFrom,
@ -355,7 +273,7 @@ func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.Ac
var rs nixdrv.Resolver
func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) {
return rs.LoadDerivation(path)
return rs.LoadDerivation(ctx, path)
}
func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
@ -364,7 +282,7 @@ func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.Activ
return fmt.Errorf("loading derivation: %w", err)
}
basicDrv, err := drv.ToBasicDerivation(rs)
basicDrv, err := drv.ToBasicDerivation(ctx, rs)
if err != nil {
return fmt.Errorf("resolving: %w", err)
}
@ -377,115 +295,6 @@ func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.Activ
return nil
}
func connectTo(ctx context.Context, remote string) (*nixstore.Daemon, nixdrv.Resolver, error) {
u, err := url.Parse(remote)
if err != nil {
return nil, nil, fmt.Errorf("parsing remote %q as URL: %w", remote, err)
}
switch u.Scheme {
case "unix":
if u.Path == "" {
u.Path = nixstore.DaemonSock
}
d, err := nixstore.OpenDaemon(u.Path)
if err != nil {
return nil, nil, err
}
return d, nixdrv.LocalFSResolver{}, nil
case "ssh-ng":
// Construct a ClientConfig from the URL.
cfg := &ssh.ClientConfig{}
if u.Query().Has("privkey") {
var keys []ssh.Signer
for _, privkeyPath := range u.Query()["privkey"] {
privkeyF, err := os.Open(privkeyPath)
if err != nil {
return nil, nil, fmt.Errorf("opening privkey %q: %w", privkeyPath, err)
}
defer privkeyF.Close()
privkeyB, err := io.ReadAll(privkeyF)
if err != nil {
return nil, nil, fmt.Errorf("reading privkey %q: %w", privkeyPath, err)
}
privkey, err := ssh.ParsePrivateKey(privkeyB)
if err != nil {
return nil, nil, fmt.Errorf("parsing privkey %q: %w", privkeyPath, err)
}
keys = append(keys, privkey)
}
cfg.Auth = append(cfg.Auth, ssh.PublicKeys(keys...))
}
if u.User != nil {
cfg.User = u.User.Username()
if pw, ok := u.User.Password(); ok {
cfg.Auth = append(cfg.Auth, ssh.Password(pw))
}
}
switch {
case u.Query().Has("host-key"):
hkStr := u.Query().Get("host-key")
_, _, hk, _, _, err := ssh.ParseKnownHosts(append([]byte("x "), []byte(hkStr)...))
if err != nil {
return nil, nil, fmt.Errorf("parsing host-key %q: %w", hkStr, err)
}
cfg.HostKeyCallback = ssh.FixedHostKey(hk)
case u.Query().Has("insecure-allow-any-ssh-host-key"):
cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
default:
return nil, nil, fmt.Errorf("some SSH host key configuration is required (?host-key=; ?insecure-allow-any-ssh-host-key)")
}
// Work out other misc parameters.
// ...remote command.
remoteCmd := "nix-daemon --stdio"
if u.Query().Has("remote-cmd") {
remoteCmd = u.Query().Get("remote-cmd")
}
// Work out the host:port to connect to.
remote := u.Hostname()
if portStr := u.Port(); portStr != "" {
remote = remote + ":" + portStr
} else {
remote = remote + ":22"
}
conn, err := ssh.Dial("tcp", remote, cfg)
if err != nil {
return nil, nil, fmt.Errorf("dialing %v via SSH: %w", remote, err)
}
sess, err := conn.NewSession()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("opening SSH session to %v: %w", remote, err)
}
stdin, err := sess.StdinPipe()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("opening stdin pipe: %w", err)
}
stdout, err := sess.StdoutPipe()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("opening stdout pipe: %w", err)
}
if err := sess.Start(remoteCmd); err != nil {
conn.Close()
return nil, nil, fmt.Errorf("starting %q: %w", remoteCmd, err)
}
d, err := nixstore.OpenDaemonWithIOs(stdout, stdin, conn)
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("establishing connection to daemon: %w", err)
}
return d, nixdrv.LocalFSResolver{}, nil // TODO: change from LocalFSResolver?
default:
return nil, nil, fmt.Errorf("unknown remote %q", remote)
}
}
func main() {
flag.Parse()
@ -541,9 +350,18 @@ func main() {
if flag.NArg() != 2 {
badCall("`copy` needs a store path")
}
poolSrc := newConnectionPool(ctx, *remoteFlag, *maxConnsFlag)
factorySrc, err := nixpool.DaemonDialer(ctx, *remoteFlag)
if err != nil {
log.Fatalf("creating dialer for src: %v", err)
}
poolSrc := nixpool.New(ctx, factorySrc, *maxConnsFlag)
atSrc := nixstore.NewActivityTracker()
poolDst := newConnectionPool(ctx, *remote2Flag, *maxConnsFlag)
factoryDst, err := nixpool.DaemonDialer(ctx, *remote2Flag)
if err != nil {
log.Fatalf("creating dialer for dst: %v", err)
}
poolDst := nixpool.New(ctx, factoryDst, *maxConnsFlag)
atDst := nixstore.NewActivityTracker()
if err := copyPath(ctx, poolSrc, atSrc, poolDst, atDst, flag.Arg(1)); err != nil {
@ -556,12 +374,16 @@ func main() {
at := nixstore.NewActivityTracker()
d, rss, err := connectTo(ctx, *remoteFlag)
factory, err := nixpool.DaemonDialer(ctx, *remoteFlag)
if err != nil {
log.Fatalf("connectTo(%q): %v", *remoteFlag, err)
log.Fatalf("DaemonDialer(%q): %v", *remoteFlag, err)
}
d, err := factory()
if err != nil {
log.Fatalf("connecting to %q: %v", *remoteFlag, err)
}
defer d.Close()
rs = rss
rs = nixdrv.LocalFSResolver{}
if err := cmd(ctx, d, at); err != nil {
log.Fatalf("%s: %s", flag.Arg(0), err)

View file

@ -9,6 +9,9 @@ depot.third_party.buildGo.program {
./bnix.go
];
deps = with depot; [
go.nix.nar.narinfo
go.nix.nixdrv
go.nix.nixstore
go.nix.nixpool
];
}

View file

@ -0,0 +1,170 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"flag"
"log"
"strings"
"sync"
"hg.lukegb.com/lukegb/depot/go/nix/nixbuild"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixpool"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
type remoteDefinition struct {
URL string
PermittedResolutions int
PermittedBuilds int
SupportedPlatforms map[string]bool
}
var (
debugFlag = flag.Bool("b", false, "debug")
)
func bmain() {
ctx := context.Background()
remote := remoteDefinition{
URL: "ssh-ng://lukegb@eu.nixbuild.net?insecure-allow-any-ssh-host-key=true&privkey=/var/lib/secrets/id_ed25519_nixbuild/secret",
PermittedResolutions: 100,
PermittedBuilds: 100,
SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true},
}
local := remoteDefinition{
URL: "unix://",
PermittedResolutions: 100,
PermittedBuilds: 100,
}
// d = remoteDefinition{
// URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519",
// PermittedResolutions: 64,
// PermittedBuilds: 64,
// SupportedPlatforms: map[string]bool{"x86_64-linux": true},
// }
remoteFactory, err := nixpool.DaemonDialer(ctx, remote.URL)
if err != nil {
log.Fatalf("creating dialer for %v: %v", remote.URL, err)
}
remoteConn, err := remoteFactory()
if err != nil {
log.Fatalf("dialing %v: %v", remote.URL, err)
}
defer remoteConn.Close()
localFactory, err := nixpool.DaemonDialer(ctx, local.URL)
if err != nil {
log.Fatalf("creating dialer for %v: %v", local.URL, err)
}
localConn, err := localFactory()
if err != nil {
log.Fatalf("dialing %v: %v", local.URL, err)
}
defer localConn.Close()
const storePath = "/nix/store/pl5izkcgln1kvy8hv1850888my0b80qs-golib-golang.org_x_crypto_ed25519"
ni, err := localConn.NARInfo(storePath)
if err != nil {
log.Fatalf("localConn.NARInfo(%q): %v", storePath, err)
}
log.Println(ni, err)
rc, err := localConn.NARFromPath(storePath)
if err != nil {
log.Fatalf("localConn.NARFromPath(%q): %v", storePath, err)
}
if err := remoteConn.AddToStoreNar(ni, rc); err != nil {
log.Fatalf("remoteConn.AddToStoreNar: %v", err)
}
rc.Close()
}
func main() {
flag.Parse()
if *debugFlag {
bmain()
}
ctx := context.Background()
remoteDefs := []remoteDefinition{{
URL: "unix://",
PermittedResolutions: 64,
PermittedBuilds: 0,
SupportedPlatforms: map[string]bool{"x86_64-linux": true},
}, {
// URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519",
// PermittedResolutions: 64,
// PermittedBuilds: 64,
// SupportedPlatforms: map[string]bool{"x86_64-linux": true},
// }, {
URL: "ssh-ng://lukegb@eu.nixbuild.net?insecure-allow-any-ssh-host-key=true&privkey=/var/lib/secrets/id_ed25519_nixbuild/secret",
PermittedResolutions: 100,
PermittedBuilds: 100,
SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true},
}}
var builders []*nixbuild.Builder
var resolvers []nixbuild.Resolver
for _, d := range remoteDefs {
factory, err := nixpool.DaemonDialer(ctx, d.URL)
if err != nil {
log.Fatalf("creating dialer for %v: %w", d.URL, err)
}
if d.PermittedResolutions > 0 {
resolverPool := nixpool.New(ctx, factory, d.PermittedResolutions)
resolvers = append(resolvers, nixbuild.PeerResolver{PeerFetcher: nixbuild.PeerFetcher{Pool: resolverPool}})
}
if d.PermittedBuilds > 0 {
builderPool := nixpool.New(ctx, factory, d.PermittedBuilds)
builders = append(builders, &nixbuild.Builder{Pool: builderPool, SupportedPlatforms: d.SupportedPlatforms})
}
}
target := &nixbuild.PeerTarget{
PeerFetcher: resolvers[0].(nixbuild.PeerResolver).PeerFetcher,
ActivityTracker: nixstore.NewActivityTracker(),
}
cfg := nixbuild.Config{
Target: target,
ResolveOnTarget: true,
Resolvers: resolvers,
Builders: builders,
ResolveDependenciesOnBuilders: false,
}
at := nixstore.NewActivityTracker()
coord := nixbuild.NewCoordinator(cfg, at)
localResolver := nixdrv.LocalFSResolver{}
var wg sync.WaitGroup
for _, arg := range flag.Args() {
arg := arg
wg.Add(1)
go func() {
defer wg.Done()
var wi *nixbuild.WorkItem
if strings.HasSuffix(arg, ".drv") {
drv, err := localResolver.LoadDerivation(ctx, arg)
if err != nil {
log.Fatalf("LoadDerivation(%q): %v", arg, err)
}
bd, err := drv.ToBasicDerivation(ctx, localResolver)
if err != nil {
log.Fatalf("drv.ToBasicDerivation(): %v", err)
}
wi = coord.AddDerivationWork(ctx, bd, arg)
} else {
wi = coord.AddWork(ctx, arg)
}
<-wi.Done()
}()
}
wg.Wait()
}

View file

@ -0,0 +1,16 @@
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.program {
name = "bnixbuild";
srcs = [
./bnixbuild.go
];
deps = with depot; [
go.nix.nixbuild
go.nix.nixpool
go.nix.nixstore
];
}

View file

@ -1,14 +1,18 @@
# SPDX-FileCopyrightText: 2020 Luke Granger-Brown <depot@lukegb.com>
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
args:
{
nar = import ./nar args;
nixbuild = import ./nixbuild args;
nixdrv = import ./nixdrv args;
nixhash = import ./nixhash args;
nixpool = import ./nixpool args;
nixstore = import ./nixstore args;
nixwire = import ./nixwire args;
bcachegc = import ./bcachegc args;
bcacheup = import ./bcacheup args;
bnix = import ./bnix args;
bnixbuild = import ./bnixbuild args;
}

View file

@ -7,8 +7,10 @@
name = "nar";
path = "hg.lukegb.com/lukegb/depot/go/nix/nar";
srcs = [
./nar.go
./dirfs.go
./inmemoryfs.go
./narpacker.go
./narunpacker.go
];
deps = with depot; [
go.nix.nixwire

59
go/nix/nar/inmemoryfs.go Normal file
View file

@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nar
import "os"
type InmemoryDirent struct {
SubFS *InmemoryFS
Content []byte
Executable bool
Target string
}
func (dent *InmemoryDirent) Close() error { return nil }
func (dent *InmemoryDirent) Write(bs []byte) (int, error) {
dent.Content = append(dent.Content, bs...)
return len(bs), nil
}
func (dent *InmemoryDirent) MakeExecutable() error {
dent.Executable = true
return nil
}
type InmemoryFS struct {
Dirent map[string]*InmemoryDirent
}
func (fs *InmemoryFS) Create(name string) (WriteFile, error) {
if _, ok := fs.Dirent[name]; ok {
return nil, os.ErrExist
}
dent := &InmemoryDirent{}
fs.Dirent[name] = dent
return dent, nil
}
func (fs *InmemoryFS) Symlink(name, target string) error {
if _, ok := fs.Dirent[name]; ok {
return os.ErrExist
}
dent := &InmemoryDirent{Target: target}
fs.Dirent[name] = dent
return nil
}
func (fs *InmemoryFS) Mkdir(name string) (WriteFS, error) {
if _, ok := fs.Dirent[name]; ok {
return nil, os.ErrExist
}
dent := &InmemoryDirent{SubFS: NewInmemoryFS()}
fs.Dirent[name] = dent
return dent.SubFS, nil
}
func NewInmemoryFS() *InmemoryFS {
return &InmemoryFS{Dirent: make(map[string]*InmemoryDirent)}
}

View file

@ -52,6 +52,22 @@ func (c CompressionMethod) String() string {
return "!!unknown!!"
}
func (c CompressionMethod) FileSuffix() (string, error) {
switch c {
case CompressionNone:
return "", nil
case CompressionXz:
return ".xz", nil
case CompressionBzip2:
return ".bz2", nil
case CompressionGzip:
return ".gz", nil
case CompressionBrotli:
return ".br", nil
}
return "", fmt.Errorf("bad compression method %d", int(c))
}
func compressionFromString(s string) CompressionMethod {
switch s {
case "none", "":
@ -210,6 +226,20 @@ type NarInfo struct {
CA string
}
func (ni NarInfo) Sigs() []string {
sigKeys := make([]string, 0, len(ni.Sig))
for k := range ni.Sig {
sigKeys = append(sigKeys, k)
}
sort.Strings(sigKeys)
sigs := make([]string, 0, len(sigKeys))
for _, k := range sigKeys {
v := ni.Sig[k]
sigs = append(sigs, fmt.Sprintf("%s:%s", k, base64.StdEncoding.EncodeToString(v)))
}
return sigs
}
func (ni NarInfo) WriteTo(w io.Writer) error {
if ni.StorePath != "" {
if _, err := fmt.Fprintf(w, "StorePath: %s\n", ni.StorePath); err != nil {
@ -262,14 +292,9 @@ func (ni NarInfo) WriteTo(w io.Writer) error {
}
}
if len(ni.Sig) != 0 {
sigKeys := make([]string, 0, len(ni.Sig))
for k := range ni.Sig {
sigKeys = append(sigKeys, k)
}
sort.Strings(sigKeys)
for _, k := range sigKeys {
v := ni.Sig[k]
if _, err := fmt.Fprintf(w, "Sig: %s:%s\n", k, base64.StdEncoding.EncodeToString(v)); err != nil {
sigs := ni.Sigs()
for _, sig := range sigs {
if _, err := fmt.Fprintf(w, "Sig: %s\n", sig); err != nil {
return err
}
}

View file

@ -18,7 +18,6 @@ type FS interface {
}
func packFile(sw nixwire.Serializer, root FS, fn string, stat fs.FileInfo) (int64, error) {
var nSoFar int64
write := func(data ...any) (int64, error) {
for _, datum := range data {
@ -146,7 +145,7 @@ func packFile(sw nixwire.Serializer, root FS, fn string, stat fs.FileInfo) (int6
}
func Pack(w io.Writer, fs FS, fn string) (int64, error) {
sw := nixwire.Serializer{w}
sw := nixwire.Serializer{Writer: w}
n, err := sw.WriteString("nix-archive-1")
if err != nil {

View file

@ -7,12 +7,13 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"hg.lukegb.com/lukegb/depot/go/nix/nixwire"
)
func TestHeader(t *testing.T) {
var b bytes.Buffer
sw := serializeWriter{&b}
sw := nixwire.Serializer{Writer: &b}
wrote, err := sw.WriteString("nix-archive-1")
if err != nil {
t.Fatalf("WriteString: %v", err)

181
go/nix/nar/narunpacker.go Normal file
View file

@ -0,0 +1,181 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nar
import (
"bytes"
"fmt"
"io"
"strings"
"hg.lukegb.com/lukegb/depot/go/nix/nixwire"
)
type WriteFile interface {
io.WriteCloser
MakeExecutable() error
}
type WriteFS interface {
Create(name string) (WriteFile, error)
Mkdir(name string) (WriteFS, error)
Symlink(name, target string) error
}
type NullFS struct{}
func (NullFS) Close() error { return nil }
func (NullFS) MakeExecutable() error { return nil }
func (NullFS) Write(p []byte) (n int, err error) { return len(p), nil }
func (f NullFS) Create(name string) (WriteFile, error) { return f, nil }
func (f NullFS) Mkdir(name string) (WriteFS, error) { return f, nil }
func (NullFS) Symlink(name string, target string) error { return nil }
func unpackFile(dw nixwire.Deserializer, fs WriteFS, fn string) error {
s, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading file tuple open: %w", err)
}
if s != "(" {
return fmt.Errorf("expected file tuple open '('; got %q", s)
}
var fileType string
var regularFile WriteFile
var directoryFS WriteFS
var directoryLastName string
parseLoop:
for {
s, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading next file tuple part: %w", err)
}
switch {
case s == ")":
break parseLoop
case s == "type":
if fileType != "" {
return fmt.Errorf("multiple file types for %s", fileType)
}
t, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading file type: %w", err)
}
switch t {
case "regular":
regularFile, err = fs.Create(fn)
if err != nil {
return fmt.Errorf("creating file %v: %w", fn, err)
}
case "directory":
directoryFS, err = fs.Mkdir(fn)
if err != nil {
return fmt.Errorf("creating directory %v: %w", fn, err)
}
case "symlink":
// Do nothing until we have a target.
default:
return fmt.Errorf("bad file type %q; want regular/directory/symlink", t)
}
fileType = t
case s == "contents" && fileType == "regular":
size, err := dw.ReadUint64()
if err != nil {
return fmt.Errorf("reading file size: %w", err)
}
if _, err := io.CopyN(regularFile, dw.Reader, int64(size)); err != nil {
return fmt.Errorf("reading file content: %w", err)
}
if err := dw.ReadPadding(size); err != nil {
return fmt.Errorf("reading file padding: %w", err)
}
case s == "executable" && fileType == "regular":
v, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading executable emptystring: %w", err)
}
if v != "" {
return fmt.Errorf("executable emptystring is not empty: %v", v)
}
if err := regularFile.MakeExecutable(); err != nil {
return fmt.Errorf("making target file %v executable: %w", fn, err)
}
case s == "entry" && fileType == "directory":
// TODO
dirOpen, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading dirent open: %w", err)
}
if dirOpen != "(" {
return fmt.Errorf("dirent open was %q; wanted (", dirOpen)
}
var name string
direntLoop:
for {
s, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading next dirent tuple part: %w", err)
}
switch s {
case ")":
break direntLoop
case "name":
name, err = dw.ReadString()
if err != nil {
return fmt.Errorf("reading dirent name: %w", err)
}
switch {
case name == "", name == ".", name == "..", strings.Contains(name, "/"), bytes.Contains([]byte(name), []byte{0}):
return fmt.Errorf("invalid file name %q", name)
case name <= directoryLastName:
return fmt.Errorf("NAR not sorted (%q came after %q)", name, directoryLastName)
}
directoryLastName = name
case "node":
if name == "" {
return fmt.Errorf("entry name must come before node")
}
if err := unpackFile(dw, directoryFS, name); err != nil {
return fmt.Errorf("%s: %w", fn, err)
}
default:
return fmt.Errorf("%s: unknown field %s", fn, s)
}
}
case s == "target" && fileType == "symlink":
target, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading symlink target: %w", err)
}
if err := fs.Symlink(fn, target); err != nil {
return fmt.Errorf("creating symlink %q to %q: %w", fn, target, err)
}
default:
return fmt.Errorf("unknown field %v (filetype %q, filename %q)", s, fileType, fn)
}
}
if regularFile != nil {
if err := regularFile.Close(); err != nil {
return fmt.Errorf("closing file: %w", err)
}
}
return nil
}
func Unpack(r io.Reader, fs WriteFS, fn string) error {
dw := nixwire.Deserializer{Reader: r}
s, err := dw.ReadString()
if err != nil {
return fmt.Errorf("reading magic: %w", err)
}
if want := "nix-archive-1"; s != want {
return fmt.Errorf("invalid NAR magic %q (wanted %q)", s, want)
}
return unpackFile(dw, fs, fn)
}

72
go/nix/nixbuild/config.go Normal file
View file

@ -0,0 +1,72 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
import (
"context"
"io"
"strings"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixpool"
)
type Fetcher interface {
FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error)
FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error)
}
type DerivationFetcher interface {
nixdrv.Resolver
FetchDerivationForPath(ctx context.Context, path string) (*nixdrv.BasicDerivation, string, error)
}
type Resolver interface {
Fetcher
RelativePriority() int
}
type Target interface {
ValidPaths(ctx context.Context, paths []string) ([]string, error)
EnsurePaths(ctx context.Context, paths []string) ([]string, error)
PutNar(ctx context.Context, ni *narinfo.NarInfo, w io.Reader) error
Fetcher
}
type Builder struct {
Pool *nixpool.Pool
SupportedPlatforms map[string]bool
}
type Config struct {
// Target is the endpoint which should end up having all the built packages.
// If Target is nil, we will build things if necessary and leave them on the builders. Things may end up in a weird state if Target is nil and the Builders are not present in Resolvers, since built dependencies may not be available.
Target Target
// Allow target to resolve packages on its own, from its locally configured caches?
ResolveOnTarget bool
// Things to resolve packages from.
// You should probably put Target and Builders in this list.
Resolvers []Resolver
// Peers to build packages on.
// If empty, building will be disabled.
Builders []*Builder
// Allow builders to resolve dependencies on their own, from their locally configured caches?
ResolveDependenciesOnBuilders bool
// Store path. Usually /nix/store, which is the default if empty.
StorePath string
}
func (c Config) storePath() string {
if c.StorePath == "" {
return "/nix/store"
}
return strings.TrimSuffix(c.StorePath, "/")
}

View file

@ -0,0 +1,78 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
import (
"context"
"sort"
"sync"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
type Coordinator struct {
cfg Config
mu sync.Mutex
work map[string]*WorkItem
at *nixstore.ActivityTracker
}
func NewCoordinator(cfg Config, at *nixstore.ActivityTracker) *Coordinator {
return &Coordinator{
cfg: cfg,
work: make(map[string]*WorkItem),
at: at,
}
}
func (c *Coordinator) AddDerivationWork(ctx context.Context, drv *nixdrv.BasicDerivation, drvPath string) *WorkItem {
var outputNames []string
for o := range drv.Outputs {
outputNames = append(outputNames, o)
}
sort.Strings(outputNames)
path := drv.Outputs[outputNames[0]].Path
c.mu.Lock()
defer c.mu.Unlock()
if i := c.work[path]; i != nil {
return i
}
i := &WorkItem{
doneCh: make(chan struct{}),
path: path,
coord: c,
at: c.at,
}
c.work[path] = i
go i.run(ctx)
return i
}
func (c *Coordinator) AddWork(ctx context.Context, path string) *WorkItem {
c.mu.Lock()
defer c.mu.Unlock()
if i := c.work[path]; i != nil {
return i
}
i := &WorkItem{
doneCh: make(chan struct{}),
path: path,
coord: c,
at: c.at,
}
c.work[path] = i
go i.run(ctx)
return i
}

View file

@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.package {
name = "nixbuild";
path = "hg.lukegb.com/lukegb/depot/go/nix/nixbuild";
srcs = [
./config.go
./coordinator.go
./httpresolver.go
./nixbuild.go
./peerresolver.go
./state.go
./workitem.go
];
deps = with depot; [
go.nix.nixstore
go.nix.nixpool
go.nix.nar
];
}

View file

@ -0,0 +1,106 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
import (
"context"
"fmt"
"io"
"net/http"
"os"
"regexp"
"github.com/numtide/go-nix/nixbase32"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
)
var (
hashExtractRegexp = regexp.MustCompile(`(^|/)([0-9a-df-np-sv-z]{32})([-.].*)?$`)
)
func hashExtract(s string) string {
res := hashExtractRegexp.FindStringSubmatch(s)
if len(res) == 0 {
return ""
}
return res[2]
}
func keyForPath(storePath string) (string, error) {
fileHash := hashExtract(storePath)
if fileHash == "" {
return "", fmt.Errorf("store path %v seems to be invalid: couldn't extract hash", storePath)
}
return fmt.Sprintf("%s.narinfo", fileHash), nil
}
type HTTPCacheResolver struct {
Endpoint string
HTTPClient *http.Client
Priority int
}
var _ Resolver = ((*HTTPCacheResolver)(nil))
func (r *HTTPCacheResolver) RelativePriority() int { return r.Priority }
func (r *HTTPCacheResolver) FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error) {
key, err := keyForPath(path)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%v/%v", r.Endpoint, key), nil)
if err != nil {
return nil, fmt.Errorf("constructing request for %v/%v: %v", r.Endpoint, key, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("making request for %v/%v: %v", r.Endpoint, key, err)
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusNotFound:
return nil, os.ErrNotExist
case http.StatusOK:
// continue with function
default:
return nil, fmt.Errorf("%v/%v: HTTP %v", r.Endpoint, key, resp.Status)
}
return narinfo.ParseNarInfo(resp.Body)
}
func (r *HTTPCacheResolver) FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error) {
compressionSuffix, err := ni.Compression.FileSuffix()
if err != nil {
return nil, fmt.Errorf("determining filename: %w", err)
}
key := fmt.Sprintf("nar/%s.nar%s", nixbase32.EncodeToString(ni.FileHash.Hash), compressionSuffix)
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%v/%v", r.Endpoint, key), nil)
if err != nil {
return nil, fmt.Errorf("constructing request for %v/%v: %v", r.Endpoint, key, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("making request for %v/%v: %v", r.Endpoint, key, err)
}
switch resp.StatusCode {
case http.StatusNotFound:
resp.Body.Close()
return nil, os.ErrNotExist
case http.StatusOK:
return resp.Body, nil
default:
resp.Body.Close()
return nil, fmt.Errorf("downloading %v/%v: HTTP status code %v", r.Endpoint, key, resp.Status)
}
}

View file

@ -0,0 +1,5 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild

View file

@ -0,0 +1,179 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"path"
"hg.lukegb.com/lukegb/depot/go/nix/nar"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixpool"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
type PeerFetcher struct {
Pool *nixpool.Pool
}
var _ Fetcher = ((*PeerFetcher)(nil))
var _ DerivationFetcher = ((*PeerFetcher)(nil))
func (r PeerFetcher) FetchNarInfo(ctx context.Context, path string) (*narinfo.NarInfo, error) {
d, err := r.Pool.Get()
if err != nil {
return nil, err
}
defer r.Pool.Put(d)
return d.NARInfo(path)
}
type poolReturningReadCloser struct {
io.ReadCloser
pool *nixpool.Pool
d *nixstore.Daemon
}
func (prc *poolReturningReadCloser) Close() error {
err := prc.ReadCloser.Close()
prc.pool.Put(prc.d)
return err
}
func (r PeerFetcher) FetchNar(ctx context.Context, ni *narinfo.NarInfo) (io.ReadCloser, error) {
d, err := r.Pool.Get()
if err != nil {
return nil, err
}
rc, err := d.NARFromPath(ni.StorePath)
if err != nil {
return nil, err
}
return &poolReturningReadCloser{
ReadCloser: rc,
pool: r.Pool,
d: d,
}, nil
}
func (r PeerFetcher) FetchDerivationForPath(ctx context.Context, path string) (*nixdrv.BasicDerivation, string, error) {
d, err := r.Pool.Get()
if err != nil {
return nil, "", err
}
derivers, err := d.QueryValidDerivers(path)
r.Pool.Put(d)
if err != nil {
return nil, "", err
}
if len(derivers) == 0 {
return nil, "", fmt.Errorf("don't know how to build %v", path)
}
// Do we need to do something smarter than take the first derivation?
drvPath := derivers[0]
drv, err := r.LoadDerivation(ctx, drvPath)
if err != nil {
return nil, "", fmt.Errorf("fetching derivation %v for %v: %w", drvPath, path, err)
}
bd, err := drv.ToBasicDerivation(ctx, r)
if err != nil {
return nil, "", fmt.Errorf("converting %v to a basic derivation: %w", drvPath, err)
}
//drvBase := strings.TrimSuffix(drvPath, filepath.Ext(drvPath))
return bd, drvPath, nil
}
func (r PeerFetcher) LoadDerivation(ctx context.Context, drvPath string) (*nixdrv.Derivation, error) {
ni, err := r.FetchNarInfo(ctx, drvPath)
if err != nil {
return nil, fmt.Errorf("fetching narinfo for %v: %w", drvPath, err)
}
rc, err := r.FetchNar(ctx, ni)
if err != nil {
return nil, fmt.Errorf("fetching nar for %v: %w", drvPath, err)
}
defer rc.Close()
imfs := nar.NewInmemoryFS()
fn := path.Base(ni.StorePath)
if err := nar.Unpack(rc, imfs, fn); err != nil {
return nil, fmt.Errorf("unpacking nar for %v: %w", drvPath, err)
}
dent, ok := imfs.Dirent[fn]
if !ok {
return nil, fmt.Errorf("unpacking nar for %v yielded no file", drvPath)
} else if dent.Content == nil {
return nil, fmt.Errorf("unpacking nar for %v yielded non-file", drvPath)
}
return nixdrv.Load(bufio.NewReader(bytes.NewReader(dent.Content)))
}
type PeerResolver struct {
PeerFetcher
Priority int
}
var _ Resolver = ((*PeerResolver)(nil))
func (r PeerResolver) RelativePriority() int { return r.Priority }
type PeerTarget struct {
PeerFetcher
ActivityTracker *nixstore.ActivityTracker
}
var _ Target = ((*PeerTarget)(nil))
func (t *PeerTarget) EnsurePaths(ctx context.Context, paths []string) ([]string, error) {
d, err := t.Pool.Get()
if err != nil {
return nil, err
}
defer t.Pool.Put(d)
var ensuredPaths []string
for _, p := range paths {
if err := d.EnsurePath(t.ActivityTracker, p); err != nil {
var nv nixstore.PathNotValidError
if !errors.As(err, &nv) {
return nil, fmt.Errorf("ensuring %v: %w", p, err)
}
} else {
ensuredPaths = append(ensuredPaths, p)
}
}
return ensuredPaths, nil
}
func (t *PeerTarget) PutNar(ctx context.Context, ni *narinfo.NarInfo, w io.Reader) error {
d, err := t.Pool.Get()
if err != nil {
return err
}
defer t.Pool.Put(d)
return d.AddToStoreNar(ni, w)
}
func (t *PeerTarget) ValidPaths(ctx context.Context, paths []string) ([]string, error) {
d, err := t.Pool.Get()
if err != nil {
return nil, err
}
defer t.Pool.Put(d)
return d.ValidPaths(paths)
}

View file

@ -0,0 +1,62 @@
package nixbuild
import (
"context"
"testing"
"hg.lukegb.com/lukegb/depot/go/nix/nixpool"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
// DELETE ME
const remote = "ssh-ng://lukegb@localhost?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519"
// const storePath = "/nix/store/hx2qbqfjpv08w2aw6d5md6vnjwppzccb-golib-nixbuild"
const drvPath = "/nix/store/bi4dbkxdy94wk5qbqcq54rby0y0dql0j-ci.drv"
func TestPeerResolver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
factory, err := nixpool.DaemonDialer(ctx, remote)
if err != nil {
t.Fatalf("DaemonDialer(%q): %v", remote, err)
}
pool := nixpool.New(ctx, factory, 1)
builderPool := nixpool.New(ctx, factory, 1)
d, err := factory()
if err != nil {
t.Fatalf("factory: %v", err)
}
d.Close()
res := PeerResolver{PeerFetcher: PeerFetcher{pool}}
drv, err := res.LoadDerivation(ctx, drvPath)
if err != nil {
t.Fatalf("LoadDerivation(%q): %v", drvPath, err)
}
bd, err := drv.ToBasicDerivation(ctx, res)
if err != nil {
t.Fatalf("drv.ToBasicDerivation(): %v", err)
}
at := nixstore.NewActivityTracker()
cfg := Config{
Target: &PeerTarget{
PeerFetcher: PeerFetcher{
Pool: pool,
},
ActivityTracker: at,
},
ResolveOnTarget: true,
Resolvers: []Resolver{
PeerResolver{PeerFetcher: PeerFetcher{pool}},
},
Builders: []*nixpool.Pool{builderPool},
ResolveDependenciesOnBuilders: false,
}
coord := NewCoordinator(cfg, at)
//wi := coord.AddWork(ctx, storePath)
wi := coord.AddDerivationWork(ctx, bd, drvPath)
<-wi.doneCh
}

67
go/nix/nixbuild/state.go Normal file
View file

@ -0,0 +1,67 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
type State int
const (
// Does this path (or path(s)) exist on the target already?
StateCheckingOnTarget State = iota
StateDoneAlreadyExisted
StateResolvingOnTarget
StateDoneResolvedOnTarget
// Does this path exist in any configured resolvers?
StateCheckingResolvers
StateFetchingReferencesFromResolvers
StateFetchingFromResolver
StateDoneFromResolver
// If we don't have the derivation for this path, fetch it.
// This state only matters if we only have the path and not the derivation.
StateLookingUpDerivation
StateChoosingBuilder
StateResolvingReferencesOnBuilder
StateCopyingReferencesToBuilder
StateBuildingDerivation
StateCopying
StateDoneBuilt
StateFailed
)
func (s State) String() string {
return map[State]string{
StateCheckingOnTarget: "checking if already on target",
StateDoneAlreadyExisted: "done (already on target)",
StateResolvingOnTarget: "resolving on target",
StateDoneResolvedOnTarget: "done (resolved on target)",
StateCheckingResolvers: "checking if present on any resolvers",
StateFetchingReferencesFromResolvers: "copying references from resolvers",
StateFetchingFromResolver: "copying from resolver",
StateDoneFromResolver: "done (copied from resolver)",
StateLookingUpDerivation: "looking up derivation",
StateChoosingBuilder: "picking a builder",
StateResolvingReferencesOnBuilder: "resolving build-time references on builder",
StateCopyingReferencesToBuilder: "copying build-time references to builder",
StateBuildingDerivation: "building",
StateCopying: "copying from builder",
StateDoneBuilt: "done (built)",
StateFailed: "failed :(",
}[s]
}
func (s State) Terminal() bool {
switch s {
case StateDoneAlreadyExisted, StateDoneResolvedOnTarget, StateDoneFromResolver, StateDoneBuilt, StateFailed:
return true
}
return false
}

554
go/nix/nixbuild/workitem.go Normal file
View file

@ -0,0 +1,554 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixbuild
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"path"
"sort"
"sync"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
type WorkItem struct {
doneCh chan struct{}
state State
// Either (path) or (drv, drvPath) must be set.
path string
drv *nixdrv.BasicDerivation
drvPath string
err error
cleanup []func()
// StateFetchingReferencesFromResolvers, StateFetchingFromResolver, StateDoneFromResolver
resolver Resolver
narInfos []*narinfo.NarInfo
// StateFetchingReferencesFromResolvers, StateCopyingReferencesToBuilder
childWork []*WorkItem
// StateChoosingBuilder, StateResolvingReferencesOnBuilder, StateCopyingReferencesToBuilder, StateBuildingDerivation, StateCopying, StateDoneBuilt
builder *Builder
builderDaemon *nixstore.Daemon
builderNeededReferences []string
coord *Coordinator
at *nixstore.ActivityTracker
}
func (i *WorkItem) paths() []string {
if i.path != "" {
return []string{i.path}
}
os := make([]string, 0, len(i.drv.Outputs))
for _, o := range i.drv.Outputs {
os = append(os, o.Path)
}
return os
}
func (i *WorkItem) checkOnTarget(ctx context.Context) (State, error) {
// i.state == StateCheckingOnTarget
if i.coord.cfg.Target == nil {
return StateResolvingOnTarget, nil
}
wantPaths := i.paths()
gotPaths, err := i.coord.cfg.Target.ValidPaths(ctx, wantPaths)
if err != nil {
return StateFailed, err
}
if len(wantPaths) == len(gotPaths) {
return StateDoneAlreadyExisted, nil
}
return StateResolvingOnTarget, nil
}
func (i *WorkItem) resolveOnTarget(ctx context.Context) (State, error) {
// i.state == StateResolvingOnTarget
if i.coord.cfg.Target == nil || !i.coord.cfg.ResolveOnTarget {
return StateCheckingResolvers, nil
}
wantPaths := i.paths()
resolvedPaths, err := i.coord.cfg.Target.EnsurePaths(ctx, wantPaths)
if err != nil {
var ne nixstore.NixError
if !errors.As(err, &ne) {
return StateFailed, err
}
}
if len(wantPaths) == len(resolvedPaths) {
return StateDoneResolvedOnTarget, nil
}
return StateCheckingResolvers, nil
}
func (i *WorkItem) checkResolvers(ctx context.Context) (State, error) {
// i.state == StateCheckingResolvers
if len(i.coord.cfg.Resolvers) == 0 {
return StateLookingUpDerivation, nil
}
type result struct {
r Resolver
ni []*narinfo.NarInfo
}
validResolversCh := make(chan result, len(i.coord.cfg.Resolvers))
var wg sync.WaitGroup
for _, r := range i.coord.cfg.Resolvers {
r := r
wg.Add(1)
go func() {
defer wg.Done()
res := result{r: r}
for _, p := range i.paths() {
ni, err := r.FetchNarInfo(ctx, p)
if err != nil {
// Nope.
return
}
res.ni = append(res.ni, ni)
}
validResolversCh <- res
}()
}
wg.Wait()
close(validResolversCh)
validResolvers := make([]result, 0, len(i.coord.cfg.Resolvers))
for r := range validResolversCh {
validResolvers = append(validResolvers, r)
}
if len(validResolvers) == 0 {
return StateLookingUpDerivation, nil
}
sort.Slice(validResolvers, func(i, j int) bool {
return validResolvers[i].r.RelativePriority() < validResolvers[j].r.RelativePriority()
})
i.resolver = validResolvers[0].r
i.narInfos = validResolvers[0].ni
return StateFetchingReferencesFromResolvers, nil
}
func (i *WorkItem) pathSet() map[string]bool {
s := make(map[string]bool)
for _, p := range i.paths() {
s[p] = true
}
return s
}
func (i *WorkItem) fetchReferencesFromResolvers(ctx context.Context) (State, error) {
// i.state == StateFetchingReferencesFromResolvers
// i.narInfos populated
if i.coord.cfg.Target == nil {
// Nowhere to fetch to.
return StateFetchingFromResolver, nil
}
ownPaths := i.pathSet()
refs := make(map[string]bool)
var wis []*WorkItem
sp := i.coord.cfg.storePath()
for _, ni := range i.narInfos {
for _, ref := range ni.References {
refPath := path.Join(sp, ref)
if refs[refPath] || ownPaths[refPath] {
continue
}
refs[refPath] = true
wis = append(wis, i.coord.AddWork(ctx, refPath))
}
}
if len(refs) == 0 {
// No references to fetch.
return StateFetchingFromResolver, nil
}
i.childWork = wis
if err := i.awaitChildWorkItems(ctx); err != nil {
return StateFailed, err
}
return StateFetchingFromResolver, nil
}
func (i *WorkItem) awaitChildWorkItems(ctx context.Context) error {
// i.childWork populated
for _, wi := range i.childWork {
select {
case <-ctx.Done():
return ctx.Err()
case <-wi.doneCh:
if wi.err != nil {
return fmt.Errorf("dependency %v failed: %w", wi.paths(), wi.err)
}
}
}
return nil
}
func (i *WorkItem) fetchFromResolver(ctx context.Context) (State, error) {
// i.state == StateFetchingFromResolver
// i.resolver populated
// i.narInfos populated
if i.coord.cfg.Target == nil {
// Present on a resolver and nowhere to copy to; we're done here.
return StateDoneFromResolver, nil
}
type result struct {
err error
}
resCh := make(chan result, len(i.narInfos))
var wg sync.WaitGroup
for _, ni := range i.narInfos {
ni := ni
wg.Add(1)
go func() {
defer wg.Done()
rc, err := i.resolver.FetchNar(ctx, ni)
if err != nil {
resCh <- result{err}
return
}
if err := i.coord.cfg.Target.PutNar(ctx, ni, rc); err != nil {
resCh <- result{err}
return
}
rc.Close()
resCh <- result{}
}()
}
wg.Wait()
close(resCh)
for res := range resCh {
if res.err != nil {
return StateFailed, res.err
}
}
return StateDoneFromResolver, nil
}
func (i *WorkItem) lookUpDerivation(ctx context.Context) (State, error) {
// i.state == StateLookingUpDerivation
if i.drv != nil {
return StateChoosingBuilder, nil
}
type result struct {
drv *nixdrv.BasicDerivation
drvPath string
}
var wg sync.WaitGroup
resCh := make(chan result, len(i.coord.cfg.Resolvers))
for _, res := range i.coord.cfg.Resolvers {
res, ok := res.(DerivationFetcher)
if !ok {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
drv, drvPath, err := res.FetchDerivationForPath(ctx, i.path)
if err != nil {
return
}
resCh <- result{drv, drvPath}
}()
}
wg.Wait()
close(resCh)
for res := range resCh {
i.drv = res.drv
i.drvPath = res.drvPath
return StateChoosingBuilder, nil
}
return StateFailed, fmt.Errorf("couldn't find someone who knows how to build %v", i.path)
}
func (i *WorkItem) chooseBuilder(ctx context.Context) (State, error) {
// i.state == StateChoosingBuilder
// i.drv non-empty
if len(i.coord.cfg.Builders) == 0 {
return StateFailed, fmt.Errorf("no builders configured")
}
var err error
// TODO: a better algorithm for picking a builder than "the first entry in the list" :)
type builderCandidate struct {
builder *Builder
busyness float64
}
var candidates []builderCandidate
for _, b := range i.coord.cfg.Builders {
if !b.SupportedPlatforms[i.drv.Platform] {
continue
}
candidates = append(candidates, builderCandidate{
builder: b,
busyness: b.Pool.Busyness(),
})
}
if len(candidates) == 0 {
return StateFailed, fmt.Errorf("no builders support platform %q", i.drv.Platform)
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].busyness < candidates[j].busyness
})
minBusyness := candidates[0].busyness
sameBusynessCandidates := candidates
for n, c := range candidates {
if c.busyness != minBusyness {
sameBusynessCandidates = candidates[:n]
break
}
}
rand.Shuffle(len(sameBusynessCandidates), func(i, j int) {
sameBusynessCandidates[i], sameBusynessCandidates[j] = sameBusynessCandidates[j], sameBusynessCandidates[i]
})
i.builder = sameBusynessCandidates[0].builder
i.builderDaemon, err = sameBusynessCandidates[0].builder.Pool.Get()
if err != nil {
return StateFailed, err
}
i.cleanup = append(i.cleanup, func() {
if i.builderDaemon != nil {
i.builder.Pool.Put(i.builderDaemon)
}
i.builderDaemon = nil
})
return StateResolvingReferencesOnBuilder, nil
}
func (i *WorkItem) ensureReferencesOnBuilder(ctx context.Context) (State, error) {
// i.state == StateEnsuringReferencesOnBuilder
// i.builder non-empty
// i.drv non-empty
for path := range i.drv.InputSrcs {
if !i.coord.cfg.ResolveDependenciesOnBuilders {
i.builderNeededReferences = append(i.builderNeededReferences, path)
continue
}
if err := i.builderDaemon.EnsurePath(i.at, path); err != nil {
var pnv nixstore.PathNotValidError
var ne nixstore.NixError
if errors.As(err, &pnv) || errors.As(err, &ne) {
i.builderNeededReferences = append(i.builderNeededReferences, path)
} else {
return StateFailed, fmt.Errorf("path %v: %w", path, err)
}
}
}
return StateCopyingReferencesToBuilder, nil
}
func (i *WorkItem) copyReferencesToBuilder(ctx context.Context) (State, error) {
// i.state == StateCopyingReferencesToBuilder
// i.builder, i.builderPool non-empty
if len(i.builderNeededReferences) == 0 {
return StateBuildingDerivation, nil
}
i.builder.Pool.Put(i.builderDaemon)
i.builderDaemon = nil
var wis []*WorkItem
for _, p := range i.builderNeededReferences {
// Must be in same order in wis/i.childWork as i.builderNeededReferences!
wis = append(wis, i.coord.AddWork(ctx, p))
}
i.childWork = wis
if err := i.awaitChildWorkItems(ctx); err != nil {
return StateFailed, err
}
var err error
i.builderDaemon, err = i.builder.Pool.Get()
if err != nil {
return StateFailed, fmt.Errorf("reacquiring builder connection: %w", err)
}
// Recheck which things we need to fetch; maybe we built or resolved some of them on the box.
validPaths, err := i.builderDaemon.ValidPaths(i.builderNeededReferences)
if err != nil {
return StateFailed, fmt.Errorf("checking valid paths on builder: %w", err)
}
validOnBuilder := make(map[string]bool)
for _, vp := range validPaths {
validOnBuilder[vp] = true
}
for n, wi := range i.childWork {
path := i.builderNeededReferences[n]
if validOnBuilder[path] {
continue
}
var f Fetcher
var ni *narinfo.NarInfo
switch wi.state {
case StateDoneAlreadyExisted, StateDoneResolvedOnTarget:
// copy from Target
f = i.coord.cfg.Target
case StateDoneFromResolver:
// copy from Resolver (wi.resolver)
f = wi.resolver
// We should already have the NAR info, so skip a step.
for _, wini := range wi.narInfos {
if wini.StorePath == path {
ni = wini
}
}
case StateDoneBuilt:
// copy from Builder (wi.builderPool) or Target
if wi.builder == i.builder {
// Nothing to do, we built the dep on the same builder.
continue
}
if i.coord.cfg.Target != nil {
f = i.coord.cfg.Target
} else {
f = PeerFetcher{wi.builder.Pool}
}
default:
return StateFailed, fmt.Errorf("don't know where the target of %s is (for path %s)", wi.state, wi.paths())
}
if ni == nil {
var err error
ni, err = f.FetchNarInfo(ctx, path)
if err != nil {
return StateFailed, fmt.Errorf("looking up dependency NAR info for %v: %w", path, err)
}
}
rc, err := f.FetchNar(ctx, ni)
if err != nil {
return StateFailed, fmt.Errorf("fetching NAR for dependency %v: %w", wi.paths(), err)
}
if err := i.builderDaemon.AddToStoreNar(ni, rc); err != nil {
return StateFailed, fmt.Errorf("copying NAR to builder for dependency %v: %w", wi.paths(), err)
}
rc.Close()
}
return StateBuildingDerivation, nil
}
func (i *WorkItem) build(ctx context.Context) (State, error) {
// i.state == StateBuildingDerivation
// i.builder non-empty
brs, err := i.builderDaemon.BuildDerivation(i.at, i.drvPath, i.drv, nixstore.BMNormal)
if err != nil {
return StateFailed, err
}
if !brs.IsSuccess() {
return StateFailed, fmt.Errorf("build failed: %v", brs)
}
return StateCopying, nil
}
func (i *WorkItem) copyFromBuilder(ctx context.Context) (State, error) {
// i.state == StateCopying
// i.builder non-empty
if i.coord.cfg.Target == nil {
return StateDoneBuilt, nil
}
// Check if these paths are already valid on target.
validPaths, err := i.coord.cfg.Target.ValidPaths(ctx, i.paths())
if err != nil {
return StateFailed, fmt.Errorf("checking valid paths on target: %w", err)
}
validPathsSet := make(map[string]bool)
for _, p := range validPaths {
validPathsSet[p] = true
}
for _, path := range i.paths() {
if validPathsSet[path] {
continue
}
ni, err := i.builderDaemon.NARInfo(path)
if err != nil {
return StateFailed, fmt.Errorf("fetching narinfo for %v: %w", path, err)
}
rc, err := i.builderDaemon.NARFromPath(path)
if err != nil {
return StateFailed, fmt.Errorf("fetching nar for %v: %w", path, err)
}
if err := i.coord.cfg.Target.PutNar(ctx, ni, rc); err != nil {
return StateFailed, fmt.Errorf("putting nar for %v to target: %w", path, err)
}
}
return StateDoneBuilt, nil
}
func (i *WorkItem) run(ctx context.Context) error {
defer close(i.doneCh)
defer func() {
for _, cleanup := range i.cleanup {
cleanup()
}
}()
i.state = StateCheckingOnTarget
for !i.state.Terminal() {
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("%s: running worker for %s", i.path, i.state)
var nextstate State
var err error
switch i.state {
case StateCheckingOnTarget:
nextstate, err = i.checkOnTarget(ctx)
case StateResolvingOnTarget:
nextstate, err = i.resolveOnTarget(ctx)
case StateCheckingResolvers:
nextstate, err = i.checkResolvers(ctx)
case StateFetchingReferencesFromResolvers:
nextstate, err = i.fetchReferencesFromResolvers(ctx)
case StateFetchingFromResolver:
nextstate, err = i.fetchFromResolver(ctx)
case StateLookingUpDerivation:
nextstate, err = i.lookUpDerivation(ctx)
case StateChoosingBuilder:
nextstate, err = i.chooseBuilder(ctx)
case StateResolvingReferencesOnBuilder:
nextstate, err = i.ensureReferencesOnBuilder(ctx)
case StateCopyingReferencesToBuilder:
nextstate, err = i.copyReferencesToBuilder(ctx)
case StateBuildingDerivation:
nextstate, err = i.build(ctx)
case StateCopying:
nextstate, err = i.copyFromBuilder(ctx)
default:
log.Printf("%s: ended up in unimplemented state %s", i.path, i.state)
<-make(chan struct{})
}
prevstate := i.state
i.state = nextstate
if err != nil {
log.Printf("%s: transitioning to %s: %s", i.path, nextstate, err)
i.err = fmt.Errorf("%s: %w", prevstate, err)
return i.err
}
log.Printf("%s: transitioning to %s", i.path, nextstate)
}
return nil
}
func (wi *WorkItem) Done() <-chan struct{} { return wi.doneCh }

View file

@ -11,5 +11,6 @@ depot.third_party.buildGo.package {
./localfs.go
];
deps = with depot; [
go.nix.nixhash
];
}

View file

@ -2,12 +2,13 @@ package nixdrv
import (
"bufio"
"context"
"os"
)
type LocalFSResolver struct{}
func (LocalFSResolver) LoadDerivation(path string) (*Derivation, error) {
func (LocalFSResolver) LoadDerivation(ctx context.Context, path string) (*Derivation, error) {
f, err := os.Open(path)
if err != nil {
return nil, err

View file

@ -8,8 +8,13 @@ package nixdrv
import (
"bufio"
"bytes"
"context"
"fmt"
"path"
"sort"
"strings"
"hg.lukegb.com/lukegb/depot/go/nix/nixhash"
)
type Output struct {
@ -30,8 +35,6 @@ type BasicDerivation struct {
Builder string
Args []string
Env map[string]string
InputDerivations []InputDerivation
}
type Derivation struct {
@ -69,13 +72,13 @@ func (drv *Derivation) Clone() *Derivation {
}
type Resolver interface {
LoadDerivation(path string) (*Derivation, error)
LoadDerivation(ctx context.Context, path string) (*Derivation, error)
}
func (drv *Derivation) ToBasicDerivation(r Resolver) (*BasicDerivation, error) {
func (drv *Derivation) ToBasicDerivation(ctx context.Context, r Resolver) (*BasicDerivation, error) {
o := drv.BasicDerivation.Clone()
for _, inp := range drv.InputDerivations {
inpDrv, err := r.LoadDerivation(inp.Path)
inpDrv, err := r.LoadDerivation(ctx, inp.Path)
if err != nil {
return nil, fmt.Errorf("resolving %v: %w", inp.Path, err)
}
@ -92,6 +95,21 @@ func (drv *Derivation) ToBasicDerivation(r Resolver) (*BasicDerivation, error) {
return o, nil
}
func (drv *Derivation) StoreHash(name, storeRoot string) string {
var references []string
for ref := range drv.InputSrcs {
references = append(references, ref)
}
for _, ref := range drv.InputDerivations {
references = append(references, ref.Path)
}
return nixhash.StorePathForText(name+".drv", drv.String(), storeRoot, references)
}
func (drv *Derivation) StorePath(name, storeRoot string) string {
return path.Join(storeRoot, fmt.Sprintf("%s-%s.drv", drv.StoreHash(name, storeRoot), name))
}
func expect(f *bufio.Reader, s string) error {
buf := make([]byte, len(s))
_, err := f.Read(buf)
@ -194,6 +212,103 @@ loop:
return s.String(), nil
}
func writeTuple(sb *strings.Builder, ss ...string) {
sb.WriteByte('(')
if len(ss) >= 1 {
writeString(sb, ss[0])
}
for _, s := range ss[1:] {
sb.WriteByte(',')
writeString(sb, s)
}
sb.WriteByte(')')
}
func writeList[T any](sb *strings.Builder, elements []T, writeElement func(*strings.Builder, T)) {
sb.WriteByte('[')
if len(elements) >= 1 {
writeElement(sb, elements[0])
}
for _, e := range elements[1:] {
sb.WriteByte(',')
writeElement(sb, e)
}
sb.WriteByte(']')
}
var (
escaperReplacer = strings.NewReplacer("\"", "\\\"",
"\\", "\\\\",
"\n", "\\n",
"\r", "\\r",
"\t", "\\t")
)
func escapeString(s string) string {
return "\"" + escaperReplacer.Replace(s) + "\""
}
func writeUnescapedString(sb *strings.Builder, s string) {
sb.WriteString(escapeString(s))
}
func writeString(sb *strings.Builder, s string) {
sb.WriteString(s)
}
func (d *Derivation) String() string {
var sb strings.Builder
sb.WriteString("Derive(")
var outputKeys []string
for key := range d.Outputs {
outputKeys = append(outputKeys, key)
}
sort.Strings(outputKeys)
writeList(&sb, outputKeys, func(sb *strings.Builder, k string) {
v := d.Outputs[k]
writeTuple(sb, escapeString(k), escapeString(v.Path), escapeString(v.HashAlgorithm), escapeString(v.Hash))
})
sb.WriteByte(',')
writeList(&sb, d.InputDerivations, func(sb *strings.Builder, drv InputDerivation) {
var outputsBuilder strings.Builder
writeList(&outputsBuilder, drv.Outputs, writeUnescapedString)
writeTuple(sb, escapeString(drv.Path), outputsBuilder.String())
})
sb.WriteByte(',')
var inputSrcs []string
for is := range d.InputSrcs {
inputSrcs = append(inputSrcs, is)
}
sort.Strings(inputSrcs)
writeList(&sb, inputSrcs, writeUnescapedString)
sb.WriteByte(',')
writeUnescapedString(&sb, d.Platform)
sb.WriteByte(',')
writeUnescapedString(&sb, d.Builder)
sb.WriteByte(',')
writeList(&sb, d.Args, writeUnescapedString)
sb.WriteByte(',')
var env []string
for k := range d.Env {
env = append(env, k)
}
sort.Strings(env)
writeList(&sb, env, func(sb *strings.Builder, k string) {
v := d.Env[k]
writeTuple(sb, escapeString(k), escapeString(v))
})
sb.WriteByte(')') // Derive(
return sb.String()
}
func Load(f *bufio.Reader) (*Derivation, error) {
drv := &Derivation{}
@ -278,6 +393,9 @@ func Load(f *bufio.Reader) (*Derivation, error) {
}
return id, nil
})
if err != nil {
return nil, err
}
if err := expectChar(f, ','); err != nil {
return nil, fmt.Errorf(", after input derivations list: %w", err)
}
@ -342,6 +460,9 @@ func Load(f *bufio.Reader) (*Derivation, error) {
}
return kv, nil
})
if err != nil {
return nil, err
}
drv.Env = make(map[string]string)
for _, kv := range envList {
drv.Env[kv.key] = kv.value

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.package {
name = "nixhash";
path = "hg.lukegb.com/lukegb/depot/go/nix/nixhash";
srcs = [
./nixhash.go
];
deps = with depot; [
third_party.gopkgs."github.com".numtide.go-nix.nixbase32
];
}

32
go/nix/nixhash/nixhash.go Normal file
View file

@ -0,0 +1,32 @@
package nixhash
import (
"crypto/sha256"
"encoding/hex"
"sort"
"strings"
"github.com/numtide/go-nix/nixbase32"
)
func StorePathForText(name, s, storePath string, references []string) string {
strHash := sha256.Sum256([]byte(s))
sort.Strings(references)
var typeStrBuilder strings.Builder
typeStrBuilder.WriteString("text")
for _, ref := range references {
typeStrBuilder.WriteString(":")
typeStrBuilder.WriteString(ref)
}
typeStr := typeStrBuilder.String()
sStr := typeStr + ":sha256:" + hex.EncodeToString(strHash[:]) + ":" + storePath + ":" + name
sHash := sha256.Sum256([]byte(sStr))
sCompressedHash := make([]byte, 20)
for i := range sHash {
sCompressedHash[i%len(sCompressedHash)] ^= sHash[i]
}
return nixbase32.EncodeToString(sCompressedHash[:])
}

View file

@ -0,0 +1,19 @@
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, ... }:
depot.third_party.buildGo.package {
name = "nixpool";
path = "hg.lukegb.com/lukegb/depot/go/nix/nixpool";
srcs = [
./dialer.go
./nixpool.go
];
deps = with depot; [
go.nix.nixdrv
go.nix.nixstore
third_party.gopkgs."golang.org".x.crypto.ssh
];
}

134
go/nix/nixpool/dialer.go Normal file
View file

@ -0,0 +1,134 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixpool
import (
"context"
"fmt"
"io"
"net/url"
"os"
"golang.org/x/crypto/ssh"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
// DaemonFactory is the shape of a factory function.
type DaemonFactory func() (*nixstore.Daemon, error)
// DaemonDialer creates a factory function from the provided remote.
func DaemonDialer(ctx context.Context, remote string) (DaemonFactory, error) {
u, err := url.Parse(remote)
if err != nil {
return nil, fmt.Errorf("parsing remote %q as URL: %w", remote, err)
}
switch u.Scheme {
case "unix":
if u.Path == "" {
u.Path = nixstore.DaemonSock
}
return func() (*nixstore.Daemon, error) {
d, err := nixstore.OpenDaemon(u.Path)
if err != nil {
return nil, err
}
return d, nil
}, nil
case "ssh-ng":
// Construct a ClientConfig from the URL.
cfg := &ssh.ClientConfig{}
cfg.Ciphers = []string{"chacha20-poly1305@openssh.com"}
if u.Query().Has("privkey") {
var keys []ssh.Signer
for _, privkeyPath := range u.Query()["privkey"] {
privkeyF, err := os.Open(privkeyPath)
if err != nil {
return nil, fmt.Errorf("opening privkey %q: %w", privkeyPath, err)
}
defer privkeyF.Close()
privkeyB, err := io.ReadAll(privkeyF)
if err != nil {
return nil, fmt.Errorf("reading privkey %q: %w", privkeyPath, err)
}
privkey, err := ssh.ParsePrivateKey(privkeyB)
if err != nil {
return nil, fmt.Errorf("parsing privkey %q: %w", privkeyPath, err)
}
keys = append(keys, privkey)
}
cfg.Auth = append(cfg.Auth, ssh.PublicKeys(keys...))
}
if u.User != nil {
cfg.User = u.User.Username()
if pw, ok := u.User.Password(); ok {
cfg.Auth = append(cfg.Auth, ssh.Password(pw))
}
}
switch {
case u.Query().Has("host-key"):
hkStr := u.Query().Get("host-key")
_, _, hk, _, _, err := ssh.ParseKnownHosts(append([]byte("x "), []byte(hkStr)...))
if err != nil {
return nil, fmt.Errorf("parsing host-key %q: %w", hkStr, err)
}
cfg.HostKeyCallback = ssh.FixedHostKey(hk)
case u.Query().Has("insecure-allow-any-ssh-host-key"):
cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
default:
return nil, fmt.Errorf("some SSH host key configuration is required (?host-key=; ?insecure-allow-any-ssh-host-key)")
}
// Work out other misc parameters.
// ...remote command.
remoteCmd := "nix-daemon --stdio"
if u.Query().Has("remote-cmd") {
remoteCmd = u.Query().Get("remote-cmd")
}
// Work out the host:port to connect to.
remote := u.Hostname()
if portStr := u.Port(); portStr != "" {
remote = remote + ":" + portStr
} else {
remote = remote + ":22"
}
return func() (*nixstore.Daemon, error) {
conn, err := ssh.Dial("tcp", remote, cfg)
if err != nil {
return nil, fmt.Errorf("dialing %v via SSH: %w", remote, err)
}
sess, err := conn.NewSession()
if err != nil {
conn.Close()
return nil, fmt.Errorf("opening SSH session to %v: %w", remote, err)
}
stdin, err := sess.StdinPipe()
if err != nil {
conn.Close()
return nil, fmt.Errorf("opening stdin pipe: %w", err)
}
stdout, err := sess.StdoutPipe()
if err != nil {
conn.Close()
return nil, fmt.Errorf("opening stdout pipe: %w", err)
}
if err := sess.Start(remoteCmd); err != nil {
conn.Close()
return nil, fmt.Errorf("starting %q: %w", remoteCmd, err)
}
d, err := nixstore.OpenDaemonWithIOs(stdout, stdin, conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("establishing connection to daemon: %w", err)
}
return d, nil
}, nil
default:
return nil, fmt.Errorf("unknown remote %q", remote)
}
}

119
go/nix/nixpool/nixpool.go Normal file
View file

@ -0,0 +1,119 @@
// SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
//
// SPDX-License-Identifier: Apache-2.0
package nixpool
import (
"context"
"log"
"math/rand"
"runtime"
"sync"
"time"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
type Pool struct {
factory DaemonFactory
cond *sync.Cond
limit int
generated int
available []*nixstore.Daemon
}
func New(ctx context.Context, factory DaemonFactory, limit int) *Pool {
return &Pool{
factory: func() (*nixstore.Daemon, error) {
var d *nixstore.Daemon
var err error
retryInterval := 10 * time.Millisecond
maxRetry := 10 * time.Second
for n := 0; n < 20; n++ {
d, err = factory()
if err == nil {
break
}
log.Printf("failed to connect: %v", err)
time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval)))
retryInterval = retryInterval * 2
if retryInterval > maxRetry {
retryInterval = maxRetry
}
}
return d, err
},
cond: sync.NewCond(new(sync.Mutex)),
limit: limit,
}
}
func (p *Pool) getFromAvailableLocked() *nixstore.Daemon {
end := len(p.available) - 1
d := p.available[end]
p.available = p.available[:end]
return d
}
func (p *Pool) log(ln string) {
if false {
_, file, line, _ := runtime.Caller(2)
log.Printf("%p (%s:%d): %s", p, file, line, ln)
}
}
func (p *Pool) Get() (*nixstore.Daemon, error) {
p.cond.L.Lock()
for {
if len(p.available) > 0 {
d := p.getFromAvailableLocked()
p.log("pool.Get: from available")
p.cond.L.Unlock()
return d, nil
}
if p.generated < p.limit {
p.generated++
p.cond.L.Unlock()
d, err := p.factory()
if err != nil {
p.cond.L.Lock()
p.generated--
p.cond.L.Unlock()
return nil, err
}
p.log("pool.Get: generated new")
return d, nil
}
p.cond.Wait()
}
}
func (p *Pool) Put(d *nixstore.Daemon) {
p.cond.L.Lock()
if d.Err() != nil {
d.Close()
p.log("pool.Put: broken")
p.generated--
} else {
p.log("pool.Put: returned to pool")
p.available = append(p.available, d)
}
p.cond.Signal()
p.cond.L.Unlock()
}
func (p *Pool) Busyness() float64 {
p.cond.L.Lock()
defer p.cond.L.Unlock()
currentlyAvailable := len(p.available)
currentlyCheckedOut := p.generated - currentlyAvailable
proportionCheckedOut := float64(currentlyCheckedOut) / float64(p.limit)
return proportionCheckedOut
}

View file

@ -197,7 +197,7 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
llt = LogLinePostBuild
}
if VerboseActivities {
log.Printf("%s:%d> %s", am.ActivityID, fields[0].(string))
log.Printf("%s:%d> %s", am.String, am.ActivityID, fields[0].(string))
}
am.logs = append(am.logs, &ActivityLog{
Timestamp: now,

View file

@ -13,7 +13,9 @@ depot.third_party.buildGo.package {
./sqlitestore.go
];
deps = with depot; [
go.nix.nar
go.nix.nar.narinfo
go.nix.nixdrv
go.nix.nixwire
third_party.gopkgs."github.com".mattn.go-sqlite3
third_party.gopkgs."golang.org".x.crypto.ssh

View file

@ -1,14 +1,19 @@
package nixstore
import (
"bytes"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"net"
"path"
"strings"
"sync"
"hg.lukegb.com/lukegb/depot/go/nix/nar"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixwire"
@ -18,12 +23,18 @@ const (
DaemonSock = "/nix/var/nix/daemon-socket/socket"
WorkerMagic1 = 0x6e697863
WorkerMagic2 = 0x6478696f
ProtocolVersion = 0x115
ProtocolVersion = 0x120 // 0x115
WopAddMultipleToStore = 44
WopAddToStoreNar = 39
WopBuildDerivation = 36
WopEnsurePath = 10
WopIsValidPath = 1
WopNarFromPath = 38
WopQueryPathInfo = 26
WopQuerySubstitutablePathInfo = 21
WopQueryValidDerivers = 33
WopQueryValidPaths = 31
MaxBuf = 1024 * 1024 // 1 MB
)
@ -34,6 +45,8 @@ type Daemon struct {
r *nixwire.Deserializer
mu sync.Mutex
err error
peerMinorVersion int
}
type PathNotValidError struct{ Path string }
@ -42,23 +55,31 @@ func (err PathNotValidError) Error() string {
return fmt.Sprintf("path %q is not valid", err.Path)
}
func (d *Daemon) Err() error {
return d.err
}
func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.w.WriteUint64(WopQueryPathInfo); err != nil {
if _, err := d.writeOp(WopQueryPathInfo); err != nil {
d.err = err
return nil, fmt.Errorf("writing worker op WopQueryPathInfo: %w", err)
}
if _, err := d.w.WriteString(storePath); err != nil {
d.err = err
return nil, fmt.Errorf("writing store path query %v: %w", storePath, err)
}
if err := d.processStderr(nil, nil, nil); err != nil {
d.err = err
return nil, fmt.Errorf("reading stderr from WopQueryPathInfo: %w", err)
}
validInt, err := d.r.ReadUint64()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading path validity: %w", err)
}
valid := validInt == uint64(1)
@ -70,6 +91,7 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
StorePath: storePath,
}
if ni.Deriver, err = d.r.ReadString(); err != nil {
d.err = err
return nil, fmt.Errorf("reading deriver: %w", err)
}
ni.Deriver = path.Base(ni.Deriver)
@ -79,14 +101,17 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
hashStr, err := d.r.ReadString()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading NAR hash: %w", err)
}
if ni.NarHash, err = narinfo.HashFromString("sha256:" + hashStr); err != nil {
d.err = err
return nil, fmt.Errorf("parsing NAR hash %q: %w", hashStr, err)
}
refs, err := d.r.ReadStrings()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading referrers: %w", err)
}
for n, ref := range refs {
@ -95,19 +120,25 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
ni.References = refs
if _, err := d.r.ReadUint64(); err != nil {
d.err = err
return nil, fmt.Errorf("reading registration time: %w", err)
}
if ni.NarSize, err = d.r.ReadUint64(); err != nil {
d.err = err
return nil, fmt.Errorf("reading narsize: %w", err)
}
if _, err := d.r.ReadUint64(); err != nil {
d.err = err
return nil, fmt.Errorf("reading ultimate: %w", err)
}
sigs, err := d.r.ReadStrings()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading sigs: %w", err)
}
if _, err := d.r.ReadUint64(); err != nil {
ni.CA, err = d.r.ReadString()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading CA: %w", err)
}
@ -124,6 +155,71 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
return ni, nil
}
func (d *Daemon) NARFromPath(storePath string) (io.ReadCloser, error) {
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.writeOp(WopNarFromPath); err != nil {
d.err = err
return nil, fmt.Errorf("writing worker op WopNarFromPath: %w", err)
}
if _, err := d.w.WriteString(storePath); err != nil {
d.err = err
return nil, fmt.Errorf("writing store path query %v: %w", storePath, err)
}
if err := d.processStderr(nil, nil, nil); err != nil {
return nil, fmt.Errorf("waiting for NAR packing: %w", err)
}
pr, pw := io.Pipe()
go func() {
err := nar.Unpack(io.TeeReader(d.r.Reader, pw), nar.NullFS{}, "irrelevant")
if err != nil {
pw.CloseWithError(err)
} else {
pw.Close()
}
}()
return io.NopCloser(pr), nil
}
func (d *Daemon) writeOp(op uint64) (int64, error) {
return d.w.WriteUint64(op)
}
func (d *Daemon) ValidPaths(storePaths []string) ([]string, error) {
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.writeOp(WopQueryValidPaths); err != nil {
d.err = err
return nil, fmt.Errorf("writing worker op WopQueryValidPaths: %w", err)
}
if _, err := d.w.WriteStrings(storePaths); err != nil {
d.err = err
return nil, fmt.Errorf("writing store path query %v: %w", storePaths, err)
}
if d.peerMinorVersion >= 27 {
// Substitute flag
if _, err := d.w.WriteUint64(0); err != nil {
d.err = err
return nil, fmt.Errorf("writing substitute flag: %w", err)
}
}
if err := d.processStderr(nil, nil, nil); err != nil {
return nil, fmt.Errorf("reading stderr from WopQueryValidPaths: %w", err)
}
validPaths, err := d.r.ReadStrings()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading valid paths: %w", err)
}
return validPaths, nil
}
func (d *Daemon) Close() error {
d.mu.Lock()
defer d.mu.Unlock()
@ -175,27 +271,33 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re
for {
msg, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("read stderr msg code: %w", err)
}
switch msg {
case 0x64617416: // STDERR_WRITE
if stdout == nil {
d.err = err
return fmt.Errorf("STDERR_WRITE requested")
}
bs, err := d.r.ReadBytes()
if err != nil {
d.err = err
return fmt.Errorf("read bytes: %w", err)
}
_, err = stdout.Write(bs)
if err != nil {
d.err = err
return fmt.Errorf("write bytes into stdout: %w", err)
}
case 0x64617461: // STDERR_READ
if stdin == nil {
d.err = err
return fmt.Errorf("STDERR_READ requested")
}
readSizeU64, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_READ reading uint64: %w", err)
}
readSize := int(readSizeU64)
@ -205,52 +307,111 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re
buf := make([]byte, readSize)
readSize, err = stdin.Read(buf)
if err != nil {
d.err = err
return fmt.Errorf("STDERR_READ reading from stdin: %w", err)
}
if _, err := d.w.WriteBytes(buf[:readSize]); err != nil {
d.err = err
return fmt.Errorf("STDERR_READ writing stdin to socket: %w", err)
}
case 0x63787470: // STDERR_ERROR
if d.peerMinorVersion >= 26 {
errStr, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading 'Error' string: %w", err)
}
errLevel, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading error level: %w", err)
}
errName, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading error name: %w", err)
}
errMsg, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading error message: %w", err)
}
errPos, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading error position: %w", err)
}
log.Printf("error? Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos)
errTraceNum, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading trace count: %w", err)
}
for n := 0; n < int(errTraceNum); n++ {
tracePos, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading trace position: %w", err)
}
traceHint, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading trace hint: %w", err)
}
log.Printf("error? trace %d/%d: Pos=%d Hint=%s", n+1, errTraceNum, tracePos, traceHint)
}
return fmt.Errorf("nix Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos)
} else {
errStr, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading error string: %w", err)
}
status, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_ERROR reading uint64: %w", err)
}
al.AddError(status, errStr)
return NixError{status, errStr}
}
case 0x6f6c6d67: // STDERR_NEXT
msg, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_NEXT reading log string: %w", err)
}
al.AddLog(msg)
case 0x53545254: // STDERR_START_ACTIVITY
activity, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY reading ActivityId: %w", err)
}
lvl, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY reading level: %w", err)
}
actTypeU64, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY reading activity type: %w", err)
}
actType := ActivityType(actTypeU64)
s, err := d.r.ReadString()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY reading s: %w", err)
}
fields, err := d.readFields()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY fields: %w", err)
}
parentActivity, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_START_ACTIVITY reading parent activity: %w", err)
}
al.StartActivity(actType, ActivityMeta{
@ -263,21 +424,25 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re
case 0x53544f50: // STDERR_STOP_ACTIVITY
activity, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_STOP_ACTIVITY reading ActivityId: %w", err)
}
al.EndActivity(al.Activity(activity))
case 0x52534c54: // STDERR_RESULT
activity, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_RESULT reading ActivityId: %w", err)
}
resultTypU64, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_RESULT reading result: %w", err)
}
resultTyp := ResultType(resultTypU64)
fields, err := d.readFields()
if err != nil {
d.err = err
return fmt.Errorf("STDERR_RESULT fields: %w", err)
}
al.ActivityResult(al.Activity(activity), resultTyp, fields)
@ -290,44 +455,83 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re
func (d *Daemon) hello() error {
_, err := d.w.WriteUint64(WorkerMagic1)
if err != nil {
d.err = err
return fmt.Errorf("writing magic 1: %w", err)
}
magic2, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("reading magic 2: %w", err)
}
if magic2 != WorkerMagic2 {
d.err = err
return fmt.Errorf("magic 2 mismatch: got %x, wanted %x", magic2, WorkerMagic2)
}
daemonVersion, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("reading daemon version: %w", err)
}
majorVersion := int((daemonVersion >> 8) & 0xff)
minorVersion := int(daemonVersion & 0xff)
if majorVersion != 1 {
d.err = err
return fmt.Errorf("daemon major version mismatch: got %d, want 1", majorVersion)
}
if minorVersion < 21 {
d.err = err
return fmt.Errorf("daemon minor version too old: got %d, want at least 21", minorVersion)
}
d.peerMinorVersion = minorVersion
if _, err := d.w.WriteUint64(ProtocolVersion); err != nil {
d.err = err
return fmt.Errorf("writing protocol version: %w", err)
}
if _, err := d.w.WriteUint64(0); err != nil {
d.err = err
return fmt.Errorf("writing obsolete CPU affinity: %w", err)
}
if _, err := d.w.WriteUint64(0); err != nil {
d.err = err
return fmt.Errorf("writing obsolete reserveSpace: %w", err)
}
return d.processStderr(nil, nil, nil)
}
func (d *Daemon) IsValidPath(at *ActivityTracker, storePath string) (bool, error) {
al := at.StartAction(fmt.Sprintf("IsValidPath(%q)", storePath))
defer al.Close()
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.writeOp(WopIsValidPath); err != nil {
d.err = err
return false, fmt.Errorf("writing worker op WopIsValidPath: %w", err)
}
if _, err := d.w.WriteString(storePath); err != nil {
d.err = err
return false, fmt.Errorf("writing store path %v: %w", storePath, err)
}
if err := d.processStderr(al, nil, nil); err != nil {
return false, fmt.Errorf("reading stderr from WopIsValidPath: %w", err)
}
validInt, err := d.r.ReadUint64()
if err != nil {
d.err = err
return false, fmt.Errorf("reading path validity: %w", err)
}
return validInt != 0, nil
}
func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error {
al := at.StartAction(fmt.Sprintf("EnsurePath(%q)", storePath))
defer al.Close()
@ -335,10 +539,12 @@ func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error {
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.w.WriteUint64(WopEnsurePath); err != nil {
if _, err := d.writeOp(WopEnsurePath); err != nil {
d.err = err
return fmt.Errorf("writing worker op WopEnsurePath: %w", err)
}
if _, err := d.w.WriteString(storePath); err != nil {
d.err = err
return fmt.Errorf("writing store path %v: %w", storePath, err)
}
@ -348,10 +554,11 @@ func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error {
validInt, err := d.r.ReadUint64()
if err != nil {
d.err = err
return fmt.Errorf("reading path validity: %w", err)
}
if validInt != 1 {
return fmt.Errorf("path validity was %d; wanted 1", validInt)
return PathNotValidError{storePath}
}
return nil
}
@ -422,16 +629,20 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.w.WriteUint64(WopBuildDerivation); err != nil {
if _, err := d.writeOp(WopBuildDerivation); err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("writing worker op WopBuildDerivation: %w", err)
}
if _, err := d.w.WriteString(derivationPath); err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("writing derivation store path %v: %w", derivationPath, err)
}
if _, err := d.w.WriteDerivation(derivation); err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("writing derivation content of %v: %w", derivationPath, err)
}
if _, err := d.w.WriteUint64(uint64(buildMode)); err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("writing build mode %v: %w", buildMode, err)
}
@ -441,11 +652,13 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der
buildStatusU64, err := d.r.ReadUint64()
if err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("reading build status code: %w", err)
}
buildStatus := BuildResultStatus(buildStatusU64)
buildErrorMsg, err := d.r.ReadString()
if err != nil {
d.err = err
return BRSMiscFailure, fmt.Errorf("reading build error message: %w", err)
}
@ -456,23 +669,224 @@ func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, der
return buildStatus, nil
}
func (d *Daemon) writeNarInfo(ni *narinfo.NarInfo, w *nixwire.Serializer) error {
storeRoot := path.Dir(ni.StorePath)
if _, err := w.WriteString(ni.StorePath); err != nil {
d.err = err
return fmt.Errorf("writing store path: %w", err)
}
var deriverPath string
if ni.Deriver != "" {
deriverPath = path.Join(storeRoot, ni.Deriver)
}
if _, err := w.WriteString(deriverPath); err != nil {
d.err = err
return fmt.Errorf("writing deriver: %w", err)
}
if _, err := w.WriteString(hex.EncodeToString(ni.NarHash.Hash)); err != nil {
d.err = err
return fmt.Errorf("writing NAR hash: %w", err)
}
rootedRefs := make([]string, len(ni.References))
for n, ref := range ni.References {
rootedRefs[n] = path.Join(storeRoot, ref)
}
if _, err := w.WriteStrings(rootedRefs); err != nil {
d.err = err
return fmt.Errorf("writing references: %w", err)
}
if _, err := w.WriteUint64(0); err != nil {
d.err = err
return fmt.Errorf("writing dummy registration time: %w", err)
}
if _, err := w.WriteUint64(ni.NarSize); err != nil {
d.err = err
return fmt.Errorf("writing NAR size: %w", err)
}
if _, err := w.WriteUint64(1); err != nil {
d.err = err
return fmt.Errorf("writing ultimate (true): %w", err)
}
if _, err := w.WriteStrings(ni.Sigs()); err != nil {
d.err = err
return fmt.Errorf("writing signatures: %w", err)
}
if _, err := w.WriteString(ni.CA); err != nil {
d.err = err
return fmt.Errorf("writing content-addressable: %w", err)
}
return nil
}
type bufferingReader struct {
r io.Reader
}
func (r *bufferingReader) Read(p []byte) (int, error) {
pos := 0
for {
n, err := r.r.Read(p[pos:])
pos += n
if err != nil {
if pos > 0 && errors.Is(err, io.EOF) {
return pos, nil
}
return pos, err
} else if pos >= len(p) {
break
}
}
return pos, nil
}
func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error {
d.mu.Lock()
defer d.mu.Unlock()
var preBuffer []byte
if d.peerMinorVersion >= 32 {
if _, err := d.writeOp(WopAddMultipleToStore); err != nil {
d.err = err
return fmt.Errorf("writing worker op WopAddMultipleToStore: %w", err)
}
if _, err := d.w.WriteUint64(0); err != nil {
d.err = err
return fmt.Errorf("writing repair (false): %w", err)
}
if _, err := d.w.WriteUint64(1); err != nil {
d.err = err
return fmt.Errorf("writing dont-check-sigs (true): %w", err)
}
buf := new(bytes.Buffer)
bufs := &nixwire.Serializer{Writer: buf}
if _, err := bufs.WriteUint64(1); err != nil {
d.err = err
return fmt.Errorf("writing count (1): %w", err)
}
if err := d.writeNarInfo(ni, bufs); err != nil {
return fmt.Errorf("buffering NAR info: %w", err)
}
preBuffer = buf.Bytes()
} else {
if _, err := d.writeOp(WopAddToStoreNar); err != nil {
d.err = err
return fmt.Errorf("writing worker op WopAddToStoreNar: %w", err)
}
if err := d.writeNarInfo(ni, d.w); err != nil {
return fmt.Errorf("writing NAR info: %w", err)
}
if _, err := d.w.WriteUint64(0); err != nil {
d.err = err
return fmt.Errorf("writing repair (false): %w", err)
}
if _, err := d.w.WriteUint64(1); err != nil {
d.err = err
return fmt.Errorf("writing dont-check-sigs (true): %w", err)
}
}
if d.peerMinorVersion >= 23 {
// FramedSink
errCh := make(chan error)
mr := &bufferingReader{io.MultiReader(bytes.NewReader(preBuffer), r)}
go func() {
defer close(errCh)
buf := make([]byte, MaxBuf)
for {
n, err := mr.Read(buf)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
err = fmt.Errorf("reading: %w", err)
d.err = err
errCh <- err
return
}
if _, err := d.w.WriteBytes(buf[:n]); err != nil {
err = fmt.Errorf("writing payload: %w", err)
d.err = err
errCh <- err
return
}
}
if _, err := d.w.WriteUint64(0); err != nil {
err = fmt.Errorf("sending framed EOF: %w", err)
d.err = err
errCh <- err
return
}
}()
if err := d.processStderr(nil, nil, nil); err != nil {
return fmt.Errorf("reading (framed) stderr from WopAddToStoreNar: %w", err)
}
return <-errCh
} else {
if err := d.processStderr(nil, nil, r); err != nil {
return fmt.Errorf("reading stderr from WopAddToStoreNar: %w", err)
}
}
return nil
}
func (d *Daemon) QueryValidDerivers(path string) ([]string, error) {
d.mu.Lock()
defer d.mu.Unlock()
if _, err := d.writeOp(WopQueryValidDerivers); err != nil {
d.err = err
return nil, fmt.Errorf("writing worker op WopQueryValidDerivers: %w", err)
}
if _, err := d.w.WriteString(path); err != nil {
d.err = err
return nil, fmt.Errorf("writing store path: %w", err)
}
if err := d.processStderr(nil, nil, nil); err != nil {
return nil, fmt.Errorf("reading stderr from WopQueryValidDerivers: %w", err)
}
drvPaths, err := d.r.ReadStrings()
if err != nil {
d.err = err
return nil, fmt.Errorf("reading deriver paths: %w", err)
}
return drvPaths, nil
}
func OpenDaemon(path string) (*Daemon, error) {
conn, err := net.Dial("unix", path)
if err != nil {
return nil, fmt.Errorf("dialing %v: %w", path, err)
}
d := &Daemon{conn: conn, w: &nixwire.Serializer{conn}, r: &nixwire.Deserializer{conn}}
if err := d.hello(); err != nil {
d.Close()
return nil, fmt.Errorf("sending hello to %v: %w", path, err)
}
return OpenDaemonWithIOs(conn, conn, conn)
}
return d, nil
type hexDumpingWriter struct {
w io.Writer
enabled bool
}
func (w *hexDumpingWriter) Write(p []byte) (int, error) {
n, err := w.w.Write(p)
if err != nil {
return 0, err
}
if w.enabled {
log.Printf(">>\n>> %s", strings.TrimSuffix(strings.ReplaceAll(hex.Dump(p), "\n", "\n>> "), "\n>> "))
}
return n, nil
}
func OpenDaemonWithIOs(r io.Reader, w io.Writer, c io.Closer) (*Daemon, error) {
d := &Daemon{conn: c, w: &nixwire.Serializer{w}, r: &nixwire.Deserializer{r}}
d := &Daemon{
conn: c,
w: &nixwire.Serializer{Writer: &hexDumpingWriter{w: w}},
r: &nixwire.Deserializer{Reader: r},
}
if err := d.hello(); err != nil {
d.Close()
return nil, fmt.Errorf("sending hello: %w", err)

View file

@ -8,8 +8,8 @@ depot.third_party.buildGo.external {
src = depot.third_party.nixpkgs.fetchFromGitHub {
owner = "golang";
repo = "crypto";
rev = "7f63de1d35b0f77fa2b9faea3e7deb402a2383c8";
sha256 = "1dr89jfs4dmpr3jqfshqqvfpzzdx4r76nkzhrvmixfrmn6wxrnd1";
rev = "b4ddeeda5bc71549846db71ba23e83ecb26f36ed";
sha256 = "00cg67w0n01a64fc4kqg5j7r47fx5y9vyqlanwb60513dv6lzacs";
};
deps = with depot.third_party; [
gopkgs."golang.org".x.sys.cpu