902 lines
23 KiB
Go
902 lines
23 KiB
Go
package nixstore
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
|
|
"hg.lukegb.com/lukegb/depot/go/nix/nar"
|
|
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
|
|
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
|
|
"hg.lukegb.com/lukegb/depot/go/nix/nixwire"
|
|
)
|
|
|
|
const (
|
|
DaemonSock = "/nix/var/nix/daemon-socket/socket"
|
|
WorkerMagic1 = 0x6e697863
|
|
WorkerMagic2 = 0x6478696f
|
|
ProtocolVersion = 0x120 // 0x115
|
|
|
|
WopAddMultipleToStore = 44
|
|
WopAddToStoreNar = 39
|
|
WopBuildDerivation = 36
|
|
WopEnsurePath = 10
|
|
WopIsValidPath = 1
|
|
WopNarFromPath = 38
|
|
WopQueryPathInfo = 26
|
|
WopQuerySubstitutablePathInfo = 21
|
|
WopQueryValidDerivers = 33
|
|
WopQueryValidPaths = 31
|
|
|
|
MaxBuf = 1024 * 1024 // 1 MB
|
|
)
|
|
|
|
type Daemon struct {
|
|
name string
|
|
|
|
conn io.Closer
|
|
w *nixwire.Serializer
|
|
r *nixwire.Deserializer
|
|
mu sync.Mutex
|
|
err error
|
|
|
|
peerMinorVersion int
|
|
}
|
|
|
|
type PathNotValidError struct{ Path string }
|
|
|
|
func (err PathNotValidError) Error() string {
|
|
return fmt.Sprintf("path %q is not valid", err.Path)
|
|
}
|
|
|
|
func (d *Daemon) Err() error {
|
|
return d.err
|
|
}
|
|
|
|
func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopQueryPathInfo); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing worker op WopQueryPathInfo: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(storePath); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing store path query %v: %w", storePath, err)
|
|
}
|
|
|
|
if err := d.processStderr(nil, nil, nil); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading stderr from WopQueryPathInfo: %w", err)
|
|
}
|
|
|
|
validInt, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading path validity: %w", err)
|
|
}
|
|
valid := validInt == uint64(1)
|
|
if !valid {
|
|
return nil, PathNotValidError{storePath}
|
|
}
|
|
|
|
ni := &narinfo.NarInfo{
|
|
StorePath: storePath,
|
|
}
|
|
if ni.Deriver, err = d.r.ReadString(); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading deriver: %w", err)
|
|
}
|
|
ni.Deriver = path.Base(ni.Deriver)
|
|
if ni.Deriver == "." {
|
|
ni.Deriver = ""
|
|
}
|
|
|
|
hashStr, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading NAR hash: %w", err)
|
|
}
|
|
if ni.NarHash, err = narinfo.HashFromString("sha256:" + hashStr); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("parsing NAR hash %q: %w", hashStr, err)
|
|
}
|
|
|
|
refs, err := d.r.ReadStrings()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading referrers: %w", err)
|
|
}
|
|
for n, ref := range refs {
|
|
refs[n] = path.Base(ref)
|
|
}
|
|
ni.References = refs
|
|
|
|
if _, err := d.r.ReadUint64(); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading registration time: %w", err)
|
|
}
|
|
if ni.NarSize, err = d.r.ReadUint64(); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading narsize: %w", err)
|
|
}
|
|
if _, err := d.r.ReadUint64(); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading ultimate: %w", err)
|
|
}
|
|
sigs, err := d.r.ReadStrings()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading sigs: %w", err)
|
|
}
|
|
ni.CA, err = d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading CA: %w", err)
|
|
}
|
|
|
|
ni.Sig = make(map[string][]byte)
|
|
for _, sigStr := range sigs {
|
|
sigBits := strings.Split(sigStr, ":")
|
|
sigHash, err := base64.StdEncoding.DecodeString(sigBits[1])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decoding signature %q: %w", sigStr, err)
|
|
}
|
|
ni.Sig[sigBits[0]] = sigHash
|
|
}
|
|
|
|
return ni, nil
|
|
}
|
|
|
|
func (d *Daemon) NARFromPath(storePath string) (io.ReadCloser, error) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopNarFromPath); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing worker op WopNarFromPath: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(storePath); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing store path query %v: %w", storePath, err)
|
|
}
|
|
if err := d.processStderr(nil, nil, nil); err != nil {
|
|
return nil, fmt.Errorf("waiting for NAR packing: %w", err)
|
|
}
|
|
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
err := nar.Unpack(io.TeeReader(d.r.Reader, pw), nar.NullFS{}, "irrelevant")
|
|
if err != nil {
|
|
pw.CloseWithError(err)
|
|
} else {
|
|
pw.Close()
|
|
}
|
|
}()
|
|
|
|
return io.NopCloser(pr), nil
|
|
}
|
|
|
|
func (d *Daemon) writeOp(op uint64) (int64, error) {
|
|
return d.w.WriteUint64(op)
|
|
}
|
|
|
|
func (d *Daemon) ValidPaths(storePaths []string) ([]string, error) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopQueryValidPaths); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing worker op WopQueryValidPaths: %w", err)
|
|
}
|
|
if _, err := d.w.WriteStrings(storePaths); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing store path query %v: %w", storePaths, err)
|
|
}
|
|
if d.peerMinorVersion >= 27 {
|
|
// Substitute flag
|
|
if _, err := d.w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing substitute flag: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := d.processStderr(nil, nil, nil); err != nil {
|
|
return nil, fmt.Errorf("reading stderr from WopQueryValidPaths: %w", err)
|
|
}
|
|
|
|
validPaths, err := d.r.ReadStrings()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading valid paths: %w", err)
|
|
}
|
|
return validPaths, nil
|
|
}
|
|
|
|
func (d *Daemon) Close() error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
return d.conn.Close()
|
|
}
|
|
|
|
func (d *Daemon) readFields() ([]any, error) {
|
|
fieldsSz, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading fields count: %w", err)
|
|
}
|
|
var fields []any
|
|
for n := 0; n < int(fieldsSz); n++ {
|
|
fieldTyp, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading field %d type: %w", n, err)
|
|
}
|
|
switch fieldTyp {
|
|
case 0: // tInt
|
|
fieldVal, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading field %d int: %w", n, err)
|
|
}
|
|
fields = append(fields, fieldVal)
|
|
case 1: // tString
|
|
fieldVal, err := d.r.ReadString()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading field %d str: %w", n, err)
|
|
}
|
|
fields = append(fields, fieldVal)
|
|
default:
|
|
return nil, fmt.Errorf("reading field %d: unsupported field type %x", n, fieldTyp)
|
|
}
|
|
}
|
|
return fields, nil
|
|
}
|
|
|
|
type NixError struct {
|
|
Code uint64
|
|
Message string
|
|
}
|
|
|
|
func (ne NixError) Error() string {
|
|
return fmt.Sprintf("nix error %d: %v", ne.Code, ne.Message)
|
|
}
|
|
|
|
func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Reader) error {
|
|
for {
|
|
msg, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("read stderr msg code: %w", err)
|
|
}
|
|
switch msg {
|
|
case 0x64617416: // STDERR_WRITE
|
|
if stdout == nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_WRITE requested")
|
|
}
|
|
bs, err := d.r.ReadBytes()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("read bytes: %w", err)
|
|
}
|
|
_, err = stdout.Write(bs)
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("write bytes into stdout: %w", err)
|
|
}
|
|
case 0x64617461: // STDERR_READ
|
|
if stdin == nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_READ requested")
|
|
}
|
|
readSizeU64, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_READ reading uint64: %w", err)
|
|
}
|
|
readSize := int(readSizeU64)
|
|
if readSize > MaxBuf {
|
|
readSize = MaxBuf
|
|
}
|
|
buf := make([]byte, readSize)
|
|
readSize, err = stdin.Read(buf)
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_READ reading from stdin: %w", err)
|
|
}
|
|
if _, err := d.w.WriteBytes(buf[:readSize]); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_READ writing stdin to socket: %w", err)
|
|
}
|
|
case 0x63787470: // STDERR_ERROR
|
|
if d.peerMinorVersion >= 26 {
|
|
errStr, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading 'Error' string: %w", err)
|
|
}
|
|
errLevel, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading error level: %w", err)
|
|
}
|
|
errName, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading error name: %w", err)
|
|
}
|
|
errMsg, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading error message: %w", err)
|
|
}
|
|
errPos, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading error position: %w", err)
|
|
}
|
|
log.Printf("error? Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos)
|
|
errTraceNum, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading trace count: %w", err)
|
|
}
|
|
for n := 0; n < int(errTraceNum); n++ {
|
|
tracePos, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading trace position: %w", err)
|
|
}
|
|
traceHint, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading trace hint: %w", err)
|
|
}
|
|
log.Printf("error? trace %d/%d: Pos=%d Hint=%s", n+1, errTraceNum, tracePos, traceHint)
|
|
}
|
|
return fmt.Errorf("nix Error=%s Level=%d Name=%s Msg=%s Pos=%d", errStr, errLevel, errName, errMsg, errPos)
|
|
} else {
|
|
errStr, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading error string: %w", err)
|
|
}
|
|
status, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_ERROR reading uint64: %w", err)
|
|
}
|
|
al.AddError(status, errStr)
|
|
return NixError{status, errStr}
|
|
}
|
|
case 0x6f6c6d67: // STDERR_NEXT
|
|
msg, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_NEXT reading log string: %w", err)
|
|
}
|
|
al.AddLog(msg)
|
|
case 0x53545254: // STDERR_START_ACTIVITY
|
|
activity, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY reading ActivityId: %w", err)
|
|
}
|
|
lvl, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY reading level: %w", err)
|
|
}
|
|
actTypeU64, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY reading activity type: %w", err)
|
|
}
|
|
actType := ActivityType(actTypeU64)
|
|
s, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY reading s: %w", err)
|
|
}
|
|
fields, err := d.readFields()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY fields: %w", err)
|
|
}
|
|
parentActivity, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_START_ACTIVITY reading parent activity: %w", err)
|
|
}
|
|
al.StartActivity(actType, ActivityMeta{
|
|
ActivityID: activity,
|
|
Level: lvl,
|
|
String: s,
|
|
Fields: fields,
|
|
ParentActivityID: parentActivity,
|
|
})
|
|
case 0x53544f50: // STDERR_STOP_ACTIVITY
|
|
activity, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_STOP_ACTIVITY reading ActivityId: %w", err)
|
|
}
|
|
al.EndActivity(al.Activity(activity))
|
|
case 0x52534c54: // STDERR_RESULT
|
|
activity, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_RESULT reading ActivityId: %w", err)
|
|
}
|
|
resultTypU64, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_RESULT reading result: %w", err)
|
|
}
|
|
resultTyp := ResultType(resultTypU64)
|
|
fields, err := d.readFields()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("STDERR_RESULT fields: %w", err)
|
|
}
|
|
al.ActivityResult(al.Activity(activity), resultTyp, fields)
|
|
case 0x616c7473: // STDERR_LAST
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) hello() error {
|
|
_, err := d.w.WriteUint64(WorkerMagic1)
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing magic 1: %w", err)
|
|
}
|
|
|
|
magic2, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("reading magic 2: %w", err)
|
|
}
|
|
if magic2 != WorkerMagic2 {
|
|
d.err = err
|
|
return fmt.Errorf("magic 2 mismatch: got %x, wanted %x", magic2, WorkerMagic2)
|
|
}
|
|
|
|
daemonVersion, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("reading daemon version: %w", err)
|
|
}
|
|
majorVersion := int((daemonVersion >> 8) & 0xff)
|
|
minorVersion := int(daemonVersion & 0xff)
|
|
if majorVersion != 1 {
|
|
d.err = err
|
|
return fmt.Errorf("daemon major version mismatch: got %d, want 1", majorVersion)
|
|
}
|
|
if minorVersion < 21 {
|
|
d.err = err
|
|
return fmt.Errorf("daemon minor version too old: got %d, want at least 21", minorVersion)
|
|
}
|
|
d.peerMinorVersion = minorVersion
|
|
|
|
if _, err := d.w.WriteUint64(ProtocolVersion); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing protocol version: %w", err)
|
|
}
|
|
|
|
if _, err := d.w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing obsolete CPU affinity: %w", err)
|
|
}
|
|
if _, err := d.w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing obsolete reserveSpace: %w", err)
|
|
}
|
|
|
|
return d.processStderr(nil, nil, nil)
|
|
}
|
|
|
|
func (d *Daemon) IsValidPath(at *ActivityTracker, storePath string) (bool, error) {
|
|
al := at.StartAction(fmt.Sprintf("IsValidPath(%q)", storePath))
|
|
defer al.Close()
|
|
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopIsValidPath); err != nil {
|
|
d.err = err
|
|
return false, fmt.Errorf("writing worker op WopIsValidPath: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(storePath); err != nil {
|
|
d.err = err
|
|
return false, fmt.Errorf("writing store path %v: %w", storePath, err)
|
|
}
|
|
|
|
if err := d.processStderr(al, nil, nil); err != nil {
|
|
return false, fmt.Errorf("reading stderr from WopIsValidPath: %w", err)
|
|
}
|
|
|
|
validInt, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return false, fmt.Errorf("reading path validity: %w", err)
|
|
}
|
|
return validInt != 0, nil
|
|
|
|
}
|
|
|
|
func (d *Daemon) EnsurePath(at *ActivityTracker, storePath string) error {
|
|
al := at.StartAction(fmt.Sprintf("EnsurePath(%q)", storePath))
|
|
defer al.Close()
|
|
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopEnsurePath); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing worker op WopEnsurePath: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(storePath); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing store path %v: %w", storePath, err)
|
|
}
|
|
|
|
if err := d.processStderr(al, nil, nil); err != nil {
|
|
return fmt.Errorf("reading stderr from WopEnsurePath: %w", err)
|
|
}
|
|
|
|
validInt, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("reading path validity: %w", err)
|
|
}
|
|
if validInt != 1 {
|
|
return PathNotValidError{storePath}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type BuildMode uint64
|
|
|
|
const (
|
|
BMNormal BuildMode = iota
|
|
BMRepair
|
|
BMCheck
|
|
)
|
|
|
|
func (bm BuildMode) String() string {
|
|
return map[BuildMode]string{
|
|
BMNormal: "normal",
|
|
BMRepair: "repair",
|
|
BMCheck: "check",
|
|
}[bm]
|
|
}
|
|
|
|
type BuildResultStatus uint64
|
|
|
|
const (
|
|
BRSBuilt BuildResultStatus = iota
|
|
BRSSubstituted
|
|
BRSAlreadyValid
|
|
BRSPermanentFailure
|
|
BRSInputRejected
|
|
BRSOutputRejected
|
|
BRSTransientFailure
|
|
BRSTimedOut
|
|
BRSMiscFailure
|
|
BRSDependencyFailed
|
|
BRSLogLimitExceeded
|
|
BRSNotDeterministic
|
|
)
|
|
|
|
func (brs BuildResultStatus) String() string {
|
|
return map[BuildResultStatus]string{
|
|
BRSBuilt: "built",
|
|
BRSSubstituted: "substituted",
|
|
BRSAlreadyValid: "already valid",
|
|
BRSPermanentFailure: "permanent failure",
|
|
BRSInputRejected: "input rejected",
|
|
BRSOutputRejected: "output rejected",
|
|
BRSTransientFailure: "transient failure",
|
|
BRSTimedOut: "timed out",
|
|
BRSMiscFailure: "misc failure",
|
|
BRSDependencyFailed: "dependency failed",
|
|
BRSLogLimitExceeded: "log limit exceeded",
|
|
BRSNotDeterministic: "not deterministic",
|
|
}[brs]
|
|
}
|
|
|
|
func (brs BuildResultStatus) IsSuccess() bool {
|
|
switch brs {
|
|
case BRSBuilt, BRSSubstituted, BRSAlreadyValid:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) BuildDerivation(at *ActivityTracker, derivationPath string, derivation *nixdrv.BasicDerivation, buildMode BuildMode) (BuildResultStatus, error) {
|
|
al := at.StartAction(fmt.Sprintf("BuildDerivation(%q, %q)", derivationPath, buildMode))
|
|
defer al.Close()
|
|
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopBuildDerivation); err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("writing worker op WopBuildDerivation: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(derivationPath); err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("writing derivation store path %v: %w", derivationPath, err)
|
|
}
|
|
if _, err := d.w.WriteDerivation(derivation); err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("writing derivation content of %v: %w", derivationPath, err)
|
|
}
|
|
if _, err := d.w.WriteUint64(uint64(buildMode)); err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("writing build mode %v: %w", buildMode, err)
|
|
}
|
|
|
|
if err := d.processStderr(al, nil, nil); err != nil {
|
|
return BRSMiscFailure, fmt.Errorf("reading stderr from WopBuildDerivation: %w", err)
|
|
}
|
|
|
|
buildStatusU64, err := d.r.ReadUint64()
|
|
if err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("reading build status code: %w", err)
|
|
}
|
|
buildStatus := BuildResultStatus(buildStatusU64)
|
|
buildErrorMsg, err := d.r.ReadString()
|
|
if err != nil {
|
|
d.err = err
|
|
return BRSMiscFailure, fmt.Errorf("reading build error message: %w", err)
|
|
}
|
|
|
|
if !buildStatus.IsSuccess() {
|
|
return buildStatus, fmt.Errorf("%s: %s", buildStatus, buildErrorMsg)
|
|
}
|
|
|
|
return buildStatus, nil
|
|
}
|
|
|
|
func (d *Daemon) writeNarInfo(ni *narinfo.NarInfo, w *nixwire.Serializer) error {
|
|
|
|
storeRoot := path.Dir(ni.StorePath)
|
|
|
|
if _, err := w.WriteString(ni.StorePath); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing store path: %w", err)
|
|
}
|
|
var deriverPath string
|
|
if ni.Deriver != "" {
|
|
deriverPath = path.Join(storeRoot, ni.Deriver)
|
|
}
|
|
if _, err := w.WriteString(deriverPath); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing deriver: %w", err)
|
|
}
|
|
if _, err := w.WriteString(hex.EncodeToString(ni.NarHash.Hash)); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing NAR hash: %w", err)
|
|
}
|
|
rootedRefs := make([]string, len(ni.References))
|
|
for n, ref := range ni.References {
|
|
rootedRefs[n] = path.Join(storeRoot, ref)
|
|
}
|
|
if _, err := w.WriteStrings(rootedRefs); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing references: %w", err)
|
|
}
|
|
if _, err := w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing dummy registration time: %w", err)
|
|
}
|
|
if _, err := w.WriteUint64(ni.NarSize); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing NAR size: %w", err)
|
|
}
|
|
if _, err := w.WriteUint64(1); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing ultimate (true): %w", err)
|
|
}
|
|
if _, err := w.WriteStrings(ni.Sigs()); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing signatures: %w", err)
|
|
}
|
|
if _, err := w.WriteString(ni.CA); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing content-addressable: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type bufferingReader struct {
|
|
r io.Reader
|
|
}
|
|
|
|
func (r *bufferingReader) Read(p []byte) (int, error) {
|
|
pos := 0
|
|
for {
|
|
n, err := r.r.Read(p[pos:])
|
|
pos += n
|
|
if err != nil {
|
|
if pos > 0 && errors.Is(err, io.EOF) {
|
|
return pos, nil
|
|
}
|
|
return pos, err
|
|
} else if pos >= len(p) {
|
|
break
|
|
}
|
|
}
|
|
return pos, nil
|
|
}
|
|
|
|
func (d *Daemon) AddToStoreNar(ni *narinfo.NarInfo, r io.Reader) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
var preBuffer []byte
|
|
|
|
if d.peerMinorVersion >= 32 {
|
|
if _, err := d.writeOp(WopAddMultipleToStore); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing worker op WopAddMultipleToStore: %w", err)
|
|
}
|
|
if _, err := d.w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing repair (false): %w", err)
|
|
}
|
|
if _, err := d.w.WriteUint64(1); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing dont-check-sigs (true): %w", err)
|
|
}
|
|
buf := new(bytes.Buffer)
|
|
bufs := &nixwire.Serializer{Writer: buf}
|
|
if _, err := bufs.WriteUint64(1); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing count (1): %w", err)
|
|
}
|
|
if err := d.writeNarInfo(ni, bufs); err != nil {
|
|
return fmt.Errorf("buffering NAR info: %w", err)
|
|
}
|
|
preBuffer = buf.Bytes()
|
|
} else {
|
|
if _, err := d.writeOp(WopAddToStoreNar); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing worker op WopAddToStoreNar: %w", err)
|
|
}
|
|
if err := d.writeNarInfo(ni, d.w); err != nil {
|
|
return fmt.Errorf("writing NAR info: %w", err)
|
|
}
|
|
if _, err := d.w.WriteUint64(0); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing repair (false): %w", err)
|
|
}
|
|
if _, err := d.w.WriteUint64(1); err != nil {
|
|
d.err = err
|
|
return fmt.Errorf("writing dont-check-sigs (true): %w", err)
|
|
}
|
|
}
|
|
|
|
if d.peerMinorVersion >= 23 {
|
|
// FramedSink
|
|
errCh := make(chan error)
|
|
mr := &bufferingReader{io.MultiReader(bytes.NewReader(preBuffer), r)}
|
|
w := &nixwire.Serializer{Writer: &hexDumpingWriter{enabled: false, w: d.w.Writer}}
|
|
go func() {
|
|
defer close(errCh)
|
|
buf := make([]byte, MaxBuf)
|
|
sofar := 0
|
|
for {
|
|
n, err := mr.Read(buf)
|
|
sofar += n
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
} else if err != nil {
|
|
err = fmt.Errorf("reading: %w", err)
|
|
d.err = err
|
|
errCh <- err
|
|
return
|
|
}
|
|
if _, err := w.WriteBytes(buf[:n]); err != nil {
|
|
err = fmt.Errorf("writing payload: %w", err)
|
|
d.err = err
|
|
errCh <- err
|
|
return
|
|
}
|
|
}
|
|
if _, err := w.WriteUint64(0); err != nil {
|
|
err = fmt.Errorf("sending framed EOF: %w", err)
|
|
d.err = err
|
|
errCh <- err
|
|
return
|
|
}
|
|
}()
|
|
if err := d.processStderr(nil, nil, nil); err != nil {
|
|
return fmt.Errorf("reading (framed) stderr from WopAddToStoreNar: %w", err)
|
|
}
|
|
return <-errCh
|
|
} else {
|
|
if err := d.processStderr(nil, nil, &bufferingReader{r}); err != nil {
|
|
return fmt.Errorf("reading stderr from WopAddToStoreNar: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Daemon) QueryValidDerivers(path string) ([]string, error) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, err := d.writeOp(WopQueryValidDerivers); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing worker op WopQueryValidDerivers: %w", err)
|
|
}
|
|
if _, err := d.w.WriteString(path); err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("writing store path: %w", err)
|
|
}
|
|
|
|
if err := d.processStderr(nil, nil, nil); err != nil {
|
|
return nil, fmt.Errorf("reading stderr from WopQueryValidDerivers: %w", err)
|
|
}
|
|
|
|
drvPaths, err := d.r.ReadStrings()
|
|
if err != nil {
|
|
d.err = err
|
|
return nil, fmt.Errorf("reading deriver paths: %w", err)
|
|
}
|
|
return drvPaths, nil
|
|
}
|
|
|
|
func OpenDaemon(path string) (*Daemon, error) {
|
|
conn, err := net.Dial("unix", path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dialing %v: %w", path, err)
|
|
}
|
|
|
|
return OpenDaemonWithIOs("unix://"+path, conn, conn, conn)
|
|
}
|
|
|
|
type hexDumpingWriter struct {
|
|
w io.Writer
|
|
enabled bool
|
|
}
|
|
|
|
func (w *hexDumpingWriter) Write(p []byte) (int, error) {
|
|
n, err := w.w.Write(p)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if w.enabled {
|
|
log.Printf(">>\n>> %s", strings.TrimSuffix(strings.ReplaceAll(hex.Dump(p), "\n", "\n>> "), "\n>> "))
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func OpenDaemonWithIOs(name string, r io.Reader, w io.Writer, c io.Closer) (*Daemon, error) {
|
|
d := &Daemon{
|
|
name: name,
|
|
conn: c,
|
|
w: &nixwire.Serializer{Writer: &hexDumpingWriter{w: w}},
|
|
r: &nixwire.Deserializer{Reader: r},
|
|
}
|
|
if err := d.hello(); err != nil {
|
|
d.Close()
|
|
return nil, fmt.Errorf("sending hello: %w", err)
|
|
}
|
|
|
|
return d, nil
|
|
}
|