depot/go/nix/bnix/bnix.go

391 lines
9.2 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"sync"
"git.lukegb.com/lukegb/depot/go/nix/nar/narinfo"
"git.lukegb.com/lukegb/depot/go/nix/nixdrv"
"git.lukegb.com/lukegb/depot/go/nix/nixpool"
"git.lukegb.com/lukegb/depot/go/nix/nixstore"
)
var (
remoteFlag = flag.String("remote", "unix://", "Remote.")
remote2Flag = flag.String("remote2", "unix://", "Remote. But the second one. The destination, for copy.")
maxConnsFlag = flag.Int("max_conns", 10, "Max connections.")
verboseFlag = flag.Bool("verbose", false, "Verbose.")
)
func ensure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
return d.EnsurePath(at, path)
}
func narInfo(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
ni, err := d.NARInfo(path)
if err != nil {
return err
}
fmt.Printf("%s\n", ni)
return nil
}
func tryEnsure(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) (bool, error) {
err := d.EnsurePath(at, path)
if err == nil {
return true, nil
}
var ne nixstore.NixError
if errors.As(err, &ne) {
if ne.Code == 1 {
return false, nil
}
}
return false, err
}
type workState int
const (
StateCheckingAlreadyExists workState = iota
StateFetchingPathInfo
StateEnqueueingReferences
StateAwaitingReferences
StateCopying
StateDone
StateFailed
)
func (s workState) String() string {
return map[workState]string{
StateCheckingAlreadyExists: "checking already exists",
StateFetchingPathInfo: "fetching path info",
StateEnqueueingReferences: "enqueueing references",
StateAwaitingReferences: "awaiting references",
StateCopying: "copying",
StateDone: "done!",
StateFailed: "failed :(",
}[s]
}
func (s workState) Terminal() bool { return s == StateDone || s == StateFailed }
type workItem struct {
doneCh chan struct{}
state workState
path string
err error
narInfo *narinfo.NarInfo
childWork []*workItem
srcPool *nixpool.Pool
srcAT *nixstore.ActivityTracker
dstPool *nixpool.Pool
dstAT *nixstore.ActivityTracker
wh *workHolder
}
func (i *workItem) checkAlreadyExists(ctx context.Context) (workState, error) {
// i.state == StateCheckingAlreadyExists
dst, err := i.dstPool.Get()
if err != nil {
return StateFailed, err
}
defer i.dstPool.Put(dst)
ok, err := tryEnsure(ctx, dst, i.dstAT, i.path)
if err != nil {
return StateFailed, err
}
if ok {
return StateDone, nil
}
return StateFetchingPathInfo, nil
}
func (i *workItem) fetchPathInfo(ctx context.Context) (workState, error) {
// i.state == StateFetchingPathInfo
src, err := i.srcPool.Get()
if err != nil {
return StateFailed, err
}
defer i.srcPool.Put(src)
ni, err := src.NARInfo(i.path)
if err != nil {
return StateFailed, err
}
i.narInfo = ni
return StateEnqueueingReferences, nil
}
func (i *workItem) enqueueReferences(ctx context.Context) (workState, error) {
// i.state == StateEnqueueingReferences
const nixStorePrefix = "/nix/store/"
var childWork []*workItem
for _, ref := range i.narInfo.References {
refPath := nixStorePrefix + ref
if i.path == refPath {
continue
}
childWork = append(childWork, i.wh.addWork(ctx, refPath))
}
i.childWork = childWork
if len(i.childWork) == 0 {
return StateCopying, nil
}
return StateAwaitingReferences, nil
}
func (i *workItem) awaitReferences(ctx context.Context) (workState, error) {
// i.state == StateAwaitingReferences
for _, ci := range i.childWork {
select {
case <-ctx.Done():
return StateFailed, ctx.Err()
case <-ci.doneCh:
if ci.state == StateFailed {
return StateFailed, fmt.Errorf("reference %v failed", ci.path)
}
if !ci.state.Terminal() {
panic(fmt.Sprintf("%v: reference %v declared done in non-terminal state %v", i.path, ci.path, ci.state))
}
}
}
return StateCopying, nil
}
func (i *workItem) performCopy(ctx context.Context) (workState, error) {
// i.state == StateCopying
src, err := i.srcPool.Get()
if err != nil {
return StateFailed, err
}
defer i.srcPool.Put(src)
dst, err := i.dstPool.Get()
if err != nil {
return StateFailed, err
}
defer i.dstPool.Put(dst)
// TODO: the rest of the owl
return StateDone, nil
}
func (i *workItem) run(ctx context.Context) error {
defer close(i.doneCh)
i.state = StateCheckingAlreadyExists
for !i.state.Terminal() {
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("%s: running worker for %s", i.path, i.state)
var nextState workState
var err error
switch i.state {
case StateCheckingAlreadyExists:
nextState, err = i.checkAlreadyExists(ctx)
case StateFetchingPathInfo:
nextState, err = i.fetchPathInfo(ctx)
case StateEnqueueingReferences:
nextState, err = i.enqueueReferences(ctx)
case StateAwaitingReferences:
nextState, err = i.awaitReferences(ctx)
case StateCopying:
nextState, err = i.performCopy(ctx)
default:
log.Printf("%s: ended up in unimplemented state %s", i.path, i.state)
<-make(chan struct{})
}
i.state = nextState
if err != nil {
log.Printf("%s: transitioning to %s: %s", i.path, nextState, err)
i.err = err
return fmt.Errorf("%s: %w", i.state, err)
}
log.Printf("%s: transitioning to %s", i.path, nextState)
}
return nil
}
type workHolder struct {
mu sync.Mutex
work map[string]*workItem
srcPool *nixpool.Pool
srcAT *nixstore.ActivityTracker
dstPool *nixpool.Pool
dstAT *nixstore.ActivityTracker
}
func (wh *workHolder) addWork(ctx context.Context, path string) *workItem {
wh.mu.Lock()
defer wh.mu.Unlock()
if i := wh.work[path]; i != nil {
return i
}
i := &workItem{
doneCh: make(chan struct{}),
path: path,
srcPool: wh.srcPool,
srcAT: wh.srcAT,
dstPool: wh.dstPool,
dstAT: wh.dstAT,
wh: wh,
}
wh.work[path] = i
go i.run(ctx)
return i
}
func copyPath(ctx context.Context, poolFrom *nixpool.Pool, atFrom *nixstore.ActivityTracker, poolTo *nixpool.Pool, atTo *nixstore.ActivityTracker, path string) error {
wh := workHolder{
work: make(map[string]*workItem),
srcPool: poolFrom,
srcAT: atFrom,
dstPool: poolTo,
dstAT: atTo,
}
i := wh.addWork(ctx, path)
<-i.doneCh
return i.err
}
var rs nixdrv.Resolver
func loadDerivation(ctx context.Context, path string) (*nixdrv.Derivation, error) {
return rs.LoadDerivation(ctx, path)
}
func buildDerivation(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker, path string) error {
drv, err := loadDerivation(ctx, path)
if err != nil {
return fmt.Errorf("loading derivation: %w", err)
}
basicDrv, err := drv.ToBasicDerivation(ctx, rs)
if err != nil {
return fmt.Errorf("resolving: %w", err)
}
brs, err := d.BuildDerivation(at, path, basicDrv, nixstore.BMNormal)
if err != nil {
return err
}
log.Printf("build result: %v", brs)
return nil
}
func main() {
flag.Parse()
nixstore.VerboseActivities = *verboseFlag
badCall := func(f string, xs ...interface{}) {
fmt.Fprintf(os.Stderr, f+"\n", xs...)
flag.Usage()
os.Exit(1)
}
if flag.NArg() < 1 {
badCall("need a subcommand")
}
ctx := context.Background()
var cmd func(context.Context, *nixstore.Daemon, *nixstore.ActivityTracker) error
switch flag.Arg(0) {
case "ensure":
if flag.NArg() != 2 {
badCall("`ensure` needs a store path")
}
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
return ensure(ctx, d, at, flag.Arg(1))
}
case "show-derivation":
if flag.NArg() != 2 {
badCall("`show-derivation` needs a derivation")
}
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
drv, err := loadDerivation(ctx, flag.Arg(1))
if err != nil {
return err
}
fmt.Printf("%#v\n", drv)
return nil
}
case "build-derivation":
if flag.NArg() != 2 {
badCall("`build-derivation` needs a derivation")
}
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
return buildDerivation(ctx, d, at, flag.Arg(1))
}
case "nar-info":
if flag.NArg() != 2 {
badCall("`nar-info` needs a store path")
}
cmd = func(ctx context.Context, d *nixstore.Daemon, at *nixstore.ActivityTracker) error {
return narInfo(ctx, d, at, flag.Arg(1))
}
case "copy":
if flag.NArg() != 2 {
badCall("`copy` needs a store path")
}
factorySrc, err := nixpool.DaemonDialer(ctx, *remoteFlag)
if err != nil {
log.Fatalf("creating dialer for src: %v", err)
}
poolSrc := nixpool.New(ctx, factorySrc, *maxConnsFlag)
atSrc := nixstore.NewActivityTracker()
factoryDst, err := nixpool.DaemonDialer(ctx, *remote2Flag)
if err != nil {
log.Fatalf("creating dialer for dst: %v", err)
}
poolDst := nixpool.New(ctx, factoryDst, *maxConnsFlag)
atDst := nixstore.NewActivityTracker()
if err := copyPath(ctx, poolSrc, atSrc, poolDst, atDst, flag.Arg(1)); err != nil {
log.Fatalf("copy: %v", err)
}
return
default:
badCall("bad subcommand %s", flag.Arg(0))
}
at := nixstore.NewActivityTracker()
factory, err := nixpool.DaemonDialer(ctx, *remoteFlag)
if err != nil {
log.Fatalf("DaemonDialer(%q): %v", *remoteFlag, err)
}
d, err := factory()
if err != nil {
log.Fatalf("connecting to %q: %v", *remoteFlag, err)
}
defer d.Close()
rs = nixdrv.LocalFSResolver{}
if err := cmd(ctx, d, at); err != nil {
log.Fatalf("%s: %s", flag.Arg(0), err)
}
}