From b11cc9d3c8e2e52629cc59dfcd000bac4b4bd1c6 Mon Sep 17 00:00:00 2001 From: Luke Granger-Brown Date: Thu, 17 Aug 2023 02:51:18 +0000 Subject: [PATCH] go/nix: start working on making it possible to build derivations --- go/nix/bnix/bnix.go | 100 ++++++++ go/nix/bnix/default.nix | 14 ++ go/nix/default.nix | 1 + go/nix/nixdrv/localfs.go | 23 ++ go/nix/nixdrv/nixdrv.go | 355 ++++++++++++++++++++++++++ go/nix/nixstore/activities.go | 438 +++++++++++++++++++++++++++++++++ go/nix/nixstore/default.nix | 1 + go/nix/nixstore/remotestore.go | 200 +++++++++++++-- go/nix/nixwire/nixwire.go | 197 ++++++++++++++- 9 files changed, 1303 insertions(+), 26 deletions(-) create mode 100644 go/nix/bnix/bnix.go create mode 100644 go/nix/bnix/default.nix create mode 100644 go/nix/nixdrv/localfs.go create mode 100644 go/nix/nixdrv/nixdrv.go create mode 100644 go/nix/nixstore/activities.go diff --git a/go/nix/bnix/bnix.go b/go/nix/bnix/bnix.go new file mode 100644 index 0000000000..9c26fa334a --- /dev/null +++ b/go/nix/bnix/bnix.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" + "hg.lukegb.com/lukegb/depot/go/nix/nixstore" +) + +func ensure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error { + return d.EnsurePath(at, path) +} + +var rs nixdrv.LocalFSResolver + +func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) { + return rs.LoadDerivation(path) +} + +func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error { + drv, err := loadDerivation(ctx, path) + if err != nil { + return fmt.Errorf("loading derivation: %w", err) + } + + basicDrv, err := drv.ToBasicDerivation(rs) + if err != nil { + return fmt.Errorf("resolving: %w", err) + } + + brs, err := d.BuildDerivation(at, path, basicDrv, nixstore.BMNormal) + if err != nil { + return err + } + log.Printf("build result: %v", brs) + return nil +} + +func main() { + flag.Parse() + + badCall := func(f string, xs ...interface{}) { + fmt.Fprintf(os.Stderr, f+"\n", xs...) + flag.Usage() + os.Exit(1) + } + + if flag.NArg() < 1 { + badCall("need a subcommand") + } + + var cmd func(context.Context, *nixstore.Daemon, *nixstore.ActivityTracker) error + switch flag.Arg(0) { + case "ensure": + if flag.NArg() != 2 { + badCall("`ensure` needs a store path") + } + cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error { + return ensure(ctx, d, at, flag.Arg(1)) + } + case "show-derivation": + if flag.NArg() != 2 { + badCall("`show-derivation` needs a derivation") + } + cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error { + drv, err := loadDerivation(ctx, flag.Arg(1)) + if err != nil { + return err + } + fmt.Printf("%#v\n", drv) + return nil + } + case "build-derivation": + if flag.NArg() != 2 { + badCall("`build-derivation` needs a derivation") + } + cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error { + return buildDerivation(ctx, d, at, flag.Arg(1)) + } + default: + badCall("bad subcommand %s", flag.Arg(0)) + } + + at := nixstore.NewActivityTracker() + + d, err := nixstore.OpenDaemon(nixstore.DaemonSock) + if err != nil { + log.Fatalf("OpenDaemon: %v", err) + } + defer d.Close() + + ctx := context.Background() + if err := cmd(ctx, d, at); err != nil { + log.Fatalf("%s: %s", flag.Arg(0), err) + } +} diff --git a/go/nix/bnix/default.nix b/go/nix/bnix/default.nix new file mode 100644 index 0000000000..d9272a05ad --- /dev/null +++ b/go/nix/bnix/default.nix @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2023 Luke Granger-Brown +# +# SPDX-License-Identifier: Apache-2.0 + +{ depot, ... }: +depot.third_party.buildGo.program { + name = "bnix"; + srcs = [ + ./bnix.go + ]; + deps = with depot; [ + go.nix.nixstore + ]; +} diff --git a/go/nix/default.nix b/go/nix/default.nix index 9f7960af21..4ca56894fc 100644 --- a/go/nix/default.nix +++ b/go/nix/default.nix @@ -9,4 +9,5 @@ args: nixwire = import ./nixwire args; bcachegc = import ./bcachegc args; bcacheup = import ./bcacheup args; + bnix = import ./bnix args; } diff --git a/go/nix/nixdrv/localfs.go b/go/nix/nixdrv/localfs.go new file mode 100644 index 0000000000..61a09a3588 --- /dev/null +++ b/go/nix/nixdrv/localfs.go @@ -0,0 +1,23 @@ +package nixdrv + +import ( + "bufio" + "os" +) + +type LocalFSResolver struct{} + +func (LocalFSResolver) LoadDerivation(path string) (*Derivation, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + bio := bufio.NewReader(f) + drv, err := Load(bio) + if err != nil { + return nil, err + } + return drv, nil +} diff --git a/go/nix/nixdrv/nixdrv.go b/go/nix/nixdrv/nixdrv.go new file mode 100644 index 0000000000..277de6a895 --- /dev/null +++ b/go/nix/nixdrv/nixdrv.go @@ -0,0 +1,355 @@ +// SPDX-FileCopyrightText: 2023 Luke Granger-Brown +// +// SPDX-License-Identifier: Apache-2.0 + +// Package nixdrv holds basic data types for describing Nix derivations. +package nixdrv + +import ( + "bufio" + "bytes" + "fmt" + "strings" +) + +type Output struct { + Path string + HashAlgorithm string + Hash string +} + +type InputDerivation struct { + Path string + Outputs []string +} + +type BasicDerivation struct { + Outputs map[string]Output + InputSrcs map[string]bool // PathSet + Platform string + Builder string + Args []string + Env map[string]string + + InputDerivations []InputDerivation +} + +type Derivation struct { + BasicDerivation + + InputDerivations []InputDerivation +} + +func (drv *BasicDerivation) Clone() *BasicDerivation { + o := &BasicDerivation{ + Outputs: map[string]Output{}, + InputSrcs: map[string]bool{}, + Platform: drv.Platform, + Builder: drv.Builder, + Args: drv.Args, + Env: map[string]string{}, + } + for k, v := range drv.Outputs { + o.Outputs[k] = v + } + for k, v := range drv.InputSrcs { + o.InputSrcs[k] = v + } + for k, v := range drv.Env { + o.Env[k] = v + } + return o +} + +func (drv *Derivation) Clone() *Derivation { + return &Derivation{ + BasicDerivation: *drv.BasicDerivation.Clone(), + InputDerivations: drv.InputDerivations, + } +} + +type Resolver interface { + LoadDerivation(path string) (*Derivation, error) +} + +func (drv *Derivation) ToBasicDerivation(r Resolver) (*BasicDerivation, error) { + o := drv.BasicDerivation.Clone() + for _, inp := range drv.InputDerivations { + inpDrv, err := r.LoadDerivation(inp.Path) + if err != nil { + return nil, fmt.Errorf("resolving %v: %w", inp.Path, err) + } + + for _, inpOut := range inp.Outputs { + inpDrvOut, ok := inpDrv.Outputs[inpOut] + if !ok { + return nil, fmt.Errorf("input derivation %v has no output %v", inp.Path, inpOut) + } + + o.InputSrcs[inpDrvOut.Path] = true + } + } + return o, nil +} + +func expect(f *bufio.Reader, s string) error { + buf := make([]byte, len(s)) + _, err := f.Read(buf) + if err != nil { + return err + } + if !bytes.Equal(buf, []byte(s)) { + return fmt.Errorf("expected %q; got %q", s, string(buf)) + } + return nil +} + +func expectChar(f *bufio.Reader, c byte) error { + b, err := f.ReadByte() + switch { + case err != nil: + return fmt.Errorf("reading char: %w", err) + case b != c: + return fmt.Errorf("expected char '%c', got '%c'", c, b) + } + return nil +} + +func readList[T any](f *bufio.Reader, readElement func(f *bufio.Reader) (T, error)) ([]T, error) { + if err := expectChar(f, '['); err != nil { + return nil, fmt.Errorf("list open: %w", err) + } + b, err := f.ReadByte() + if err != nil { + return nil, fmt.Errorf("list first char: %w", err) + } + if b == ']' { + // Empty list. + return nil, nil + } + if err := f.UnreadByte(); err != nil { + return nil, fmt.Errorf("unreading: %w", err) + } + var l []T +loop: + for { + t, err := readElement(f) + if err != nil { + return nil, fmt.Errorf("list element: %w", err) + } + l = append(l, t) + + b, err := f.ReadByte() + switch { + case err != nil: + return nil, fmt.Errorf("reading list element or end of list: %w", err) + case b == ']': + break loop + case b == ',': + continue + default: + return nil, fmt.Errorf("expecting list element or end of list: got '%c'", b) + } + } + return l, nil +} + +func readStrings(f *bufio.Reader) ([]string, error) { + return readList(f, readString) +} + +func readString(f *bufio.Reader) (string, error) { + if err := expectChar(f, '"'); err != nil { + return "", fmt.Errorf("expected string opening '\"': %w", err) + } + var s strings.Builder +loop: + for { + b, err := f.ReadByte() + if err != nil { + return "", fmt.Errorf("reading string: %w", err) + } + switch b { + case '"': + break loop + case '\\': + b, err := f.ReadByte() + if err != nil { + return "", fmt.Errorf("reading string: %w", err) + } + switch b { + case 'n': + s.WriteByte('\n') + case 'r': + s.WriteByte('\r') + case 't': + s.WriteByte('\t') + default: + s.WriteByte(b) + } + continue + } + s.WriteByte(b) + } + return s.String(), nil +} + +func Load(f *bufio.Reader) (*Derivation, error) { + drv := &Derivation{} + + if err := expect(f, "Derive("); err != nil { + return nil, err + } + + type outputWithName struct { + Name string + Output + } + outputList, err := readList(f, func(f *bufio.Reader) (outputWithName, error) { + var ( + empT, o outputWithName + err error + ) + if err := expectChar(f, '('); err != nil { + return empT, fmt.Errorf("output open: %w", err) + } + + if o.Name, err = readString(f); err != nil { + return empT, fmt.Errorf("output name: %w", err) + } + if err := expectChar(f, ','); err != nil { + return empT, fmt.Errorf(", after output name: %w", err) + } + + if o.Path, err = readString(f); err != nil { + return empT, fmt.Errorf("output path: %w", err) + } + if err := expectChar(f, ','); err != nil { + return empT, fmt.Errorf(", after output path: %w", err) + } + + if o.HashAlgorithm, err = readString(f); err != nil { + return empT, fmt.Errorf("output hash algorithm: %w", err) + } + if err := expectChar(f, ','); err != nil { + return empT, fmt.Errorf(", after output hash algorithm: %w", err) + } + + if o.Hash, err = readString(f); err != nil { + return empT, fmt.Errorf("output hash: %w", err) + } + if err := expectChar(f, ')'); err != nil { + return empT, fmt.Errorf("output close: %w", err) + } + return o, nil + }) + if err != nil { + return nil, fmt.Errorf("reading outputs: %w", err) + } + drv.Outputs = make(map[string]Output) + for _, own := range outputList { + drv.Outputs[own.Name] = own.Output + } + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after outputs list: %w", err) + } + + drv.InputDerivations, err = readList(f, func(f *bufio.Reader) (InputDerivation, error) { + var ( + empT, id InputDerivation + err error + ) + if err := expectChar(f, '('); err != nil { + return empT, fmt.Errorf("input derivation open: %w", err) + } + + if id.Path, err = readString(f); err != nil { + return empT, fmt.Errorf("input derivation path: %w", err) + } + if err := expectChar(f, ','); err != nil { + return empT, fmt.Errorf(", after input derivation path: %w", err) + } + + if id.Outputs, err = readStrings(f); err != nil { + return empT, fmt.Errorf("input derivation outputs: %w", err) + } + if err := expectChar(f, ')'); err != nil { + return empT, fmt.Errorf("input derivation close: %w", err) + } + return id, nil + }) + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after input derivations list: %w", err) + } + + inputSrcsList, err := readStrings(f) + if err != nil { + return nil, fmt.Errorf("reading input srcs list: %w", err) + } + drv.InputSrcs = make(map[string]bool) + for _, k := range inputSrcsList { + drv.InputSrcs[k] = true + } + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after input srcs list: %w", err) + } + + if drv.Platform, err = readString(f); err != nil { + return nil, fmt.Errorf("reading platform: %w", err) + } + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after platform: %w", err) + } + + if drv.Builder, err = readString(f); err != nil { + return nil, fmt.Errorf("reading builder: %w", err) + } + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after builder: %w", err) + } + + if drv.Args, err = readStrings(f); err != nil { + return nil, fmt.Errorf("reading builder args: %w", err) + } + if err := expectChar(f, ','); err != nil { + return nil, fmt.Errorf(", after builder args: %w", err) + } + + type keyValue struct { + key, value string + } + envList, err := readList(f, func(f *bufio.Reader) (keyValue, error) { + var ( + empT, kv keyValue + err error + ) + if err := expectChar(f, '('); err != nil { + return empT, fmt.Errorf("key value open: %w", err) + } + + if kv.key, err = readString(f); err != nil { + return empT, fmt.Errorf("key: %w", err) + } + if err := expectChar(f, ','); err != nil { + return empT, fmt.Errorf(", after key: %w", err) + } + + if kv.value, err = readString(f); err != nil { + return empT, fmt.Errorf("value: %w", err) + } + if err := expectChar(f, ')'); err != nil { + return empT, fmt.Errorf("key value close: %w", err) + } + return kv, nil + }) + drv.Env = make(map[string]string) + for _, kv := range envList { + drv.Env[kv.key] = kv.value + } + + if err := expectChar(f, ')'); err != nil { + return nil, fmt.Errorf("closing brace: %w", err) + } + + return drv, nil +} diff --git a/go/nix/nixstore/activities.go b/go/nix/nixstore/activities.go new file mode 100644 index 0000000000..9f1b35991a --- /dev/null +++ b/go/nix/nixstore/activities.go @@ -0,0 +1,438 @@ +package nixstore + +import ( + "log" + "sync" + "time" +) + +const ( + progressLogRateLimit = 1 * time.Second +) + +type ActivityType uint64 + +const ( + ActUnknown ActivityType = 0 + ActCopyPath ActivityType = 99 + iota + ActDownload + ActRealise + ActCopyPaths + ActBuilds + ActBuild + ActOptimiseStore + ActVerifyPaths + ActSubstitute + ActQueryPathInfo + ActPostBuildHook +) + +func (at ActivityType) String() string { + return map[ActivityType]string{ + ActUnknown: "unknown", + ActCopyPath: "copying path", + ActDownload: "downloading path", + ActRealise: "realising path", + ActCopyPaths: "copying paths", + ActBuilds: "running builds", + ActBuild: "building", + ActOptimiseStore: "optimising store", + ActVerifyPaths: "verifying paths", + ActSubstitute: "substituting path", // paths? + ActQueryPathInfo: "querying path info", + ActPostBuildHook: "running post-build hook", + }[at] +} + +type ResultType uint64 + +const ( + ResFileLinked ResultType = 100 + iota + ResBuildLogLine + ResUntrustedPath + ResCorruptedPath + ResSetPhase + ResProgress + ResSetExpected + ResPostBuildLogLine +) + +func (rt ResultType) String() string { + return map[ResultType]string{ + ResFileLinked: "linked file", // [size: int, blocks: int] + ResBuildLogLine: "log line from build", // [log line: str] + ResUntrustedPath: "untrusted path", // [path: str] + ResCorruptedPath: "corrupted path", // [path: str] + ResSetPhase: "entering phase", // [phase: str] + ResProgress: "progress update", // [done: int, expected: int, running: int, failed: int] + ResSetExpected: "update expectation", // [activityType: int, expected: int] + ResPostBuildLogLine: "log line from post-build", // [log line: str] + }[rt] +} + +type ActivityTracker struct { + actions []*ActivityLogger + mu sync.Mutex +} + +func NewActivityTracker() *ActivityTracker { + return &ActivityTracker{} +} + +type ActivityLogger struct { + // Static for lifetime of logger. + action string + at *ActivityTracker + + // Guarded by mu. + activities map[uint64]Activity + logs []ActivityMetaLog + mu sync.Mutex +} + +func (al *ActivityLogger) AddLog(message string) { + if al == nil { + log.Println(message) + return + } + + al.mu.Lock() + al.logs = append(al.logs, ActivityMetaLog{ + Timestamp: time.Now(), + LogLine: message, + }) + al.mu.Unlock() +} + +func (al *ActivityLogger) AddError(errorCode uint64, message string) { + if al == nil { + log.Printf("error %d: %v", errorCode, message) + return + } + + al.mu.Lock() + al.logs = append(al.logs, ActivityMetaLog{ + Timestamp: time.Now(), + ErrorCode: errorCode, + ErrorMessage: message, + }) + al.mu.Unlock() +} + +type Activity interface { + ActivityType() ActivityType + Meta() *ActivityMeta +} + +type ActivityMetaLog struct { + Timestamp time.Time + + // One of: + // Line + LogLine string + + // Error + ErrorCode uint64 + ErrorMessage string + + // Started + ActivityStarted Activity + + // Finished + ActivityFinished Activity +} + +type LogLineType int + +const ( + LogLineBuild LogLineType = iota + LogLinePostBuild +) + +type ActivityLog struct { + Timestamp time.Time + + // One of: + LogLineType LogLineType + LogLine string + + NewPhase string + + Progress []any + + Expected []any +} + +type ActivityMeta struct { + ActivityID uint64 + Level uint64 + String string + Fields []any + ParentActivityID uint64 + + mu sync.Mutex + logs []*ActivityLog + phase string + progress []any + firstProgressLog *ActivityLog + lastProgressLogCreated time.Time + lastProgressLog *ActivityLog + expected []any +} + +func (am *ActivityMeta) RecordResult(result ResultType, fields []any) { + if am == nil { + return + } + now := time.Now() + am.mu.Lock() + switch result { + case ResBuildLogLine, ResPostBuildLogLine: + var llt LogLineType + if result == ResBuildLogLine { + llt = LogLineBuild + } else { + llt = LogLinePostBuild + } + am.logs = append(am.logs, &ActivityLog{ + Timestamp: now, + LogLineType: llt, + LogLine: fields[0].(string), + }) + case ResSetPhase: + phase := fields[0].(string) + am.logs = append(am.logs, &ActivityLog{ + Timestamp: now, + NewPhase: phase, + }) + am.phase = phase + case ResProgress: + am.progress = fields + + if am.firstProgressLog == nil { + // If we've never logged progress before, log it. + log := &ActivityLog{ + Timestamp: now, + Progress: fields, + } + am.logs = append(am.logs, log) + am.firstProgressLog = log + } else if am.lastProgressLog == nil { + // If we don't have a tail log entry, log it. + log := &ActivityLog{ + Timestamp: now, + Progress: fields, + } + am.logs = append(am.logs, log) + am.lastProgressLog = log + am.lastProgressLogCreated = now + } else { + am.lastProgressLog.Timestamp = now + am.lastProgressLog.Progress = fields + if time.Since(am.lastProgressLogCreated) > progressLogRateLimit { + // If our last progress log was more than rate limit ago, bump the log entry forwards and mark that we should create a new one. + am.lastProgressLog = nil + } + } + case ResSetExpected: + am.expected = fields + am.logs = append(am.logs, &ActivityLog{ + Timestamp: now, + Expected: fields, + }) + } + am.mu.Unlock() +} + +var ( + activityRegistry = map[ActivityType]func(ActivityMeta) Activity{} +) + +func registerActivityType(a Activity, factory func(ActivityMeta) Activity) { + activityRegistry[a.ActivityType()] = factory +} + +type CopyPathActivity struct{ ActivityMeta } + +func (*CopyPathActivity) ActivityType() ActivityType { return ActCopyPath } +func (a *CopyPathActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&CopyPathActivity{}, func(am ActivityMeta) Activity { return &CopyPathActivity{am} }) +} + +type DownloadActivity struct{ ActivityMeta } + +func (*DownloadActivity) ActivityType() ActivityType { return ActDownload } +func (a *DownloadActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&DownloadActivity{}, func(am ActivityMeta) Activity { return &DownloadActivity{am} }) +} + +type RealiseActivity struct{ ActivityMeta } + +func (*RealiseActivity) ActivityType() ActivityType { return ActRealise } +func (a *RealiseActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&RealiseActivity{}, func(am ActivityMeta) Activity { return &RealiseActivity{am} }) +} + +type BuildsActivity struct{ ActivityMeta } + +func (*BuildsActivity) ActivityType() ActivityType { return ActBuilds } +func (a *BuildsActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&BuildsActivity{}, func(am ActivityMeta) Activity { return &BuildsActivity{am} }) +} + +type BuildActivity struct{ ActivityMeta } + +func (*BuildActivity) ActivityType() ActivityType { return ActBuild } +func (a *BuildActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&BuildActivity{}, func(am ActivityMeta) Activity { return &BuildActivity{am} }) +} + +type OptimiseStoreActivity struct{ ActivityMeta } + +func (*OptimiseStoreActivity) ActivityType() ActivityType { return ActOptimiseStore } +func (a *OptimiseStoreActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&OptimiseStoreActivity{}, func(am ActivityMeta) Activity { return &OptimiseStoreActivity{am} }) +} + +type VerifyPathsActivity struct{ ActivityMeta } + +func (*VerifyPathsActivity) ActivityType() ActivityType { return ActVerifyPaths } +func (a *VerifyPathsActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&VerifyPathsActivity{}, func(am ActivityMeta) Activity { return &VerifyPathsActivity{am} }) +} + +type SubstituteActivity struct{ ActivityMeta } + +func (*SubstituteActivity) ActivityType() ActivityType { return ActSubstitute } +func (a *SubstituteActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&SubstituteActivity{}, func(am ActivityMeta) Activity { return &SubstituteActivity{am} }) +} + +type QueryPathInfoActivity struct{ ActivityMeta } + +func (*QueryPathInfoActivity) ActivityType() ActivityType { return ActQueryPathInfo } +func (a *QueryPathInfoActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&QueryPathInfoActivity{}, func(am ActivityMeta) Activity { return &QueryPathInfoActivity{am} }) +} + +type PostBuildHookActivity struct{ ActivityMeta } + +func (*PostBuildHookActivity) ActivityType() ActivityType { return ActPostBuildHook } +func (a *PostBuildHookActivity) Meta() *ActivityMeta { return &a.ActivityMeta } + +func init() { + registerActivityType(&PostBuildHookActivity{}, func(am ActivityMeta) Activity { return &PostBuildHookActivity{am} }) +} + +func (at *ActivityTracker) StartAction(actionName string) *ActivityLogger { + if at == nil { + return nil + } + + al := &ActivityLogger{ + action: actionName, + at: at, + activities: make(map[uint64]Activity), + } + at.mu.Lock() + at.actions = append(at.actions, al) + at.mu.Unlock() + return al +} + +func (at *ActivityTracker) EndAction(al *ActivityLogger) { + if at == nil { + return + } + + at.mu.Lock() + newActions := make([]*ActivityLogger, 0, len(at.actions)-1) + for _, oal := range at.actions { + if oal != al { + newActions = append(newActions, oal) + } + } + at.actions = newActions + at.mu.Unlock() +} + +func (al *ActivityLogger) Close() { + if al == nil { + return + } + + al.at.EndAction(al) +} + +func (al *ActivityLogger) StartActivity(at ActivityType, am ActivityMeta) Activity { + if al == nil { + return nil + } + + af, ok := activityRegistry[at] + if !ok { + return nil + } + a := af(am) + + al.mu.Lock() + al.logs = append(al.logs, ActivityMetaLog{ + Timestamp: time.Now(), + ActivityStarted: a, + }) + al.activities[am.ActivityID] = a + al.mu.Unlock() + + return a +} + +func (al *ActivityLogger) Activity(activityID uint64) Activity { + if al == nil { + return nil + } + + var a Activity + al.mu.Lock() + a = al.activities[activityID] + al.mu.Unlock() + return a +} + +func (al *ActivityLogger) ActivityResult(a Activity, resultType ResultType, data []any) { + if a == nil || al == nil { + return + } + + a.Meta().RecordResult(resultType, data) +} + +func (al *ActivityLogger) EndActivity(a Activity) { + if al == nil || a == nil { + return + } + + al.mu.Lock() + al.logs = append(al.logs, ActivityMetaLog{ + Timestamp: time.Now(), + ActivityFinished: a, + }) + al.mu.Unlock() +} diff --git a/go/nix/nixstore/default.nix b/go/nix/nixstore/default.nix index 056f8aa5a8..8e4ec1c48f 100644 --- a/go/nix/nixstore/default.nix +++ b/go/nix/nixstore/default.nix @@ -15,5 +15,6 @@ depot.third_party.buildGo.package { go.nix.nar.narinfo go.nix.nixwire third_party.gopkgs."github.com".mattn.go-sqlite3 + third_party.gopkgs."golang.org".x.crypto.ssh ]; } diff --git a/go/nix/nixstore/remotestore.go b/go/nix/nixstore/remotestore.go index 250f489ca9..1904c7aa60 100644 --- a/go/nix/nixstore/remotestore.go +++ b/go/nix/nixstore/remotestore.go @@ -3,13 +3,14 @@ package nixstore import ( "encoding/base64" "fmt" - "log" + "io" "net" "path" "strings" "sync" "hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo" + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" "hg.lukegb.com/lukegb/depot/go/nix/nixwire" ) @@ -19,8 +20,12 @@ const ( WorkerMagic2 = 0x6478696f ProtocolVersion = 0x115 + WopBuildDerivation = 36 + WopEnsurePath = 10 WopQueryPathInfo = 26 WopQuerySubstitutablePathInfo = 21 + + MaxBuf = 1024 * 1024 // 1 MB ) type Daemon struct { @@ -42,7 +47,7 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) { return nil, fmt.Errorf("writing store path query %v: %w", storePath, err) } - if err := d.processStderr(); err != nil { + if err := d.processStderr(nil, nil, nil); err != nil { return nil, fmt.Errorf("reading stderr from WopQueryPathInfo: %w", err) } @@ -151,7 +156,7 @@ func (d *Daemon) readFields() ([]any, error) { return fields, nil } -func (d *Daemon) processStderr() error { +func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Reader) error { for { msg, err := d.r.ReadUint64() if err != nil { @@ -159,13 +164,37 @@ func (d *Daemon) processStderr() error { } switch msg { case 0x64617416: // STDERR_WRITE - stderr, err := d.r.ReadString() - if err != nil { - return fmt.Errorf("read stderr msg: %w", err) + if stdout == nil { + return fmt.Errorf("STDERR_WRITE requested") + } + bs, err := d.r.ReadBytes() + if err != nil { + return fmt.Errorf("read bytes: %w", err) + } + _, err = stdout.Write(bs) + if err != nil { + return fmt.Errorf("write bytes into stdout: %w", err) } - log.Println(stderr) case 0x64617461: // STDERR_READ - return fmt.Errorf("STDERR_READ requested") + if stdin == nil { + return fmt.Errorf("STDERR_READ requested") + } + readSizeU64, err := d.r.ReadUint64() + if err != nil { + return fmt.Errorf("STDERR_READ reading uint64: %w", err) + } + readSize := int(readSizeU64) + if readSize > MaxBuf { + readSize = MaxBuf + } + buf := make([]byte, readSize) + readSize, err = stdin.Read(buf) + if err != nil { + return fmt.Errorf("STDERR_READ reading from stdin: %w", err) + } + if _, err := d.w.WriteBytes(buf[:readSize]); err != nil { + return fmt.Errorf("STDERR_READ writing stdin to socket: %w", err) + } case 0x63787470: // STDERR_ERROR errStr, err := d.r.ReadString() if err != nil { @@ -175,13 +204,14 @@ func (d *Daemon) processStderr() error { if err != nil { return fmt.Errorf("STDERR_ERROR reading uint64: %w", err) } + al.AddError(status, errStr) return fmt.Errorf("error code %d: %v", status, errStr) case 0x6f6c6d67: // STDERR_NEXT - errStr, err := d.r.ReadString() + msg, err := d.r.ReadString() if err != nil { - return fmt.Errorf("STDERR_NEXT reading error string: %w", err) + return fmt.Errorf("STDERR_NEXT reading log string: %w", err) } - return fmt.Errorf("error: %v", errStr) + al.AddLog(msg) case 0x53545254: // STDERR_START_ACTIVITY activity, err := d.r.ReadUint64() if err != nil { @@ -191,10 +221,11 @@ func (d *Daemon) processStderr() error { if err != nil { return fmt.Errorf("STDERR_START_ACTIVITY reading level: %w", err) } - actType, err := d.r.ReadUint64() + actTypeU64, err := d.r.ReadUint64() if err != nil { return fmt.Errorf("STDERR_START_ACTIVITY reading activity type: %w", err) } + actType := ActivityType(actTypeU64) s, err := d.r.ReadString() if err != nil { return fmt.Errorf("STDERR_START_ACTIVITY reading s: %w", err) @@ -207,27 +238,34 @@ func (d *Daemon) processStderr() error { if err != nil { return fmt.Errorf("STDERR_START_ACTIVITY reading parent activity: %w", err) } - log.Printf("START_ACTIVITY activity=%d, lvl=%d, actType=%d, s=%s, fields=%v, parentActivity=%d", activity, lvl, actType, s, fields, parentActivity) + al.StartActivity(actType, ActivityMeta{ + ActivityID: activity, + Level: lvl, + String: s, + Fields: fields, + ParentActivityID: parentActivity, + }) case 0x53544f50: // STDERR_STOP_ACTIVITY activity, err := d.r.ReadUint64() if err != nil { return fmt.Errorf("STDERR_STOP_ACTIVITY reading ActivityId: %w", err) } - log.Printf("STOP_ACTIVITY activity=%d", activity) + al.EndActivity(al.Activity(activity)) case 0x52534c54: // STDERR_RESULT activity, err := d.r.ReadUint64() if err != nil { return fmt.Errorf("STDERR_RESULT reading ActivityId: %w", err) } - resultTyp, err := d.r.ReadUint64() + resultTypU64, err := d.r.ReadUint64() if err != nil { return fmt.Errorf("STDERR_RESULT reading result: %w", err) } + resultTyp := ResultType(resultTypU64) fields, err := d.readFields() if err != nil { return fmt.Errorf("STDERR_RESULT fields: %w", err) } - log.Printf("activity RESULT activity=%d resultType=%d fields=%v", activity, resultTyp, fields) + al.ActivityResult(al.Activity(activity), resultTyp, fields) case 0x616c7473: // STDERR_LAST return nil } @@ -272,7 +310,135 @@ func (d *Daemon) hello() error { return fmt.Errorf("writing obsolete reserveSpace: %w", err) } - return d.processStderr() + return d.processStderr(nil, nil, nil) +} + +func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error { + al := at.StartAction(fmt.Sprintf("EnsurePath(%q)", storePath)) + defer al.Close() + + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.w.WriteUint64(WopEnsurePath); err != nil { + return fmt.Errorf("writing worker op WopEnsurePath: %w", err) + } + if _, err := d.w.WriteString(storePath); err != nil { + return fmt.Errorf("writing store path %v: %w", storePath, err) + } + + if err := d.processStderr(al, nil, nil); err != nil { + return fmt.Errorf("reading stderr from WopEnsurePath: %w", err) + } + + validInt, err := d.r.ReadUint64() + if err != nil { + return fmt.Errorf("reading path validity: %w", err) + } + if validInt != 1 { + return fmt.Errorf("path validity was %d; wanted 1", validInt) + } + return nil +} + +type BuildMode uint64 + +const ( + BMNormal BuildMode = iota + BMRepair + BMCheck +) + +func (bm BuildMode) String() string { + return map[BuildMode]string{ + BMNormal: "normal", + BMRepair: "repair", + BMCheck: "check", + }[bm] +} + +type BuildResultStatus uint64 + +const ( + BRSBuilt BuildResultStatus = iota + BRSSubstituted + BRSAlreadyValid + BRSPermanentFailure + BRSInputRejected + BRSOutputRejected + BRSTransientFailure + BRSTimedOut + BRSMiscFailure + BRSDependencyFailed + BRSLogLimitExceeded + BRSNotDeterministic +) + +func (brs BuildResultStatus) String() string { + return map[BuildResultStatus]string{ + BRSBuilt: "built", + BRSSubstituted: "substituted", + BRSAlreadyValid: "already valid", + BRSPermanentFailure: "permanent failure", + BRSInputRejected: "input rejected", + BRSOutputRejected: "output rejected", + BRSTransientFailure: "transient failure", + BRSTimedOut: "timed out", + BRSMiscFailure: "misc failure", + BRSDependencyFailed: "dependency failed", + BRSLogLimitExceeded: "log limit exceeded", + BRSNotDeterministic: "not deterministic", + }[brs] +} + +func (brs BuildResultStatus) IsSuccess() bool { + switch brs { + case BRSBuilt, BRSSubstituted, BRSAlreadyValid: + return true + default: + return false + } +} + +func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, derivation *nixdrv.BasicDerivation, buildMode BuildMode) (BuildResultStatus, error) { + al := at.StartAction(fmt.Sprintf("BuildDerivation(%q, %q)", derivationPath, buildMode)) + defer al.Close() + + d.mu.Lock() + defer d.mu.Unlock() + + if _, err := d.w.WriteUint64(WopBuildDerivation); err != nil { + return BRSMiscFailure, fmt.Errorf("writing worker op WopBuildDerivation: %w", err) + } + if _, err := d.w.WriteString(derivationPath); err != nil { + return BRSMiscFailure, fmt.Errorf("writing derivation store path %v: %w", derivationPath, err) + } + if _, err := d.w.WriteDerivation(derivation); err != nil { + return BRSMiscFailure, fmt.Errorf("writing derivation content of %v: %w", derivationPath, err) + } + if _, err := d.w.WriteUint64(uint64(buildMode)); err != nil { + return BRSMiscFailure, fmt.Errorf("writing build mode %v: %w", buildMode, err) + } + + if err := d.processStderr(al, nil, nil); err != nil { + return BRSMiscFailure, fmt.Errorf("reading stderr from WopBuildDerivation: %w", err) + } + + buildStatusU64, err := d.r.ReadUint64() + if err != nil { + return BRSMiscFailure, fmt.Errorf("reading build status code: %w", err) + } + buildStatus := BuildResultStatus(buildStatusU64) + buildErrorMsg, err := d.r.ReadString() + if err != nil { + return BRSMiscFailure, fmt.Errorf("reading build error message: %w", err) + } + + if !buildStatus.IsSuccess() { + return buildStatus, fmt.Errorf("%s: %s", buildStatus, buildErrorMsg) + } + + return buildStatus, nil } func OpenDaemon(path string) (*Daemon, error) { diff --git a/go/nix/nixwire/nixwire.go b/go/nix/nixwire/nixwire.go index a0e7eb83c1..3c9a86d683 100644 --- a/go/nix/nixwire/nixwire.go +++ b/go/nix/nixwire/nixwire.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "fmt" "io" + + "hg.lukegb.com/lukegb/depot/go/nix/nixdrv" ) type Serializer struct { @@ -25,21 +27,116 @@ func (w Serializer) WriteUint64(n uint64) (int64, error) { return int64(wrote), err } -func (w Serializer) WriteString(s string) (int64, error) { - nSize, err := w.WriteUint64(uint64(len(s))) +func (w Serializer) WriteBytes(bs []byte) (int64, error) { + nSize, err := w.WriteUint64(uint64(len(bs))) if err != nil { return int64(nSize), err } - nData, err := w.Write([]byte(s)) + nData, err := w.Write(bs) if err != nil { return int64(nSize) + int64(nData), err } - nPad, err := w.WritePadding(int64(len(s))) + nPad, err := w.WritePadding(int64(len(bs))) return int64(nSize) + int64(nData) + int64(nPad), err } +func (w Serializer) WriteString(s string) (int64, error) { + return w.WriteBytes([]byte(s)) +} + +func (w Serializer) WriteStrings(ss []string) (int64, error) { + nLen, err := w.WriteUint64(uint64(len(ss))) + if err != nil { + return nLen, err + } + var nTotal int64 + for _, s := range ss { + nStr, err := w.WriteString(s) + if err != nil { + return nLen + nTotal, err + } + nTotal += nStr + } + return nLen + nTotal, nil +} + +func (w Serializer) WriteStringSet(s map[string]bool) (int64, error) { + ss := make([]string, 0, len(s)) + for k := range s { + ss = append(ss, k) + } + return w.WriteStrings(ss) +} + +func (w Serializer) WriteDerivation(drv *nixdrv.BasicDerivation) (int64, error) { + nRunningTotal, err := w.WriteUint64(uint64(len(drv.Outputs))) + if err != nil { + return nRunningTotal, err + } + for outputName, output := range drv.Outputs { + nOutputName, err := w.WriteString(outputName) + nRunningTotal += nOutputName + if err != nil { + return nRunningTotal, err + } + nOutputPath, err := w.WriteString(output.Path) + nRunningTotal += nOutputPath + if err != nil { + return nRunningTotal, err + } + nOutputHashAlgo, err := w.WriteString(output.HashAlgorithm) + nRunningTotal += nOutputHashAlgo + if err != nil { + return nRunningTotal, err + } + nOutputHash, err := w.WriteString(output.Hash) + nRunningTotal += nOutputHash + if err != nil { + return nRunningTotal, err + } + } + nInputSrcs, err := w.WriteStringSet(drv.InputSrcs) + nRunningTotal += nInputSrcs + if err != nil { + return nRunningTotal, err + } + nPlatform, err := w.WriteString(drv.Platform) + nRunningTotal += nPlatform + if err != nil { + return nRunningTotal, err + } + nBuilder, err := w.WriteString(drv.Builder) + nRunningTotal += nBuilder + if err != nil { + return nRunningTotal, err + } + nArgs, err := w.WriteStrings(drv.Args) + nRunningTotal += nArgs + if err != nil { + return nRunningTotal, err + } + nEnvSize, err := w.WriteUint64(uint64(len(drv.Env))) + nRunningTotal += nEnvSize + if err != nil { + return nRunningTotal, err + } + for k, v := range drv.Env { + nEnvKey, err := w.WriteString(k) + nRunningTotal += nEnvKey + if err != nil { + return nRunningTotal, err + } + nEnvValue, err := w.WriteString(v) + nRunningTotal += nEnvValue + if err != nil { + return nRunningTotal, err + } + } + return nRunningTotal, nil +} + type Deserializer struct { io.Reader } @@ -70,22 +167,30 @@ func (r Deserializer) ReadUint64() (uint64, error) { return binary.LittleEndian.Uint64(buf), nil } -func (r Deserializer) ReadString() (string, error) { +func (r Deserializer) ReadBytes() ([]byte, error) { strLen, err := r.ReadUint64() if err != nil { - return "", fmt.Errorf("reading length: %w", err) + return nil, fmt.Errorf("reading length: %w", err) } strBuf := make([]byte, int(strLen)) if _, err := io.ReadFull(r, strBuf); err != nil { - return "", fmt.Errorf("reading string buffer: %w", err) + return nil, fmt.Errorf("reading string buffer: %w", err) } if err := r.ReadPadding(strLen); err != nil { - return "", fmt.Errorf("reading string padding: %w", err) + return nil, fmt.Errorf("reading string padding: %w", err) } - return string(strBuf), nil + return strBuf, nil +} + +func (r Deserializer) ReadString() (string, error) { + bs, err := r.ReadBytes() + if err != nil { + return "", err + } + return string(bs), nil } func (r Deserializer) ReadStrings() ([]string, error) { @@ -103,3 +208,77 @@ func (r Deserializer) ReadStrings() ([]string, error) { } return xs, nil } + +func (r Deserializer) ReadStringSet() (map[string]bool, error) { + strs, err := r.ReadStrings() + if err != nil { + return nil, err + } + + m := make(map[string]bool, len(strs)) + for _, k := range strs { + m[k] = true + } + return m, nil +} + +func (r Deserializer) ReadDerivation() (*nixdrv.BasicDerivation, error) { + drv := &nixdrv.BasicDerivation{} + + outputsLen, err := r.ReadUint64() + if err != nil { + return nil, fmt.Errorf("reading outputs length: %w", err) + } + + drv.Outputs = make(map[string]nixdrv.Output) + for n := uint64(0); n < outputsLen; n++ { + outputName, err := r.ReadString() + if err != nil { + return nil, fmt.Errorf("reading output name: %w", err) + } + var o nixdrv.Output + if o.Path, err = r.ReadString(); err != nil { + return nil, fmt.Errorf("reading output path: %w", err) + } + if o.HashAlgorithm, err = r.ReadString(); err != nil { + return nil, fmt.Errorf("reading output hash algorithm: %w", err) + } + if o.Hash, err = r.ReadString(); err != nil { + return nil, fmt.Errorf("reading output hash: %w", err) + } + + drv.Outputs[outputName] = o + } + + if drv.InputSrcs, err = r.ReadStringSet(); err != nil { + return nil, fmt.Errorf("reading input srcs: %w", err) + } + if drv.Platform, err = r.ReadString(); err != nil { + return nil, fmt.Errorf("reading platform: %w", err) + } + if drv.Builder, err = r.ReadString(); err != nil { + return nil, fmt.Errorf("reading builder: %w", err) + } + if drv.Args, err = r.ReadStrings(); err != nil { + return nil, fmt.Errorf("reading args: %w", err) + } + + envLen, err := r.ReadUint64() + if err != nil { + return nil, fmt.Errorf("reading environment length: %w", err) + } + drv.Env = map[string]string{} + for n := uint64(0); n < envLen; n++ { + key, err := r.ReadString() + if err != nil { + return nil, fmt.Errorf("reading environment variable name: %w", err) + } + value, err := r.ReadString() + if err != nil { + return nil, fmt.Errorf("reading environment variable value: %w", err) + } + drv.Env[key] = value + } + + return drv, nil +}