mirror of
https://github.com/moby/buildkit.git
synced 2026-06-30 19:57:39 +00:00
783 lines
22 KiB
Go
783 lines
22 KiB
Go
package control
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/trace"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
contentapi "github.com/containerd/containerd/api/services/content/v1"
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
"github.com/containerd/containerd/v2/plugins/services/content/contentserver"
|
|
"github.com/distribution/reference"
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/mitchellh/hashstructure/v2"
|
|
controlapi "github.com/moby/buildkit/api/services/control"
|
|
apitypes "github.com/moby/buildkit/api/types"
|
|
"github.com/moby/buildkit/cache/remotecache"
|
|
"github.com/moby/buildkit/client"
|
|
"github.com/moby/buildkit/client/llb"
|
|
"github.com/moby/buildkit/cmd/buildkitd/config"
|
|
controlgateway "github.com/moby/buildkit/control/gateway"
|
|
"github.com/moby/buildkit/exporter"
|
|
"github.com/moby/buildkit/exporter/containerimage/exptypes"
|
|
"github.com/moby/buildkit/exporter/util/epoch"
|
|
"github.com/moby/buildkit/frontend"
|
|
"github.com/moby/buildkit/frontend/attestations"
|
|
"github.com/moby/buildkit/session"
|
|
"github.com/moby/buildkit/session/grpchijack"
|
|
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
|
|
"github.com/moby/buildkit/solver"
|
|
"github.com/moby/buildkit/solver/bboltcachestorage"
|
|
"github.com/moby/buildkit/solver/llbsolver"
|
|
"github.com/moby/buildkit/solver/llbsolver/cdidevices"
|
|
"github.com/moby/buildkit/solver/llbsolver/proc"
|
|
"github.com/moby/buildkit/solver/pb"
|
|
"github.com/moby/buildkit/util/bklog"
|
|
"github.com/moby/buildkit/util/db"
|
|
"github.com/moby/buildkit/util/entitlements"
|
|
"github.com/moby/buildkit/util/imageutil"
|
|
"github.com/moby/buildkit/util/leaseutil"
|
|
"github.com/moby/buildkit/util/throttle"
|
|
"github.com/moby/buildkit/util/tracing/transform"
|
|
"github.com/moby/buildkit/version"
|
|
"github.com/moby/buildkit/worker"
|
|
digest "github.com/opencontainers/go-digest"
|
|
"github.com/pkg/errors"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
type Opt struct {
|
|
SessionManager *session.Manager
|
|
WorkerController *worker.Controller
|
|
Frontends map[string]frontend.Frontend
|
|
CacheManager solver.CacheManager
|
|
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
|
|
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
|
|
Entitlements []string
|
|
TraceCollector sdktrace.SpanExporter
|
|
HistoryDB db.DB
|
|
CacheStore *bboltcachestorage.Store
|
|
LeaseManager *leaseutil.Manager
|
|
ContentStore *containerdsnapshot.Store
|
|
HistoryConfig *config.HistoryConfig
|
|
GarbageCollect func(context.Context) error
|
|
GracefulStop <-chan struct{}
|
|
}
|
|
|
|
type Controller struct { // TODO: ControlService
|
|
// buildCount needs to be 64bit aligned
|
|
buildCount int64
|
|
opt Opt
|
|
solver *llbsolver.Solver
|
|
history *llbsolver.HistoryQueue
|
|
cache solver.CacheManager
|
|
gatewayForwarder *controlgateway.GatewayForwarder
|
|
throttledGC func()
|
|
throttledReleaseUnreferenced func()
|
|
gcmu sync.Mutex
|
|
tracev1.UnimplementedTraceServiceServer
|
|
}
|
|
|
|
func NewController(opt Opt) (*Controller, error) {
|
|
gatewayForwarder := controlgateway.NewGatewayForwarder()
|
|
|
|
hq, err := llbsolver.NewHistoryQueue(llbsolver.HistoryQueueOpt{
|
|
DB: opt.HistoryDB,
|
|
LeaseManager: opt.LeaseManager,
|
|
ContentStore: opt.ContentStore,
|
|
CleanConfig: opt.HistoryConfig,
|
|
GarbageCollect: opt.GarbageCollect,
|
|
GracefulStop: opt.GracefulStop,
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create history queue")
|
|
}
|
|
|
|
s, err := llbsolver.New(llbsolver.Opt{
|
|
WorkerController: opt.WorkerController,
|
|
Frontends: opt.Frontends,
|
|
CacheManager: opt.CacheManager,
|
|
CacheResolvers: opt.ResolveCacheImporterFuncs,
|
|
GatewayForwarder: gatewayForwarder,
|
|
SessionManager: opt.SessionManager,
|
|
Entitlements: opt.Entitlements,
|
|
HistoryQueue: hq,
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create solver")
|
|
}
|
|
|
|
c := &Controller{
|
|
opt: opt,
|
|
solver: s,
|
|
history: hq,
|
|
cache: opt.CacheManager,
|
|
gatewayForwarder: gatewayForwarder,
|
|
}
|
|
c.throttledGC = throttle.After(time.Minute, c.gc)
|
|
// use longer interval for releaseUnreferencedCache deleting links quickly is less important
|
|
c.throttledReleaseUnreferenced = throttle.After(5*time.Minute, func() { c.releaseUnreferencedCache(context.TODO()) })
|
|
|
|
defer func() {
|
|
time.AfterFunc(time.Second, c.throttledGC)
|
|
}()
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (c *Controller) Close() error {
|
|
rerr := c.opt.HistoryDB.Close()
|
|
if err := c.opt.WorkerController.Close(); err != nil {
|
|
rerr = multierror.Append(rerr, err)
|
|
}
|
|
if err := c.opt.CacheStore.Close(); err != nil {
|
|
rerr = multierror.Append(rerr, err)
|
|
}
|
|
if err := c.solver.Close(); err != nil {
|
|
rerr = multierror.Append(rerr, err)
|
|
}
|
|
return rerr
|
|
}
|
|
|
|
func (c *Controller) Register(server *grpc.Server) {
|
|
controlapi.RegisterControlServer(server, c)
|
|
c.gatewayForwarder.Register(server)
|
|
tracev1.RegisterTraceServiceServer(server, c)
|
|
|
|
store := &roContentStore{c.opt.ContentStore.WithFallbackNS(c.opt.ContentStore.Namespace() + "_history")}
|
|
contentapi.RegisterContentServer(server, contentserver.New(store))
|
|
}
|
|
|
|
func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
|
|
resp := &controlapi.DiskUsageResponse{}
|
|
workers, err := c.opt.WorkerController.List()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, w := range workers {
|
|
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
|
|
Filter: r.Filter,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, r := range du {
|
|
resp.Record = append(resp.Record, &controlapi.UsageRecord{
|
|
// TODO: add worker info
|
|
ID: r.ID,
|
|
Mutable: r.Mutable,
|
|
InUse: r.InUse,
|
|
Size: r.Size,
|
|
Parents: r.Parents,
|
|
UsageCount: int64(r.UsageCount),
|
|
Description: r.Description,
|
|
CreatedAt: timestamppb.New(r.CreatedAt),
|
|
LastUsedAt: func() *timestamppb.Timestamp {
|
|
if r.LastUsedAt != nil {
|
|
return timestamppb.New(*r.LastUsedAt)
|
|
}
|
|
return nil
|
|
}(),
|
|
RecordType: string(r.RecordType),
|
|
Shared: r.Shared,
|
|
})
|
|
}
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Controller) releaseUnreferencedCache(ctx context.Context) error {
|
|
return c.cache.ReleaseUnreferenced(ctx)
|
|
}
|
|
|
|
func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Control_PruneServer) error {
|
|
if atomic.LoadInt64(&c.buildCount) == 0 {
|
|
imageutil.CancelCacheLeases()
|
|
}
|
|
|
|
ch := make(chan client.UsageInfo, 32)
|
|
|
|
eg, ctx := errgroup.WithContext(stream.Context())
|
|
workers, err := c.opt.WorkerController.List()
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to list workers for prune")
|
|
}
|
|
|
|
didPrune := false
|
|
defer func() {
|
|
if didPrune {
|
|
if c, ok := c.cache.(interface {
|
|
ReleaseUnreferenced(context.Context) error
|
|
}); ok {
|
|
if err := c.ReleaseUnreferenced(ctx); err != nil {
|
|
bklog.G(ctx).Errorf("failed to release cache metadata: %+v", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
for _, w := range workers {
|
|
func(w worker.Worker) {
|
|
eg.Go(func() error {
|
|
return w.Prune(ctx, ch, client.PruneInfo{
|
|
Filter: req.Filter,
|
|
All: req.All,
|
|
KeepDuration: time.Duration(req.KeepDuration),
|
|
ReservedSpace: req.ReservedSpace,
|
|
MaxUsedSpace: req.MaxUsedSpace,
|
|
MinFreeSpace: req.MinFreeSpace,
|
|
})
|
|
})
|
|
}(w)
|
|
}
|
|
|
|
eg2, _ := errgroup.WithContext(stream.Context())
|
|
|
|
eg2.Go(func() error {
|
|
defer close(ch)
|
|
return eg.Wait()
|
|
})
|
|
|
|
eg2.Go(func() error {
|
|
defer func() {
|
|
// drain channel on error
|
|
for range ch {
|
|
}
|
|
}()
|
|
for r := range ch {
|
|
didPrune = true
|
|
if err := stream.Send(&controlapi.UsageRecord{
|
|
// TODO: add worker info
|
|
ID: r.ID,
|
|
Mutable: r.Mutable,
|
|
InUse: r.InUse,
|
|
Size: r.Size,
|
|
Parents: r.Parents,
|
|
UsageCount: int64(r.UsageCount),
|
|
Description: r.Description,
|
|
CreatedAt: timestamppb.New(r.CreatedAt),
|
|
LastUsedAt: func() *timestamppb.Timestamp {
|
|
if r.LastUsedAt != nil {
|
|
return timestamppb.New(*r.LastUsedAt)
|
|
}
|
|
return nil
|
|
}(),
|
|
RecordType: string(r.RecordType),
|
|
Shared: r.Shared,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return eg2.Wait()
|
|
}
|
|
|
|
func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
|
|
if c.opt.TraceCollector == nil {
|
|
return nil, status.Errorf(codes.Unavailable, "trace collector not configured")
|
|
}
|
|
err := c.opt.TraceCollector.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &tracev1.ExportTraceServiceResponse{}, nil
|
|
}
|
|
|
|
func (c *Controller) ListenBuildHistory(req *controlapi.BuildHistoryRequest, srv controlapi.Control_ListenBuildHistoryServer) error {
|
|
if err := sendTimestampHeader(srv); err != nil {
|
|
return err
|
|
}
|
|
return c.history.Listen(srv.Context(), req, func(h *controlapi.BuildHistoryEvent) error {
|
|
if err := srv.Send(h); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (c *Controller) UpdateBuildHistory(ctx context.Context, req *controlapi.UpdateBuildHistoryRequest) (*controlapi.UpdateBuildHistoryResponse, error) {
|
|
if req.Delete {
|
|
c.history.Finalize(ctx, req.Ref) // ignore error
|
|
err := c.history.Delete(ctx, req.Ref)
|
|
return &controlapi.UpdateBuildHistoryResponse{}, err
|
|
}
|
|
|
|
if req.Finalize {
|
|
err := c.history.Finalize(ctx, req.Ref)
|
|
return &controlapi.UpdateBuildHistoryResponse{}, err
|
|
}
|
|
|
|
err := c.history.UpdateRef(ctx, req.Ref, func(r *controlapi.BuildHistoryRecord) error {
|
|
if req.Pinned == r.Pinned {
|
|
return nil
|
|
}
|
|
r.Pinned = req.Pinned
|
|
return nil
|
|
})
|
|
return &controlapi.UpdateBuildHistoryResponse{}, err
|
|
}
|
|
|
|
func translateLegacySolveRequest(req *controlapi.SolveRequest) {
|
|
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
|
|
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {
|
|
ex := &controlapi.CacheOptionsEntry{
|
|
Type: "registry",
|
|
Attrs: req.Cache.ExportAttrsDeprecated,
|
|
}
|
|
if ex.Attrs == nil {
|
|
ex.Attrs = make(map[string]string)
|
|
}
|
|
ex.Attrs["ref"] = legacyExportRef
|
|
// FIXME(AkihiroSuda): skip append if already exists
|
|
req.Cache.Exports = append(req.Cache.Exports, ex)
|
|
req.Cache.ExportRefDeprecated = ""
|
|
req.Cache.ExportAttrsDeprecated = nil
|
|
}
|
|
|
|
// translates ImportRefs to new Imports (v0.4.0)
|
|
for _, legacyImportRef := range req.Cache.ImportRefsDeprecated {
|
|
im := &controlapi.CacheOptionsEntry{
|
|
Type: "registry",
|
|
Attrs: map[string]string{"ref": legacyImportRef},
|
|
}
|
|
// FIXME(AkihiroSuda): skip append if already exists
|
|
req.Cache.Imports = append(req.Cache.Imports, im)
|
|
}
|
|
req.Cache.ImportRefsDeprecated = nil
|
|
|
|
// translate single exporter to a slice (v0.13.0)
|
|
if len(req.Exporters) == 0 && req.ExporterDeprecated != "" {
|
|
req.Exporters = append(req.Exporters, &controlapi.Exporter{
|
|
Type: req.ExporterDeprecated,
|
|
Attrs: req.ExporterAttrsDeprecated,
|
|
})
|
|
req.ExporterDeprecated = ""
|
|
req.ExporterAttrsDeprecated = nil
|
|
}
|
|
}
|
|
|
|
func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) {
|
|
defer trace.StartRegion(ctx, "Solve").End()
|
|
trace.Logf(ctx, "Request", "solve request: %v", req.Ref)
|
|
atomic.AddInt64(&c.buildCount, 1)
|
|
defer atomic.AddInt64(&c.buildCount, -1)
|
|
|
|
if req.Cache == nil {
|
|
req.Cache = &controlapi.CacheOptions{} // make sure cache options are initialized
|
|
}
|
|
translateLegacySolveRequest(req)
|
|
|
|
defer func() {
|
|
time.AfterFunc(time.Second, c.throttledGC)
|
|
}()
|
|
|
|
// TODO: multiworker
|
|
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
|
|
w, err := c.opt.WorkerController.GetDefault()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if SOURCE_DATE_EPOCH is set, enable it for the exporters
|
|
if v, ok := epoch.ParseBuildArgs(req.FrontendAttrs); ok {
|
|
for _, ex := range req.Exporters {
|
|
if _, ok := ex.Attrs[string(exptypes.OptKeySourceDateEpoch)]; !ok {
|
|
if ex.Attrs == nil {
|
|
ex.Attrs = make(map[string]string)
|
|
}
|
|
ex.Attrs[string(exptypes.OptKeySourceDateEpoch)] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
var expis []exporter.ExporterInstance
|
|
for i, ex := range req.Exporters {
|
|
exp, err := w.Exporter(ex.Type, c.opt.SessionManager)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bklog.G(ctx).Debugf("resolve exporter %s with %v", ex.Type, ex.Attrs)
|
|
expi, err := exp.Resolve(ctx, i, ex.Attrs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
expis = append(expis, expi)
|
|
}
|
|
|
|
if c, err := findDuplicateCacheOptions(req.Cache.Exports); err != nil {
|
|
return nil, err
|
|
} else if c != nil {
|
|
types := []string{}
|
|
for _, c := range c {
|
|
types = append(types, c.Type)
|
|
}
|
|
return nil, errors.Errorf("duplicate cache exports %s", types)
|
|
}
|
|
var cacheExporters []llbsolver.RemoteCacheExporter
|
|
for _, e := range req.Cache.Exports {
|
|
cacheExporterFunc, ok := c.opt.ResolveCacheExporterFuncs[e.Type]
|
|
if !ok {
|
|
return nil, errors.Errorf("unknown cache exporter: %q", e.Type)
|
|
}
|
|
var exp llbsolver.RemoteCacheExporter
|
|
exp.Exporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to configure %v cache exporter", e.Type)
|
|
}
|
|
if exp.Exporter == nil {
|
|
bklog.G(ctx).Debugf("cache exporter resolver for %v returned nil, skipping exporter", e.Type)
|
|
continue
|
|
}
|
|
if exportMode, supported := parseCacheExportMode(e.Attrs["mode"]); !supported {
|
|
bklog.G(ctx).Debugf("skipping invalid cache export mode: %s", e.Attrs["mode"])
|
|
} else {
|
|
exp.CacheExportMode = exportMode
|
|
}
|
|
if ignoreErrorStr, ok := e.Attrs["ignore-error"]; ok {
|
|
if ignoreError, supported := parseCacheExportIgnoreError(ignoreErrorStr); !supported {
|
|
bklog.G(ctx).Debugf("skipping invalid cache export ignore-error: %s", e.Attrs["ignore-error"])
|
|
} else {
|
|
exp.IgnoreError = ignoreError
|
|
}
|
|
}
|
|
cacheExporters = append(cacheExporters, exp)
|
|
}
|
|
|
|
var cacheImports []frontend.CacheOptionsEntry
|
|
for _, im := range req.Cache.Imports {
|
|
if im == nil {
|
|
continue
|
|
}
|
|
cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
|
|
Type: im.Type,
|
|
Attrs: im.Attrs,
|
|
})
|
|
}
|
|
|
|
attests, err := attestations.Parse(req.FrontendAttrs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var procs []llbsolver.Processor
|
|
|
|
if attrs, ok := attests["sbom"]; ok {
|
|
var ref reference.Named
|
|
params := make(map[string]string)
|
|
for k, v := range attrs {
|
|
if k == "generator" {
|
|
if v == "" {
|
|
return nil, errors.Errorf("sbom generator cannot be empty")
|
|
}
|
|
ref, err = reference.ParseNormalizedNamed(v)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to parse sbom generator %s", v)
|
|
}
|
|
ref = reference.TagNameOnly(ref)
|
|
} else {
|
|
params[k] = v
|
|
}
|
|
}
|
|
|
|
useCache := true
|
|
if v, ok := req.FrontendAttrs["no-cache"]; ok && v == "" {
|
|
// disable cache if cache is disabled for all stages
|
|
useCache = false
|
|
}
|
|
resolveMode := llb.ResolveModeDefault.String()
|
|
if v, ok := req.FrontendAttrs["image-resolve-mode"]; ok {
|
|
resolveMode = v
|
|
}
|
|
|
|
procs = append(procs, proc.SBOMProcessor(ref.String(), useCache, resolveMode, params))
|
|
}
|
|
|
|
if attrs, ok := attests["provenance"]; ok {
|
|
procs = append(procs, proc.ProvenanceProcessor(attrs))
|
|
}
|
|
|
|
resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{
|
|
Frontend: req.Frontend,
|
|
Definition: req.Definition,
|
|
FrontendOpt: req.FrontendAttrs,
|
|
FrontendInputs: req.FrontendInputs,
|
|
CacheImports: cacheImports,
|
|
}, llbsolver.ExporterRequest{
|
|
Exporters: expis,
|
|
CacheExporters: cacheExporters,
|
|
EnableSessionExporter: req.EnableSessionExporter,
|
|
}, entitlementsFromPB(req.Entitlements), procs, req.Internal, req.SourcePolicy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &controlapi.SolveResponse{
|
|
ExporterResponse: resp.ExporterResponse,
|
|
}, nil
|
|
}
|
|
|
|
func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Control_StatusServer) error {
|
|
if err := sendTimestampHeader(stream); err != nil {
|
|
return err
|
|
}
|
|
ch := make(chan *client.SolveStatus, 8)
|
|
|
|
eg, ctx := errgroup.WithContext(stream.Context())
|
|
eg.Go(func() error {
|
|
return c.solver.Status(ctx, req.Ref, ch)
|
|
})
|
|
|
|
eg.Go(func() error {
|
|
defer func() {
|
|
// drain channel on error
|
|
for range ch {
|
|
}
|
|
}()
|
|
for {
|
|
ss, ok := <-ch
|
|
if !ok {
|
|
return nil
|
|
}
|
|
for _, sr := range ss.Marshal() {
|
|
if err := stream.SendMsg(sr); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
|
|
bklog.G(stream.Context()).Debugf("session started")
|
|
|
|
conn, closeCh, opts := grpchijack.Hijack(stream)
|
|
defer conn.Close()
|
|
|
|
ctx, cancel := context.WithCancelCause(stream.Context())
|
|
go func() {
|
|
<-closeCh
|
|
cancel(errors.WithStack(context.Canceled))
|
|
}()
|
|
|
|
err := c.opt.SessionManager.HandleConn(ctx, conn, opts)
|
|
bklog.G(ctx).Debugf("session finished: %v", err)
|
|
return err
|
|
}
|
|
|
|
func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersRequest) (*controlapi.ListWorkersResponse, error) {
|
|
resp := &controlapi.ListWorkersResponse{}
|
|
workers, err := c.opt.WorkerController.List(r.Filter...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, w := range workers {
|
|
resp.Record = append(resp.Record, &apitypes.WorkerRecord{
|
|
ID: w.ID(),
|
|
Labels: w.Labels(),
|
|
Platforms: pb.PlatformsFromSpec(w.Platforms(true)),
|
|
GCPolicy: toPBGCPolicy(w.GCPolicy()),
|
|
BuildkitVersion: toPBBuildkitVersion(w.BuildkitVersion()),
|
|
CDIDevices: toPBCDIDevices(w.CDIManager()),
|
|
})
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Controller) Info(ctx context.Context, r *controlapi.InfoRequest) (*controlapi.InfoResponse, error) {
|
|
return &controlapi.InfoResponse{
|
|
BuildkitVersion: &apitypes.BuildkitVersion{
|
|
Package: version.Package,
|
|
Version: version.Version,
|
|
Revision: version.Revision,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (c *Controller) gc() {
|
|
c.gcmu.Lock()
|
|
defer c.gcmu.Unlock()
|
|
|
|
workers, err := c.opt.WorkerController.List()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
eg, ctx := errgroup.WithContext(context.TODO())
|
|
|
|
var size int64
|
|
ch := make(chan client.UsageInfo)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for ui := range ch {
|
|
size += ui.Size
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
for _, w := range workers {
|
|
eg.Go(func() error {
|
|
if policy := w.GCPolicy(); len(policy) > 0 {
|
|
return w.Prune(ctx, ch, policy...)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
err = eg.Wait()
|
|
close(ch)
|
|
if err != nil {
|
|
bklog.G(ctx).Errorf("gc error: %+v", err)
|
|
}
|
|
<-done
|
|
if size > 0 {
|
|
bklog.G(ctx).Debugf("gc cleaned up %d bytes", size)
|
|
go c.throttledReleaseUnreferenced()
|
|
}
|
|
}
|
|
|
|
func parseCacheExportMode(mode string) (solver.CacheExportMode, bool) {
|
|
switch mode {
|
|
case "min":
|
|
return solver.CacheExportModeMin, true
|
|
case "max":
|
|
return solver.CacheExportModeMax, true
|
|
}
|
|
return solver.CacheExportModeMin, false
|
|
}
|
|
|
|
// parseCacheExportIgnoreError parses the "ignore-error" attribute of a cache
// export with strconv.ParseBool. The second result is false, and the first
// is false as well, when the string is not a valid boolean.
func parseCacheExportIgnoreError(ignoreErrorStr string) (bool, bool) {
	if parsed, err := strconv.ParseBool(ignoreErrorStr); err == nil {
		return parsed, true
	}
	return false, false
}
|
|
|
|
func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy {
|
|
policy := make([]*apitypes.GCPolicy, 0, len(in))
|
|
for _, p := range in {
|
|
policy = append(policy, &apitypes.GCPolicy{
|
|
All: p.All,
|
|
Filters: p.Filter,
|
|
KeepDuration: int64(p.KeepDuration),
|
|
ReservedSpace: p.ReservedSpace,
|
|
MaxUsedSpace: p.MaxUsedSpace,
|
|
MinFreeSpace: p.MinFreeSpace,
|
|
})
|
|
}
|
|
return policy
|
|
}
|
|
|
|
func toPBBuildkitVersion(in client.BuildkitVersion) *apitypes.BuildkitVersion {
|
|
return &apitypes.BuildkitVersion{
|
|
Package: in.Package,
|
|
Version: in.Version,
|
|
Revision: in.Revision,
|
|
}
|
|
}
|
|
|
|
func toPBCDIDevices(manager *cdidevices.Manager) []*apitypes.CDIDevice {
|
|
if manager == nil {
|
|
return nil
|
|
}
|
|
devs := manager.ListDevices()
|
|
out := make([]*apitypes.CDIDevice, 0, len(devs))
|
|
for _, dev := range devs {
|
|
out = append(out, &apitypes.CDIDevice{
|
|
Name: dev.Name,
|
|
AutoAllow: dev.AutoAllow,
|
|
Annotations: dev.Annotations,
|
|
OnDemand: dev.OnDemand,
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func findDuplicateCacheOptions(cacheOpts []*controlapi.CacheOptionsEntry) ([]*controlapi.CacheOptionsEntry, error) {
|
|
seen := map[string]*controlapi.CacheOptionsEntry{}
|
|
duplicate := map[string]struct{}{}
|
|
for _, opt := range cacheOpts {
|
|
k, err := cacheOptKey(opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if _, ok := seen[k]; ok {
|
|
duplicate[k] = struct{}{}
|
|
}
|
|
seen[k] = opt
|
|
}
|
|
|
|
var duplicates []*controlapi.CacheOptionsEntry
|
|
for k := range duplicate {
|
|
duplicates = append(duplicates, seen[k])
|
|
}
|
|
return duplicates, nil
|
|
}
|
|
|
|
func cacheOptKey(opt *controlapi.CacheOptionsEntry) (string, error) {
|
|
if opt.Type == "registry" && opt.Attrs["ref"] != "" {
|
|
return opt.Attrs["ref"], nil
|
|
}
|
|
rawOpt := struct {
|
|
Type string
|
|
Attrs map[string]string
|
|
}{
|
|
Type: opt.Type,
|
|
Attrs: opt.Attrs,
|
|
}
|
|
hash, err := hashstructure.Hash(rawOpt, hashstructure.FormatV2, nil)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return fmt.Sprint(opt.Type, ":", hash), nil
|
|
}
|
|
|
|
type roContentStore struct {
|
|
content.Store
|
|
}
|
|
|
|
func (cs *roContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
|
return nil, errors.Errorf("read-only content store")
|
|
}
|
|
|
|
func (cs *roContentStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
|
return errors.Errorf("read-only content store")
|
|
}
|
|
|
|
func (cs *roContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
|
return content.Info{}, errors.Errorf("read-only content store")
|
|
}
|
|
|
|
func (cs *roContentStore) Abort(ctx context.Context, ref string) error {
|
|
return errors.Errorf("read-only content store")
|
|
}
|
|
|
|
const timestampKey = "buildkit-current-timestamp"
|
|
|
|
func sendTimestampHeader(srv grpc.ServerStream) error {
|
|
return srv.SendHeader(metadata.Pairs(timestampKey, time.Now().Format(time.RFC3339Nano)))
|
|
}
|
|
|
|
func entitlementsFromPB(elems []string) []entitlements.Entitlement {
|
|
clone := make([]entitlements.Entitlement, len(elems))
|
|
for i, e := range elems {
|
|
clone[i] = entitlements.Entitlement(e)
|
|
}
|
|
return clone
|
|
}
|