Files
buildkit/executor/resources/monitor.go
2025-09-09 14:13:04 +02:00

287 lines
6.0 KiB
Go

package resources
import (
"bufio"
"context"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
resourcestypes "github.com/moby/buildkit/executor/resources/types"
"github.com/moby/buildkit/util/bklog"
"github.com/prometheus/procfs"
)
// Locations inside the cgroup v2 hierarchy used by this package.
const (
	// defaultMountpoint is the root directory every cgroup path is joined onto.
	defaultMountpoint = "/sys/fs/cgroup"
	// initGroup is the child cgroup that processes of the root cgroup are
	// moved into by prepareCgroupControllers.
	initGroup = "init"

	// cgroupProcsFile lists the PIDs that belong to a cgroup.
	cgroupProcsFile = "cgroup.procs"
	// cgroupControllersFile is read to discover the controllers to enable.
	cgroupControllersFile = "cgroup.controllers"
	// cgroupSubtreeFile receives "+<controller>" writes to enable a controller.
	cgroupSubtreeFile = "cgroup.subtree_control"
)
// Package-level state shared by every Monitor.
var (
	// initOnce guards the one-time cgroup detection and setup performed by
	// NewMonitor.
	initOnce sync.Once
	// isCgroupV2 is set inside initOnce; when false RecordNamespace hands out
	// no-op recorders.
	isCgroupV2 bool
)
// cgroupRecord collects resource samples for one cgroup registered with a
// Monitor. It is created by Monitor.RecordNamespace, armed by Start and torn
// down exactly once by close.
type cgroupRecord struct {
// once makes the teardown in close run a single time.
once sync.Once
// ns is the cgroup path relative to defaultMountpoint; it is also the key
// under which the record is stored in monitor.records.
ns string
// sampler is the subscription obtained from Sampler.Record in Start; it is
// nil until Start has been called.
sampler *Sub[*resourcestypes.Sample]
// closeSampler is the Close method of the Sampler created in Start.
closeSampler func() error
// samples holds the result of closing sampler; set by close on success.
samples []*resourcestypes.Sample
// err holds the error returned when closing sampler; reported by Wait and
// Samples.
err error
// done is closed when close has finished.
done chan struct{}
// monitor is the Monitor that created this record.
monitor *Monitor
// netSampler is optional; when non-nil every sample also carries its result.
netSampler NetworkSampler
// startCPUStat is the system CPU total captured in Start; it stays nil when
// reading it failed.
startCPUStat *procfs.CPUStat
// sysCPUStat is the difference between the CPU total read in close and
// startCPUStat.
sysCPUStat *resourcestypes.SysCPUStat
}
// Wait triggers close in a separate goroutine, blocks until the record has
// been fully closed and returns the error stored by close, if any.
func (r *cgroupRecord) Wait() error {
go r.close()
<-r.done
return r.err
}
// Start captures the current system-wide CPU totals as a baseline and begins
// sampling the cgroup through a new Sampler driven by r.sample.
//
// Failing to read the baseline is not fatal: startCPUStat stays nil and close
// then skips computing the system CPU delta.
func (r *cgroupRecord) Start() {
	if stat, err := r.monitor.proc.Stat(); err == nil {
		// Copy the totals rather than pointing into stat: a pointer to a field
		// keeps the entire Stat value (and everything it references) reachable
		// for as long as the record lives.
		cpuTotal := stat.CPUTotal
		r.startCPUStat = &cpuTotal
	}
	// The interval and count arguments are interpreted by NewSampler.
	s := NewSampler(2*time.Second, 10, r.sample)
	r.sampler = s.Record()
	r.closeSampler = s.Close
}
// Close synchronously closes the record; calling it more than once is safe
// because close is guarded by r.once.
func (r *cgroupRecord) Close() {
r.close()
}
// CloseAsync closes the record in a background goroutine and then invokes
// next with context.TODO(). It returns immediately and always returns nil.
//
// Because next runs after CloseAsync has already returned, its error cannot
// be handed back to the caller; it is logged instead of being dropped.
func (r *cgroupRecord) CloseAsync(next func(context.Context) error) error {
	go func() {
		r.close()
		if err := next(context.TODO()); err != nil {
			bklog.L.Warnf("failed to run callback after closing resource record %q: %+v", r.ns, err)
		}
	}()
	return nil
}
// close tears the record down exactly once: it unregisters the record from
// its monitor, stops sampling, stores the collected samples (or the error)
// and computes the system CPU usage since Start. r.done is closed when all of
// that has finished.
func (r *cgroupRecord) close() {
	r.once.Do(func() {
		defer close(r.done)

		// Unregister in the background: Monitor.Close calls close while
		// holding monitor.mu, so taking the lock inline would deadlock.
		go func() {
			r.monitor.mu.Lock()
			defer r.monitor.mu.Unlock()
			delete(r.monitor.records, r.ns)
		}()

		// Start was never called, nothing to collect.
		if r.sampler == nil {
			return
		}

		collected, err := r.sampler.Close(true)
		if err != nil {
			r.err = err
		} else {
			r.samples = collected
		}
		r.closeSampler()

		// Without a baseline there is no delta to compute.
		if r.startCPUStat == nil {
			return
		}
		now, err := r.monitor.proc.Stat()
		if err != nil {
			return
		}
		begin, end := r.startCPUStat, now.CPUTotal
		r.sysCPUStat = &resourcestypes.SysCPUStat{
			User:      end.User - begin.User,
			Nice:      end.Nice - begin.Nice,
			System:    end.System - begin.System,
			Idle:      end.Idle - begin.Idle,
			Iowait:    end.Iowait - begin.Iowait,
			IRQ:       end.IRQ - begin.IRQ,
			SoftIRQ:   end.SoftIRQ - begin.SoftIRQ,
			Steal:     end.Steal - begin.Steal,
			Guest:     end.Guest - begin.Guest,
			GuestNice: end.GuestNice - begin.GuestNice,
		}
	})
}
// sample reads the CPU, memory, IO and PIDs statistics of the record's cgroup
// and, when a network sampler is configured, the network statistics as well.
// The returned sample is stamped with tm. Any read failure aborts the whole
// sample and is returned as the error.
func (r *cgroupRecord) sample(tm time.Time) (*resourcestypes.Sample, error) {
	cgroupPath := filepath.Join(defaultMountpoint, r.ns)

	cpuStat, err := getCgroupCPUStat(cgroupPath)
	if err != nil {
		return nil, err
	}
	memStat, err := getCgroupMemoryStat(cgroupPath)
	if err != nil {
		return nil, err
	}
	ioStat, err := getCgroupIOStat(cgroupPath)
	if err != nil {
		return nil, err
	}
	pidsStat, err := getCgroupPIDsStat(cgroupPath)
	if err != nil {
		return nil, err
	}

	out := &resourcestypes.Sample{
		Timestamp_: tm,
		CPUStat:    cpuStat,
		MemoryStat: memStat,
		IOStat:     ioStat,
		PIDsStat:   pidsStat,
	}
	if r.netSampler == nil {
		return out, nil
	}

	netStat, err := r.netSampler.Sample()
	if err != nil {
		return nil, err
	}
	out.NetStat = netStat
	return out, nil
}
// Samples blocks until the record has been closed and then returns the
// collected samples together with the system CPU delta, or the error that
// was stored while closing.
func (r *cgroupRecord) Samples() (*resourcestypes.Samples, error) {
	<-r.done
	if err := r.err; err != nil {
		return nil, err
	}
	result := &resourcestypes.Samples{
		Samples:    r.samples,
		SysCPUStat: r.sysCPUStat,
	}
	return result, nil
}
// nopRecord is a recorder that collects nothing. Monitor.RecordNamespace
// returns it when cgroup v2 is unavailable or the monitor is closed.
type nopRecord struct{}

