depot/go/nix/nixstore/activities.go

461 lines
11 KiB
Go

package nixstore
import (
"log"
"sync"
"time"
)
const (
progressLogRateLimit = 1 * time.Second
)
var VerboseActivities = false
type ActivityType uint64
const (
ActUnknown ActivityType = 0
ActCopyPath ActivityType = 99 + iota
ActDownload
ActRealise
ActCopyPaths
ActBuilds
ActBuild
ActOptimiseStore
ActVerifyPaths
ActSubstitute
ActQueryPathInfo
ActPostBuildHook
)
func (at ActivityType) String() string {
return map[ActivityType]string{
ActUnknown: "unknown",
ActCopyPath: "copying path",
ActDownload: "downloading path",
ActRealise: "realising path",
ActCopyPaths: "copying paths",
ActBuilds: "running builds",
ActBuild: "building",
ActOptimiseStore: "optimising store",
ActVerifyPaths: "verifying paths",
ActSubstitute: "substituting path", // paths?
ActQueryPathInfo: "querying path info",
ActPostBuildHook: "running post-build hook",
}[at]
}
type ResultType uint64
const (
ResFileLinked ResultType = 100 + iota
ResBuildLogLine
ResUntrustedPath
ResCorruptedPath
ResSetPhase
ResProgress
ResSetExpected
ResPostBuildLogLine
)
func (rt ResultType) String() string {
return map[ResultType]string{
ResFileLinked: "linked file", // [size: int, blocks: int]
ResBuildLogLine: "log line from build", // [log line: str]
ResUntrustedPath: "untrusted path", // [path: str]
ResCorruptedPath: "corrupted path", // [path: str]
ResSetPhase: "entering phase", // [phase: str]
ResProgress: "progress update", // [done: int, expected: int, running: int, failed: int]
ResSetExpected: "update expectation", // [activityType: int, expected: int]
ResPostBuildLogLine: "log line from post-build", // [log line: str]
}[rt]
}
type ActivityTracker struct {
actions []*ActivityLogger
mu sync.Mutex
}
func NewActivityTracker() *ActivityTracker {
return &ActivityTracker{}
}
type ActivityLogger struct {
// Static for lifetime of logger.
action string
at *ActivityTracker
// Guarded by mu.
activities map[uint64]Activity
logs []ActivityMetaLog
mu sync.Mutex
}
func (al *ActivityLogger) AddLog(message string) {
if al == nil {
log.Println(message)
return
}
al.mu.Lock()
al.logs = append(al.logs, ActivityMetaLog{
Timestamp: time.Now(),
LogLine: message,
})
al.mu.Unlock()
}
func (al *ActivityLogger) AddError(errorCode uint64, message string) {
if al == nil {
log.Printf("error %d: %v", errorCode, message)
return
}
al.mu.Lock()
al.logs = append(al.logs, ActivityMetaLog{
Timestamp: time.Now(),
ErrorCode: errorCode,
ErrorMessage: message,
})
al.mu.Unlock()
}
type Activity interface {
ActivityType() ActivityType
Meta() *ActivityMeta
}
type ActivityMetaLog struct {
Timestamp time.Time
// One of:
// Line
LogLine string
// Error
ErrorCode uint64
ErrorMessage string
// Started
ActivityStarted Activity
// Finished
ActivityFinished Activity
}
type LogLineType int
const (
LogLineBuild LogLineType = iota
LogLinePostBuild
)
type ActivityLog struct {
Timestamp time.Time
// One of:
LogLineType LogLineType
LogLine string
NewPhase string
Progress []any
Expected []any
}
type ActivityMeta struct {
ActivityID uint64
Level uint64
String string
Fields []any
ParentActivityID uint64
mu sync.Mutex
logs []*ActivityLog
phase string
progress []any
firstProgressLog *ActivityLog
lastProgressLogCreated time.Time
lastProgressLog *ActivityLog
expected []any
}
func (am *ActivityMeta) RecordResult(result ResultType, fields []any) {
if am == nil {
return
}
now := time.Now()
am.mu.Lock()
switch result {
case ResBuildLogLine, ResPostBuildLogLine:
var llt LogLineType
if result == ResBuildLogLine {
llt = LogLineBuild
} else {
llt = LogLinePostBuild
}
if VerboseActivities {
log.Printf("%s:%d> %s", am.String, am.ActivityID, fields[0].(string))
}
am.logs = append(am.logs, &ActivityLog{
Timestamp: now,
LogLineType: llt,
LogLine: fields[0].(string),
})
case ResSetPhase:
phase := fields[0].(string)
am.logs = append(am.logs, &ActivityLog{
Timestamp: now,
NewPhase: phase,
})
am.phase = phase
if VerboseActivities {
log.Printf("%d [NEW PHASE] %v", am.ActivityID, phase)
}
case ResProgress:
am.progress = fields
if am.firstProgressLog == nil {
// If we've never logged progress before, log it.
log := &ActivityLog{
Timestamp: now,
Progress: fields,
}
am.logs = append(am.logs, log)
am.firstProgressLog = log
} else if am.lastProgressLog == nil {
// If we don't have a tail log entry, log it.
log := &ActivityLog{
Timestamp: now,
Progress: fields,
}
am.logs = append(am.logs, log)
am.lastProgressLog = log
am.lastProgressLogCreated = now
} else {
am.lastProgressLog.Timestamp = now
am.lastProgressLog.Progress = fields
if time.Since(am.lastProgressLogCreated) > progressLogRateLimit {
// If our last progress log was more than rate limit ago, bump the log entry forwards and mark that we should create a new one.
if VerboseActivities {
log.Printf("%d [PROGRESS] %v", am.ActivityID, fields)
}
am.lastProgressLog = nil
}
}
case ResSetExpected:
am.expected = fields
am.logs = append(am.logs, &ActivityLog{
Timestamp: now,
Expected: fields,
})
if VerboseActivities {
log.Printf("%d [EXPECTED] %v", am.ActivityID, fields)
}
}
am.mu.Unlock()
}
var (
activityRegistry = map[ActivityType]func(ActivityMeta) Activity{}
)
func registerActivityType(a Activity, factory func(ActivityMeta) Activity) {
activityRegistry[a.ActivityType()] = factory
}
type CopyPathActivity struct{ ActivityMeta }
func (*CopyPathActivity) ActivityType() ActivityType { return ActCopyPath }
func (a *CopyPathActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&CopyPathActivity{}, func(am ActivityMeta) Activity { return &CopyPathActivity{am} })
}
type DownloadActivity struct{ ActivityMeta }
func (*DownloadActivity) ActivityType() ActivityType { return ActDownload }
func (a *DownloadActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&DownloadActivity{}, func(am ActivityMeta) Activity { return &DownloadActivity{am} })
}
type RealiseActivity struct{ ActivityMeta }
func (*RealiseActivity) ActivityType() ActivityType { return ActRealise }
func (a *RealiseActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&RealiseActivity{}, func(am ActivityMeta) Activity { return &RealiseActivity{am} })
}
type BuildsActivity struct{ ActivityMeta }
func (*BuildsActivity) ActivityType() ActivityType { return ActBuilds }
func (a *BuildsActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&BuildsActivity{}, func(am ActivityMeta) Activity { return &BuildsActivity{am} })
}
type BuildActivity struct{ ActivityMeta }
func (*BuildActivity) ActivityType() ActivityType { return ActBuild }
func (a *BuildActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&BuildActivity{}, func(am ActivityMeta) Activity { return &BuildActivity{am} })
}
type OptimiseStoreActivity struct{ ActivityMeta }
func (*OptimiseStoreActivity) ActivityType() ActivityType { return ActOptimiseStore }
func (a *OptimiseStoreActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&OptimiseStoreActivity{}, func(am ActivityMeta) Activity { return &OptimiseStoreActivity{am} })
}
type VerifyPathsActivity struct{ ActivityMeta }
func (*VerifyPathsActivity) ActivityType() ActivityType { return ActVerifyPaths }
func (a *VerifyPathsActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&VerifyPathsActivity{}, func(am ActivityMeta) Activity { return &VerifyPathsActivity{am} })
}
type SubstituteActivity struct{ ActivityMeta }
func (*SubstituteActivity) ActivityType() ActivityType { return ActSubstitute }
func (a *SubstituteActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&SubstituteActivity{}, func(am ActivityMeta) Activity { return &SubstituteActivity{am} })
}
type QueryPathInfoActivity struct{ ActivityMeta }
func (*QueryPathInfoActivity) ActivityType() ActivityType { return ActQueryPathInfo }
func (a *QueryPathInfoActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&QueryPathInfoActivity{}, func(am ActivityMeta) Activity { return &QueryPathInfoActivity{am} })
}
type PostBuildHookActivity struct{ ActivityMeta }
func (*PostBuildHookActivity) ActivityType() ActivityType { return ActPostBuildHook }
func (a *PostBuildHookActivity) Meta() *ActivityMeta { return &a.ActivityMeta }
func init() {
registerActivityType(&PostBuildHookActivity{}, func(am ActivityMeta) Activity { return &PostBuildHookActivity{am} })
}
func (at *ActivityTracker) StartAction(actionName string) *ActivityLogger {
if at == nil {
return nil
}
al := &ActivityLogger{
action: actionName,
at: at,
activities: make(map[uint64]Activity),
}
at.mu.Lock()
at.actions = append(at.actions, al)
at.mu.Unlock()
return al
}
func (at *ActivityTracker) EndAction(al *ActivityLogger) {
if at == nil {
return
}
at.mu.Lock()
newActions := make([]*ActivityLogger, 0, len(at.actions)-1)
for _, oal := range at.actions {
if oal != al {
newActions = append(newActions, oal)
}
}
at.actions = newActions
at.mu.Unlock()
}
func (al *ActivityLogger) Close() {
if al == nil {
return
}
al.at.EndAction(al)
}
func (al *ActivityLogger) StartActivity(at ActivityType, am ActivityMeta) Activity {
if al == nil {
return nil
}
af, ok := activityRegistry[at]
if !ok {
return nil
}
a := af(am)
al.mu.Lock()
al.logs = append(al.logs, ActivityMetaLog{
Timestamp: time.Now(),
ActivityStarted: a,
})
al.activities[am.ActivityID] = a
al.mu.Unlock()
if VerboseActivities {
log.Printf("%d [START] %s", am.ActivityID, at)
}
return a
}
func (al *ActivityLogger) Activity(activityID uint64) Activity {
if al == nil {
return nil
}
var a Activity
al.mu.Lock()
a = al.activities[activityID]
al.mu.Unlock()
return a
}
func (al *ActivityLogger) ActivityResult(a Activity, resultType ResultType, data []any) {
if a == nil || al == nil {
return
}
a.Meta().RecordResult(resultType, data)
}
func (al *ActivityLogger) EndActivity(a Activity) {
if al == nil || a == nil {
return
}
if VerboseActivities {
log.Printf("%d [END] %s", a.Meta().ActivityID, a.ActivityType())
}
al.mu.Lock()
al.logs = append(al.logs, ActivityMetaLog{
Timestamp: time.Now(),
ActivityFinished: a,
})
al.mu.Unlock()
}