From 678b7c66edc60a263bbe6c952595472a54226c61 Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Sun, 27 Aug 2023 22:45:46 +0000 Subject: [PATCH] go/nix: some more features for nixbuild --- go/nix/bnixbuild/bnixbuild.go | 29 +++++++++++------ go/nix/nixbuild/config.go | 30 ++++++++++++++++++ go/nix/nixbuild/workitem.go | 58 ++++++++++++++++++++++++++++------ go/nix/nixdrv/nixdrv.go | 12 +++++++ go/nix/nixpool/dialer.go | 2 +- go/nix/nixstore/remotestore.go | 16 +++++++--- 6 files changed, 121 insertions(+), 26 deletions(-) diff --git a/go/nix/bnixbuild/bnixbuild.go b/go/nix/bnixbuild/bnixbuild.go index fc0540e920..37d45371ae 100644 --- a/go/nix/bnixbuild/bnixbuild.go +++ b/go/nix/bnixbuild/bnixbuild.go @@ -22,6 +22,8 @@ type remoteDefinition struct { PermittedResolutions int PermittedBuilds int SupportedPlatforms map[string]bool + SupportedFeatures map[string]bool + RequiredFeatures []map[string]bool } var ( @@ -67,7 +69,8 @@ func bmain() { } defer localConn.Close() - const storePath = "/nix/store/pl5izkcgln1kvy8hv1850888my0b80qs-golib-golang.org_x_crypto_ed25519" + //const storePath = "/nix/store/pl5izkcgln1kvy8hv1850888my0b80qs-golib-golang.org_x_crypto_ed25519" + const storePath = "/nix/store/1mpp8vsq5wl5zd6nzallr4dc5jm342yn-factorio_alpha_x64-1.1.88.tar.xz" ni, err := localConn.NARInfo(storePath) if err != nil { log.Fatalf("localConn.NARInfo(%q): %v", storePath, err) @@ -96,18 +99,24 @@ func main() { remoteDefs := []remoteDefinition{{ URL: "unix://", PermittedResolutions: 64, - PermittedBuilds: 0, - SupportedPlatforms: map[string]bool{"x86_64-linux": true}, + PermittedBuilds: 64, + SupportedPlatforms: map[string]bool{"x86_64-linux": true, "i686-linux": true}, + SupportedFeatures: map[string]bool{"kvm": true}, + RequiredFeatures: []map[string]bool{ + map[string]bool{"kvm": true}, + }, + // }, { + // URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519", + // PermittedResolutions: 64, + // PermittedBuilds: 64, + // SupportedPlatforms: map[string]bool{"x86_64-linux": true}, + // SupportedFeatures: map[string]bool{"big-parallel": true}, }, { - // URL: "ssh-ng://lukegb@whitby.tvl.fyi?insecure-allow-any-ssh-host-key=true&privkey=/home/lukegb/.ssh/id_ed25519", - // PermittedResolutions: 64, - // PermittedBuilds: 64, - // SupportedPlatforms: map[string]bool{"x86_64-linux": true}, - // }, { URL: "ssh-ng://lukegb@eu.nixbuild.net?insecure-allow-any-ssh-host-key=true&privkey=/var/lib/secrets/id_ed25519_nixbuild/secret", PermittedResolutions: 100, PermittedBuilds: 100, - SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true}, + SupportedPlatforms: map[string]bool{"x86_64-linux": true, "aarch64-linux": true, "i686-linux": true}, + SupportedFeatures: map[string]bool{"big-parallel": true}, }} var builders []*nixbuild.Builder var resolvers []nixbuild.Resolver @@ -122,7 +131,7 @@ func main() { } if d.PermittedBuilds > 0 { builderPool := nixpool.New(ctx, factory, d.PermittedBuilds) - builders = append(builders, &nixbuild.Builder{Pool: builderPool, SupportedPlatforms: d.SupportedPlatforms}) + builders = append(builders, &nixbuild.Builder{Pool: builderPool, SupportedPlatforms: d.SupportedPlatforms, SupportedFeatures: d.SupportedFeatures, RequiredFeatures: d.RequiredFeatures}) } } target := &nixbuild.PeerTarget{ diff --git a/go/nix/nixbuild/config.go b/go/nix/nixbuild/config.go index 8843087a4c..30b00bc857 100644 --- a/go/nix/nixbuild/config.go +++ b/go/nix/nixbuild/config.go @@ -39,6 +39,36 @@ type Target interface { type Builder struct { Pool *nixpool.Pool SupportedPlatforms map[string]bool + SupportedFeatures map[string]bool + RequiredFeatures []map[string]bool +} + +func (b *Builder) SupportsAll(features map[string]bool) bool { + for f := range features { + if b.SupportedFeatures == nil || !b.SupportedFeatures[f] { + return false + } + } + return true +} + +func (b *Builder) CanBuild(features map[string]bool) bool { + if len(b.RequiredFeatures) == 0 { + return true + } + if features == nil { + return false + } +requiredFeaturesLoop: + for _, rf := range b.RequiredFeatures { + for f := range rf { + if !features[f] { + continue requiredFeaturesLoop + } + } + return true + } + return false } type Config struct { diff --git a/go/nix/nixbuild/workitem.go b/go/nix/nixbuild/workitem.go index 1e8a32d2af..b3e6c61556 100644 --- a/go/nix/nixbuild/workitem.go +++ b/go/nix/nixbuild/workitem.go @@ -142,14 +142,18 @@ func (i *WorkItem) checkResolvers(ctx context.Context) (State, error) { return StateFetchingReferencesFromResolvers, nil } -func (i *WorkItem) pathSet() map[string]bool { +func pathSet(ps []string) map[string]bool { s := make(map[string]bool) - for _, p := range i.paths() { + for _, p := range ps { s[p] = true } return s } +func (i *WorkItem) pathSet() map[string]bool { + return pathSet(i.paths()) +} + func (i *WorkItem) fetchReferencesFromResolvers(ctx context.Context) (State, error) { // i.state == StateFetchingReferencesFromResolvers // i.narInfos populated @@ -292,8 +296,9 @@ func (i *WorkItem) chooseBuilder(ctx context.Context) (State, error) { busyness float64 } var candidates []builderCandidate + rsf := i.drv.RequiredSystemFeatures() for _, b := range i.coord.cfg.Builders { - if !b.SupportedPlatforms[i.drv.Platform] { + if !b.SupportedPlatforms[i.drv.Platform] || !b.SupportsAll(rsf) || !b.CanBuild(rsf) { continue } candidates = append(candidates, builderCandidate{ @@ -302,7 +307,7 @@ func (i *WorkItem) chooseBuilder(ctx context.Context) (State, error) { }) } if len(candidates) == 0 { - return StateFailed, fmt.Errorf("no builders support platform %q", i.drv.Platform) + return StateFailed, fmt.Errorf("no builders support platform %q and features %v", i.drv.Platform, rsf) } sort.Slice(candidates, func(i, j int) bool { return candidates[i].busyness < candidates[j].busyness @@ -384,10 +389,7 @@ func (i *WorkItem) copyReferencesToBuilder(ctx context.Context) (State, error) { if err != nil { return StateFailed, fmt.Errorf("checking valid paths on builder: %w", err) } - validOnBuilder := make(map[string]bool) - for _, vp := range validPaths { - validOnBuilder[vp] = true - } + validOnBuilder := pathSet(validPaths) for n, wi := range i.childWork { path := i.builderNeededReferences[n] @@ -460,6 +462,31 @@ func (i *WorkItem) build(ctx context.Context) (State, error) { return StateCopying, nil } +func topoPaths(paths []string, pathNARs map[string]*narinfo.NarInfo) []string { + pset := pathSet(paths) + + var outPaths []string + donePaths := map[string]bool{} + for len(outPaths) != len(paths) { + pathLoop: + for _, p := range paths { + if donePaths[p] { + continue + } + for _, ref := range pathNARs[p].References { + refX := path.Join(path.Dir(p), ref) + if pset[refX] && p != refX && !donePaths[refX] { + continue pathLoop + } + } + + donePaths[p] = true + outPaths = append(outPaths, p) + } + } + return outPaths +} + func (i *WorkItem) copyFromBuilder(ctx context.Context) (State, error) { // i.state == StateCopying // i.builder non-empty @@ -477,14 +504,25 @@ func (i *WorkItem) copyFromBuilder(ctx context.Context) (State, error) { validPathsSet[p] = true } - for _, path := range i.paths() { - if validPathsSet[path] { + var drvPaths []string + for _, out := range i.drv.Outputs { + if validPathsSet[out.Path] { continue } + drvPaths = append(drvPaths, out.Path) + } + + pathNARs := map[string]*narinfo.NarInfo{} + for _, path := range drvPaths { ni, err := i.builderDaemon.NARInfo(path) if err != nil { return StateFailed, fmt.Errorf("fetching narinfo for %v: %w", path, err) } + pathNARs[path] = ni + } + + for _, path := range topoPaths(drvPaths, pathNARs) { + ni := pathNARs[path] rc, err := i.builderDaemon.NARFromPath(path) if err != nil { return StateFailed, fmt.Errorf("fetching nar for %v: %w", path, err) diff --git a/go/nix/nixdrv/nixdrv.go b/go/nix/nixdrv/nixdrv.go index fc9f13b504..f5a264c9eb 100644 --- a/go/nix/nixdrv/nixdrv.go +++ b/go/nix/nixdrv/nixdrv.go @@ -43,6 +43,18 @@ type Derivation struct { InputDerivations []InputDerivation } +func (drv *BasicDerivation) RequiredSystemFeatures() map[string]bool { + rsfs := drv.Env["requiredSystemFeatures"] + if rsfs == "" { + return nil + } + out := map[string]bool{} + for _, rsf := range strings.Fields(rsfs) { + out[rsf] = true + } + return out +} + func (drv *BasicDerivation) Clone() *BasicDerivation { o := &BasicDerivation{ Outputs: map[string]Output{}, diff --git a/go/nix/nixpool/dialer.go b/go/nix/nixpool/dialer.go index 46b2edd874..5ea522a938 100644 --- a/go/nix/nixpool/dialer.go +++ b/go/nix/nixpool/dialer.go @@ -121,7 +121,7 @@ func DaemonDialer(ctx context.Context, remote string) (DaemonFactory, error) { conn.Close() return nil, fmt.Errorf("starting %q: %w", remoteCmd, err) } - d, err := nixstore.OpenDaemonWithIOs(stdout, stdin, conn) + d, err := nixstore.OpenDaemonWithIOs(u.String(), stdout, stdin, conn) if err != nil { conn.Close() return nil, fmt.Errorf("establishing connection to daemon: %w", err) diff --git a/go/nix/nixstore/remotestore.go b/go/nix/nixstore/remotestore.go index 8b5fcedda3..46e91dde29 100644 --- a/go/nix/nixstore/remotestore.go +++ b/go/nix/nixstore/remotestore.go @@ -40,6 +40,8 @@ const ( ) type Daemon struct { + name string + conn io.Closer w *nixwire.Serializer r *nixwire.Deserializer @@ -792,11 +794,14 @@ func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error { // FramedSink errCh := make(chan error) mr := &bufferingReader{io.MultiReader(bytes.NewReader(preBuffer), r)} + w := &nixwire.Serializer{Writer: &hexDumpingWriter{enabled: false, w: d.w.Writer}} go func() { defer close(errCh) buf := make([]byte, MaxBuf) + sofar := 0 for { n, err := mr.Read(buf) + sofar += n if errors.Is(err, io.EOF) { break } else if err != nil { @@ -805,14 +810,14 @@ func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error { errCh <- err return } - if _, err := d.w.WriteBytes(buf[:n]); err != nil { + if _, err := w.WriteBytes(buf[:n]); err != nil { err = fmt.Errorf("writing payload: %w", err) d.err = err errCh <- err return } } - if _, err := d.w.WriteUint64(0); err != nil { + if _, err := w.WriteUint64(0); err != nil { err = fmt.Errorf("sending framed EOF: %w", err) d.err = err errCh <- err @@ -824,7 +829,7 @@ func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error { } return <-errCh } else { - if err := d.processStderr(nil, nil, r); err != nil { + if err := d.processStderr(nil, nil, &bufferingReader{r}); err != nil { return fmt.Errorf("reading stderr from WopAddToStoreNar: %w", err) } } @@ -862,7 +867,7 @@ func OpenDaemon(path string) (*Daemon, error) { return nil, fmt.Errorf("dialing %v: %w", path, err) } - return OpenDaemonWithIOs(conn, conn, conn) + return OpenDaemonWithIOs("unix://"+path, conn, conn, conn) } type hexDumpingWriter struct { @@ -881,8 +886,9 @@ func (w *hexDumpingWriter) Write(p []byte) (int, error) { return n, nil } -func OpenDaemonWithIOs(r io.Reader, w io.Writer, c io.Closer) (*Daemon, error) { +func OpenDaemonWithIOs(name string, r io.Reader, w io.Writer, c io.Closer) (*Daemon, error) { d := &Daemon{ + name: name, conn: c, w: &nixwire.Serializer{Writer: &hexDumpingWriter{w: w}}, r: &nixwire.Deserializer{Reader: r},