// Wait returns immediately with a nil error.
func (*nopRecord) Wait() error { return nil }

// Samples reports no samples and no error; the returned pointer is nil.
func (*nopRecord) Samples() (*resourcestypes.Samples, error) { return nil, nil }

// Close does nothing.
func (*nopRecord) Close() {}

// CloseAsync runs next synchronously with context.TODO() and returns its
// result.
func (*nopRecord) CloseAsync(next func(context.Context) error) error {
	return next(context.TODO())
}

// Start does nothing.
func (*nopRecord) Start() {}
// Monitor hands out per-cgroup recorders and keeps track of the ones that are
// still open so that Close can shut them down.
type Monitor struct {
// mu guards records.
mu sync.Mutex
// closed is closed by Close; RecordNamespace checks it to stop creating
// real recorders.
closed chan struct{}
// records maps a cgroup namespace to its open record.
records map[string]*cgroupRecord
// proc is the procfs handle used to read system CPU statistics.
proc procfs.FS
}
// NetworkSampler supplies the network part of a resource sample. When one is
// configured, cgroupRecord.sample stores its result in the sample's NetStat
// field and fails the whole sample if Sample returns an error.
type NetworkSampler interface {
Sample() (*resourcestypes.NetworkSample, error)
}
// RecordOpt carries the optional settings for Monitor.RecordNamespace.
type RecordOpt struct {
// NetworkSampler, when non-nil, is queried for every sample taken by the
// record.
NetworkSampler NetworkSampler
}
// RecordNamespace creates a recorder for the cgroup ns and registers it with
// the monitor. A no-op recorder is returned instead when the monitor has
// already been closed or cgroup v2 is not in use. The returned error is
// always nil.
func (m *Monitor) RecordNamespace(ns string, opt RecordOpt) (resourcestypes.Recorder, error) {
	// Non-blocking check whether Close has been called.
	select {
	case <-m.closed:
		return &nopRecord{}, nil
	default:
	}
	if !isCgroupV2 {
		return &nopRecord{}, nil
	}

	rec := &cgroupRecord{
		ns:         ns,
		done:       make(chan struct{}),
		monitor:    m,
		netSampler: opt.NetworkSampler,
	}

	m.mu.Lock()
	m.records[ns] = rec
	m.mu.Unlock()

	return rec, nil
}
// Close marks the monitor as closed and closes every record that is still
// registered. It always returns nil.
//
// NOTE(review): closing m.closed a second time would panic, so this looks
// like it must be called at most once — confirm against callers.
func (m *Monitor) Close() error {
close(m.closed)
m.mu.Lock()
defer m.mu.Unlock()
for _, r := range m.records {
// Each record removes itself from m.records in its own goroutine, which
// needs m.mu and therefore only proceeds after this method returns.
r.close()
}
return nil
}
// NewMonitor returns a Monitor backed by the default procfs mount.
//
// The first call in the process also detects whether cgroup v2 is in use and,
// if so, attempts to prepare the cgroup controllers; a failure there is only
// logged. An error is returned when procfs cannot be opened.
func NewMonitor() (*Monitor, error) {
	initOnce.Do(func() {
		if isCgroupV2 = isCgroup2(); !isCgroupV2 {
			return
		}
		err := prepareCgroupControllers()
		if err != nil {
			bklog.L.Warnf("failed to prepare cgroup controllers: %+v", err)
		}
	})

	proc, err := procfs.NewDefaultFS()
	if err != nil {
		return nil, err
	}

	mon := &Monitor{
		closed:  make(chan struct{}),
		records: make(map[string]*cgroupRecord),
		proc:    proc,
	}
	return mon, nil
}
// prepareCgroupControllers sets up the cgroup v2 root when the environment
// variable BUILDKIT_SETUP_CGROUPV2_ROOT is set to a true value. It moves the
// processes of the root cgroup into the "init" child cgroup and then enables
// every controller listed in cgroup.controllers for the subtree.
//
// It returns nil without doing anything when the variable is unset or not
// true. Failures to enable an individual controller are logged and skipped;
// any other failure is returned.
func prepareCgroupControllers() error {
	v, ok := os.LookupEnv("BUILDKIT_SETUP_CGROUPV2_ROOT")
	if !ok {
		return nil
	}
	// The setup is opt-in, so an unparsable value is treated like "false".
	if enabled, _ := strconv.ParseBool(v); !enabled {
		return nil
	}

	if err := moveProcsToInitGroup(); err != nil {
		return err
	}

	dt, err := os.ReadFile(filepath.Join(defaultMountpoint, cgroupControllersFile))
	if err != nil {
		return err
	}
	// Split on any whitespace: the list is expected to end with a newline,
	// which would otherwise stay attached to the last controller name (or
	// produce a bogus entry for an empty list).
	subtreeControl := filepath.Join(defaultMountpoint, cgroupSubtreeFile)
	for c := range strings.FieldsSeq(string(dt)) {
		if err := os.WriteFile(subtreeControl, []byte("+"+c), 0); err != nil {
			// ignore error
			bklog.L.Warnf("failed to enable cgroup controller %q: %+v", c, err)
		}
	}
	return nil
}

// moveProcsToInitGroup creates the init cgroup and writes every PID listed in
// the root cgroup.procs file into the init group's cgroup.procs file.
func moveProcsToInitGroup() error {
	initDir := filepath.Join(defaultMountpoint, initGroup)
	if err := os.MkdirAll(initDir, 0755); err != nil {
		return err
	}

	f, err := os.Open(filepath.Join(defaultMountpoint, cgroupProcsFile))
	if err != nil {
		return err
	}
	// Read-only handle: a close error carries no information worth reporting,
	// but the handle must be released on every return path.
	defer f.Close()

	initProcs := filepath.Join(initDir, cgroupProcsFile)
	s := bufio.NewScanner(f)
	for s.Scan() {
		// One PID per write.
		if err := os.WriteFile(initProcs, s.Bytes(), 0); err != nil {
			return err
		}
	}
	// A read failure ends the loop silently; surface it.
	return s.Err()
}