go/nix: unfinished 'copy' implementation

This commit is contained in:
Luke Granger-Brown 2023-08-19 21:14:04 +00:00
parent ddfb67cdd1
commit 6addc90a6e
3 changed files with 394 additions and 4 deletions

View file

@ -2,26 +2,356 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"math/rand"
"net/url"
"os"
"sync"
"time"
"golang.org/x/crypto/ssh"
"hg.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"hg.lukegb.com/lukegb/depot/go/nix/nixdrv"
"hg.lukegb.com/lukegb/depot/go/nix/nixstore"
)
var (
remoteFlag = flag.String("remote", "unix://", "Remote.")
remote2Flag = flag.String("remote2", "unix://", "Remote. But the second one. The destination, for copy.")
maxConnsFlag = flag.Int("max_conns", 10, "Max connections.")
verboseFlag = flag.Bool("verbose", false, "Verbose.")
)
func ensure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
return d.EnsurePath(at, path)
}
func narInfo(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
ni, err := d.NARInfo(path)
if err != nil {
return err
}
fmt.Printf("%s\n", ni)
return nil
}
func tryEnsure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) (bool, error) {
err := d.EnsurePath(at, path)
if err == nil {
return true, nil
}
var ne nixstore.NixError
if errors.As(err, &ne) {
if ne.Code == 1 {
return false, nil
}
}
return false, err
}
type connectionPool struct {
factory func() (*nixstore.Daemon, error)
cond *sync.Cond
limit int
generated int
available []*nixstore.Daemon
}
func newConnectionPool(ctx context.Context, dst string, limit int) *connectionPool {
return &connectionPool{
factory: func() (*nixstore.Daemon, error) {
var d *nixstore.Daemon
var err error
retryInterval := 10 * time.Millisecond
maxRetry := 10 * time.Second
for n := 0; n < 20; n++ {
d, _, err = connectTo(ctx, dst)
if err == nil {
break
}
time.Sleep(retryInterval + time.Duration((rand.Float64()-0.5)*float64(retryInterval)))
retryInterval = retryInterval * 2
if retryInterval > maxRetry {
retryInterval = maxRetry
}
}
return d, err
},
cond: sync.NewCond(new(sync.Mutex)),
limit: limit,
}
}
func (p *connectionPool) getFromAvailableLocked() *nixstore.Daemon {
end := len(p.available) - 1
d := p.available[end]
p.available = p.available[:end]
return d
}
func (p *connectionPool) Get() (*nixstore.Daemon, error) {
p.cond.L.Lock()
if len(p.available) > 0 {
d := p.getFromAvailableLocked()
p.cond.L.Unlock()
return d, nil
}
if p.generated < p.limit {
p.generated++
p.cond.L.Unlock()
d, err := p.factory()
if err != nil {
p.cond.L.Lock()
p.generated--
p.cond.L.Unlock()
return nil, err
}
return d, nil
}
for len(p.available) == 0 {
p.cond.Wait()
}
d := p.getFromAvailableLocked()
p.cond.L.Unlock()
return d, nil
}
func (p *connectionPool) Put(d *nixstore.Daemon) {
p.cond.L.Lock()
p.available = append(p.available, d)
p.cond.Signal()
p.cond.L.Unlock()
}
type workState int
const (
StateCheckingAlreadyExists workState = iota
StateFetchingPathInfo
StateEnqueueingReferences
StateAwaitingReferences
StateCopying
StateDone
StateFailed
)
func (s workState) String() string {
return map[workState]string{
StateCheckingAlreadyExists: "checking already exists",
StateFetchingPathInfo: "fetching path info",
StateEnqueueingReferences: "enqueueing references",
StateAwaitingReferences: "awaiting references",
StateCopying: "copying",
StateDone: "done!",
StateFailed: "failed :(",
}[s]
}
func (s workState) Terminal() bool { return s == StateDone || s == StateFailed }
type workItem struct {
doneCh chan struct{}
state workState
path string
err error
narInfo *narinfo.NarInfo
childWork []*workItem
srcPool *connectionPool
srcAT *nixstore.ActivityTracker
dstPool *connectionPool
dstAT *nixstore.ActivityTracker
wh *workHolder
}
func (i *workItem) checkAlreadyExists(ctx context.Context) (workState, error) {
// i.state == StateCheckingAlreadyExists
dst, err := i.dstPool.Get()
if err != nil {
return StateFailed, err
}
defer i.dstPool.Put(dst)
ok, err := tryEnsure(ctx, dst, i.dstAT, i.path)
if err != nil {
return StateFailed, err
}
if ok {
return StateDone, nil
}
return StateFetchingPathInfo, nil
}
func (i *workItem) fetchPathInfo(ctx context.Context) (workState, error) {
// i.state == StateFetchingPathInfo
src, err := i.srcPool.Get()
if err != nil {
return StateFailed, err
}
defer i.srcPool.Put(src)
ni, err := src.NARInfo(i.path)
if err != nil {
return StateFailed, err
}
i.narInfo = ni
return StateEnqueueingReferences, nil
}
func (i *workItem) enqueueReferences(ctx context.Context) (workState, error) {
// i.state == StateEnqueueingReferences
const nixStorePrefix = "/nix/store/"
var childWork []*workItem
for _, ref := range i.narInfo.References {
refPath := nixStorePrefix + ref
if i.path == refPath {
continue
}
childWork = append(childWork, i.wh.addWork(ctx, refPath))
}
i.childWork = childWork
if len(i.childWork) == 0 {
return StateCopying, nil
}
return StateAwaitingReferences, nil
}
func (i *workItem) awaitReferences(ctx context.Context) (workState, error) {
// i.state == StateAwaitingReferences
for _, ci := range i.childWork {
select {
case <-ctx.Done():
return StateFailed, ctx.Err()
case <-ci.doneCh:
if ci.state == StateFailed {
return StateFailed, fmt.Errorf("reference %v failed", ci.path)
}
if !ci.state.Terminal() {
panic(fmt.Sprintf("%v: reference %v declared done in non-terminal state %v", i.path, ci.path, ci.state))
}
}
}
return StateCopying, nil
}
func (i *workItem) performCopy(ctx context.Context) (workState, error) {
// i.state == StateCopying
src, err := i.srcPool.Get()
if err != nil {
return StateFailed, err
}
defer i.srcPool.Put(src)
dst, err := i.dstPool.Get()
if err != nil {
return StateFailed, err
}
defer i.dstPool.Put(dst)
// TODO: the rest of the owl
return StateDone, nil
}
func (i *workItem) run(ctx context.Context) error {
defer close(i.doneCh)
i.state = StateCheckingAlreadyExists
for !i.state.Terminal() {
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("%s: running worker for %s", i.path, i.state)
var nextState workState
var err error
switch i.state {
case StateCheckingAlreadyExists:
nextState, err = i.checkAlreadyExists(ctx)
case StateFetchingPathInfo:
nextState, err = i.fetchPathInfo(ctx)
case StateEnqueueingReferences:
nextState, err = i.enqueueReferences(ctx)
case StateAwaitingReferences:
nextState, err = i.awaitReferences(ctx)
case StateCopying:
nextState, err = i.performCopy(ctx)
default:
log.Printf("%s: ended up in unimplemented state %s", i.path, i.state)
<-make(chan struct{})
}
i.state = nextState
if err != nil {
log.Printf("%s: transitioning to %s: %s", i.path, nextState, err)
i.err = err
return fmt.Errorf("%s: %w", i.state, err)
}
log.Printf("%s: transitioning to %s", i.path, nextState)
}
return nil
}
type workHolder struct {
mu sync.Mutex
work map[string]*workItem
srcPool *connectionPool
srcAT *nixstore.ActivityTracker
dstPool *connectionPool
dstAT *nixstore.ActivityTracker
}
func (wh *workHolder) addWork(ctx context.Context, path string) *workItem {
wh.mu.Lock()
defer wh.mu.Unlock()
if i := wh.work[path]; i != nil {
return i
}
i := &workItem{
doneCh: make(chan struct{}),
path: path,
srcPool: wh.srcPool,
srcAT: wh.srcAT,
dstPool: wh.dstPool,
dstAT: wh.dstAT,
wh: wh,
}
wh.work[path] = i
go i.run(ctx)
return i
}
func copyPath(ctx context.Context, poolFrom *connectionPool, atFrom *nixstore.ActivityTracker, poolTo *connectionPool, atTo *nixstore.ActivityTracker, path string) error {
wh := workHolder{
work: make(map[string]*workItem),
srcPool: poolFrom,
srcAT: atFrom,
dstPool: poolTo,
dstAT: atTo,
}
i := wh.addWork(ctx, path)
<-i.doneCh
return i.err
}
var rs nixdrv.Resolver
func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) {
@ -159,6 +489,8 @@ func connectTo(ctx context.Context, remote string) (*nixstore.Daemon, nixdrv.Res
func main() {
flag.Parse()
nixstore.VerboseActivities = *verboseFlag
badCall := func(f string, xs ...interface{}) {
fmt.Fprintf(os.Stderr, f+"\n", xs...)
flag.Usage()
@ -169,6 +501,7 @@ func main() {
badCall("need a subcommand")
}
ctx := context.Background()
var cmd func(context.Context, *nixstore.Daemon, *nixstore.ActivityTracker) error
switch flag.Arg(0) {
case "ensure":
@ -197,13 +530,32 @@ func main() {
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
return buildDerivation(ctx, d, at, flag.Arg(1))
}
case "nar-info":
if flag.NArg() != 2 {
badCall("`nar-info` needs a store path")
}
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
return narInfo(ctx, d, at, flag.Arg(1))
}
case "copy":
if flag.NArg() != 2 {
badCall("`copy` needs a store path")
}
poolSrc := newConnectionPool(ctx, *remoteFlag, *maxConnsFlag)
atSrc := nixstore.NewActivityTracker()
poolDst := newConnectionPool(ctx, *remote2Flag, *maxConnsFlag)
atDst := nixstore.NewActivityTracker()
if err := copyPath(ctx, poolSrc, atSrc, poolDst, atDst, flag.Arg(1)); err != nil {
log.Fatalf("copy: %v", err)
}
return
default:
badCall("bad subcommand %s", flag.Arg(0))
}
at := nixstore.NewActivityTracker()
ctx := context.Background()
d, rss, err := connectTo(ctx, *remoteFlag)
if err != nil {
log.Fatalf("connectTo(%q): %v", *remoteFlag, err)

View file

@ -10,6 +10,8 @@ const (
progressLogRateLimit = 1 * time.Second
)
var VerboseActivities = false
type ActivityType uint64
const (
@ -194,6 +196,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
} else {
llt = LogLinePostBuild
}
if VerboseActivities {
log.Printf("%s:%d> %s", am.ActivityID, fields[0].(string))
}
am.logs = append(am.logs, &ActivityLog{
Timestamp: now,
LogLineType: llt,
@ -206,6 +211,10 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
NewPhase: phase,
})
am.phase = phase
if VerboseActivities {
log.Printf("%d [NEW PHASE] %v", am.ActivityID, phase)
}
case ResProgress:
am.progress = fields
@ -231,6 +240,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
am.lastProgressLog.Progress = fields
if time.Since(am.lastProgressLogCreated) > progressLogRateLimit {
// If our last progress log was more than rate limit ago, bump the log entry forwards and mark that we should create a new one.
if VerboseActivities {
log.Printf("%d [PROGRESS] %v", am.ActivityID, fields)
}
am.lastProgressLog = nil
}
}
@ -240,6 +252,9 @@ func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
Timestamp: now,
Expected: fields,
})
if VerboseActivities {
log.Printf("%d [EXPECTED] %v", am.ActivityID, fields)
}
}
am.mu.Unlock()
}
@ -401,6 +416,10 @@ func (al *ActivityLogger) StartActivity(at ActivityType, am ActivityMeta) Activi
al.activities[am.ActivityID] = a
al.mu.Unlock()
if VerboseActivities {
log.Printf("%d [START] %s", am.ActivityID, at)
}
return a
}
@ -429,6 +448,10 @@ func (al *ActivityLogger) EndActivity(a Activity) {
return
}
if VerboseActivities {
log.Printf("%d [END] %s", a.Meta().ActivityID, a.ActivityType())
}
al.mu.Lock()
al.logs = append(al.logs, ActivityMetaLog{
Timestamp: time.Now(),

View file

@ -36,6 +36,12 @@ type Daemon struct {
err error
}
type PathNotValidError struct{ Path string }
func (err PathNotValidError) Error() string {
return fmt.Sprintf("path %q is not valid", err.Path)
}
func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
d.mu.Lock()
defer d.mu.Unlock()
@ -57,7 +63,7 @@ func (d *Daemon) NARInfo(storePath string) (*narinfo.NarInfo, error) {
}
valid := validInt == uint64(1)
if !valid {
return nil, fmt.Errorf("path %q is not valid", storePath)
return nil, PathNotValidError{storePath}
}
ni := &narinfo.NarInfo{
@ -156,6 +162,15 @@ func (d *Daemon) readFields() ([]any, error) {
return fields, nil
}
type NixError struct {
Code uint64
Message string
}
func (ne NixError) Error() string {
return fmt.Sprintf("nix error %d: %v", ne.Code, ne.Message)
}
func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Reader) error {
for {
msg, err := d.r.ReadUint64()
@ -205,7 +220,7 @@ func (d *Daemon) processStderr(al *ActivityLogger, stdout io.Writer, stdin io.Re
return fmt.Errorf("STDERR_ERROR reading uint64: %w", err)
}
al.AddError(status, errStr)
return fmt.Errorf("error code %d: %v", status, errStr)
return NixError{status, errStr}
case 0x6f6c6d67: // STDERR_NEXT
msg, err := d.r.ReadString()
if err != nil {