solver: pipe implementation utilizes generics for better typing

This updates the pipe library to use generics for the request payload
and the status value. This allows the solver to put in explicit types
rather than rely on type casting from interfaces which helps with type
safety and understandability.

The status value used by the solver uses the `any` type instead of an
explicit type because the `unpark` method takes a generic list of pipes
and the different pipes have different result types. We can likely
change this in the future or create a discriminated union for the
types that can be used in this package. That is left for future work
because at least the request payload is typed now.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
This commit is contained in:
Jonathan A. Sternberg
2024-09-12 10:37:12 -05:00
parent 3a7055008a
commit ad64996f49
4 changed files with 115 additions and 109 deletions

View File

@@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
e := &edge{
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
depRequests: map[pipeReceiver]*dep{},
keyMap: map[string]struct{}{},
cacheRecords: map[string]*CacheRecord{},
cacheRecordsLoaded: map[string]struct{}{},
@@ -65,14 +65,14 @@ type edge struct {
op activeOp
edgeState
depRequests map[pipe.Receiver]*dep
depRequests map[pipeReceiver]*dep
deps []*dep
cacheMapReq pipe.Receiver
cacheMapReq pipeReceiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
execReq pipeReceiver
execCacheLoad bool
err error
cacheRecords map[string]*CacheRecord
@@ -99,11 +99,11 @@ type edge struct {
// dep holds state for a dependant edge
type dep struct {
req pipe.Receiver
req pipeReceiver
edgeState
index Index
keyMap map[string]*CacheKey
slowCacheReq pipe.Receiver
slowCacheReq pipeReceiver
slowCacheComplete bool
slowCacheFoundKey bool
slowCacheKey *ExportableCacheKey
@@ -122,7 +122,7 @@ func newDep(i Index) *dep {
// edgePipe is a pipe for requests between two edges
type edgePipe struct {
*pipe.Pipe
*pipe.Pipe[*edgeRequest, any]
From, Target *edge
mu sync.Mutex
}
@@ -198,21 +198,21 @@ func (e *edge) isComplete() bool {
}
// finishIncoming finalizes the incoming pipe request
func (e *edge) finishIncoming(req pipe.Sender) {
func (e *edge) finishIncoming(req pipeSender) {
err := e.err
if req.Request().Canceled && err == nil {
err = context.Canceled
}
if e.debug {
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
}
req.Finalize(&e.edgeState, err)
}
// updateIncoming updates the current value of incoming pipe request
func (e *edge) updateIncoming(req pipe.Sender) {
func (e *edge) updateIncoming(req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
}
req.Update(&e.edgeState)
}
@@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
// requests were not completed
// 2. this function may not return outgoing requests if it has completed all
// incoming requests
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
// process all incoming changes
depChanged := false
for _, upt := range updates {
@@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
}
// processUpdate is called by unpark for every updated pipe request
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
// response for cachemap request
if upt == e.cacheMapReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
@@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() {
// respondToIncoming responds to all incoming requests. completing or
// updating them when possible
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
// detect the result state for the requests
allIncomingCanComplete := true
desiredState := e.state
@@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
for _, req := range incoming {
if !req.Request().Canceled {
allCanceled = false
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
if r := req.Request().Payload; desiredState < r.desiredState {
desiredState = r.desiredState
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
allIncomingCanComplete = false
@@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
}
// can close all but one requests
var leaveOpen pipe.Sender
var leaveOpen pipeSender
for _, req := range incoming {
if !req.Request().Canceled {
leaveOpen = req
@@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
// update incoming based on current state
for _, req := range incoming {
r := req.Request().Payload.(*edgeRequest)
r := req.Request().Payload
if req.Request().Canceled {
e.finishIncoming(req)
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
@@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
// initialize deps state
if e.deps == nil {
e.depRequests = make(map[pipe.Receiver]*dep)
e.depRequests = make(map[pipeReceiver]*dep)
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
for i := range e.edge.Vertex.Inputs() {
e.deps = append(e.deps, newDep(Index(i)))
@@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
if dep.state < desiredStateDep {
addNew := true
if dep.req != nil && !dep.req.Status().Completed {
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
if dep.req.Request().desiredState != desiredStateDep {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
@@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
@@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
}
return out
}
type (
pipeRequest = pipe.Request[*edgeRequest]
pipeSender = pipe.Sender[*edgeRequest, any]
pipeReceiver = pipe.Receiver[*edgeRequest, any]
)
func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
return pipe.New[*edgeRequest, any](req)
}

View File

@@ -8,67 +8,63 @@ import (
"github.com/pkg/errors"
)
type channel struct {
type channel[V any] struct {
OnSendCompletion func()
value atomic.Value
lastValue *wrappedValue
value atomic.Pointer[V]
lastValue *V
}
type wrappedValue struct {
value interface{}
}
func (c *channel) Send(v interface{}) {
c.value.Store(&wrappedValue{value: v})
func (c *channel[V]) Send(v V) {
c.value.Store(&v)
if c.OnSendCompletion != nil {
c.OnSendCompletion()
}
}
func (c *channel) Receive() (interface{}, bool) {
func (c *channel[V]) Receive() (V, bool) {
v := c.value.Load()
if v == nil || v.(*wrappedValue) == c.lastValue {
return nil, false
if v == nil || v == c.lastValue {
return *new(V), false
}
c.lastValue = v.(*wrappedValue)
return v.(*wrappedValue).value, true
c.lastValue = v
return *v, true
}
type Pipe struct {
Sender Sender
Receiver Receiver
type Pipe[Payload, Value any] struct {
Sender Sender[Payload, Value]
Receiver Receiver[Payload, Value]
OnReceiveCompletion func()
OnSendCompletion func()
}
type Request struct {
Payload interface{}
type Request[Payload any] struct {
Payload Payload
Canceled bool
}
type Sender interface {
Request() Request
Update(v interface{})
Finalize(v interface{}, err error)
Status() Status
type Sender[Payload, Value any] interface {
Request() Request[Payload]
Update(v Value)
Finalize(v Value, err error)
Status() Status[Value]
}
type Receiver interface {
type Receiver[Payload, Value any] interface {
Receive() bool
Cancel()
Status() Status
Request() interface{}
Status() Status[Value]
Request() Payload
}
type Status struct {
type Status[Value any] struct {
Canceled bool
Completed bool
Err error
Value interface{}
Value Value
}
func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func()) {
p := New(Request{})
func NewWithFunction[Payload, Value any](f func(context.Context) (Value, error)) (*Pipe[Payload, Value], func()) {
p := New[Payload, Value](Request[Payload]{})
ctx, cancel := context.WithCancelCause(context.TODO())
@@ -81,27 +77,27 @@ func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func(
return p, func() {
res, err := f(ctx)
if err != nil {
p.Sender.Finalize(nil, err)
p.Sender.Finalize(*new(Value), err)
return
}
p.Sender.Finalize(res, nil)
}
}
func New(req Request) *Pipe {
cancelCh := &channel{}
roundTripCh := &channel{}
pw := &sender{
func New[Payload, Value any](req Request[Payload]) *Pipe[Payload, Value] {
cancelCh := &channel[Request[Payload]]{}
roundTripCh := &channel[Status[Value]]{}
pw := &sender[Payload, Value]{
req: req,
sendChannel: roundTripCh,
}
pr := &receiver{
pr := &receiver[Payload, Value]{
req: req,
recvChannel: roundTripCh,
sendChannel: cancelCh,
}
p := &Pipe{
p := &Pipe[Payload, Value]{
Sender: pw,
Receiver: pr,
}
@@ -109,7 +105,7 @@ func New(req Request) *Pipe {
cancelCh.OnSendCompletion = func() {
v, ok := cancelCh.Receive()
if ok {
pw.setRequest(v.(Request))
pw.setRequest(v)
}
if p.OnReceiveCompletion != nil {
p.OnReceiveCompletion()
@@ -125,38 +121,36 @@ func New(req Request) *Pipe {
return p
}
type sender struct {
status Status
req Request
sendChannel *channel
type sender[Payload, Value any] struct {
status Status[Value]
req Request[Payload]
sendChannel *channel[Status[Value]]
mu sync.Mutex
}
func (pw *sender) Status() Status {
func (pw *sender[Payload, Value]) Status() Status[Value] {
return pw.status
}
func (pw *sender) Request() Request {
func (pw *sender[Payload, Value]) Request() Request[Payload] {
pw.mu.Lock()
defer pw.mu.Unlock()
return pw.req
}
func (pw *sender) setRequest(req Request) {
func (pw *sender[Payload, Value]) setRequest(req Request[Payload]) {
pw.mu.Lock()
defer pw.mu.Unlock()
pw.req = req
}
func (pw *sender) Update(v interface{}) {
func (pw *sender[Payload, Value]) Update(v Value) {
pw.status.Value = v
pw.sendChannel.Send(pw.status)
}
func (pw *sender) Finalize(v interface{}, err error) {
if v != nil {
pw.status.Value = v
}
func (pw *sender[Payload, Value]) Finalize(v Value, err error) {
pw.status.Value = v
pw.status.Err = err
pw.status.Completed = true
if errors.Is(err, context.Canceled) && pw.req.Canceled {
@@ -165,27 +159,27 @@ func (pw *sender) Finalize(v interface{}, err error) {
pw.sendChannel.Send(pw.status)
}
type receiver struct {
status Status
req Request
recvChannel *channel
sendChannel *channel
type receiver[Payload, Value any] struct {
status Status[Value]
req Request[Payload]
recvChannel *channel[Status[Value]]
sendChannel *channel[Request[Payload]]
}
func (pr *receiver) Request() interface{} {
func (pr *receiver[Payload, Value]) Request() Payload {
return pr.req.Payload
}
func (pr *receiver) Receive() bool {
func (pr *receiver[Payload, Value]) Receive() bool {
v, ok := pr.recvChannel.Receive()
if !ok {
return false
}
pr.status = v.(Status)
pr.status = v
return true
}
func (pr *receiver) Cancel() {
func (pr *receiver[Payload, Value]) Cancel() {
req := pr.req
if req.Canceled {
return
@@ -194,6 +188,6 @@ func (pr *receiver) Cancel() {
pr.sendChannel.Send(req)
}
func (pr *receiver) Status() Status {
func (pr *receiver[Payload, Value]) Status() Status[Value] {
return pr.status
}

View File

@@ -11,10 +11,10 @@ func TestPipe(t *testing.T) {
t.Parallel()
runCh := make(chan struct{})
f := func(ctx context.Context) (interface{}, error) {
f := func(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
return "", context.Cause(ctx)
case <-runCh:
return "res0", nil
}
@@ -27,7 +27,7 @@ func TestPipe(t *testing.T) {
waitSignal <- struct{}{}
}
p, start := NewWithFunction(f)
p, start := NewWithFunction[any](f)
p.OnSendCompletion = signal
go start()
require.Equal(t, false, p.Receiver.Receive())
@@ -35,7 +35,7 @@ func TestPipe(t *testing.T) {
st := p.Receiver.Status()
require.Equal(t, false, st.Completed)
require.Equal(t, false, st.Canceled)
require.Nil(t, st.Value)
require.Zero(t, st.Value)
require.Equal(t, 0, signalled)
close(runCh)
@@ -46,17 +46,17 @@ func TestPipe(t *testing.T) {
require.Equal(t, true, st.Completed)
require.Equal(t, false, st.Canceled)
require.NoError(t, st.Err)
require.Equal(t, "res0", st.Value.(string))
require.Equal(t, "res0", st.Value)
}
func TestPipeCancel(t *testing.T) {
t.Parallel()
runCh := make(chan struct{})
f := func(ctx context.Context) (interface{}, error) {
f := func(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
return "", context.Cause(ctx)
case <-runCh:
return "res0", nil
}
@@ -69,7 +69,7 @@ func TestPipeCancel(t *testing.T) {
waitSignal <- struct{}{}
}
p, start := NewWithFunction(f)
p, start := NewWithFunction[any](f)
p.OnSendCompletion = signal
go start()
p.Receiver.Receive()
@@ -77,7 +77,7 @@ func TestPipeCancel(t *testing.T) {
st := p.Receiver.Status()
require.Equal(t, false, st.Completed)
require.Equal(t, false, st.Canceled)
require.Nil(t, st.Value)
require.Zero(t, st.Value)
require.Equal(t, 0, signalled)
p.Receiver.Cancel()

View File

@@ -13,9 +13,11 @@ import (
"github.com/tonistiigi/go-csvvalue"
)
var debugScheduler = false // TODO: replace with logs in build trace
var debugSchedulerSteps []string
var debugSchedulerStepsParseOnce sync.Once
var (
debugScheduler = false // TODO: replace with logs in build trace
debugSchedulerSteps []string
debugSchedulerStepsParseOnce sync.Once
)
func init() {
if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" {
@@ -121,17 +123,17 @@ func (s *scheduler) loop() {
// dispatch schedules an edge to be processed
func (s *scheduler) dispatch(e *edge) {
inc := make([]pipe.Sender, len(s.incoming[e]))
inc := make([]pipeSender, len(s.incoming[e]))
for i, p := range s.incoming[e] {
inc[i] = p.Sender
}
out := make([]pipe.Receiver, len(s.outgoing[e]))
out := make([]pipeReceiver, len(s.outgoing[e]))
for i, p := range s.outgoing[e] {
out[i] = p.Receiver
}
e.hasActiveOutgoing = false
updates := []pipe.Receiver{}
updates := []pipeReceiver{}
for _, p := range out {
if ok := p.Receive(); ok {
updates = append(updates, p)
@@ -272,7 +274,7 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error)
wait := make(chan struct{})
p := s.newPipe(e, nil, pipe.Request{Payload: &edgeRequest{desiredState: edgeStatusComplete}})
p := s.newPipe(e, nil, pipeRequest{Payload: &edgeRequest{desiredState: edgeStatusComplete}})
p.OnSendCompletion = func() {
p.Receiver.Receive()
if p.Receiver.Status().Completed {
@@ -298,9 +300,9 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error)
}
// newPipe creates a new request pipe between two edges
func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe {
func (s *scheduler) newPipe(target, from *edge, req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
p := &edgePipe{
Pipe: pipe.New(req),
Pipe: newPipe(req),
Target: target,
From: from,
}
@@ -324,8 +326,8 @@ func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe {
}
// newRequestWithFunc creates a new request pipe that invokes a async function
func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) pipe.Receiver {
pp, start := pipe.NewWithFunction(f)
func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (any, error)) pipeReceiver {
pp, start := pipe.NewWithFunction[*edgeRequest](f)
p := &edgePipe{
Pipe: pp,
From: e,
@@ -395,7 +397,7 @@ type pipeFactory struct {
s *scheduler
}
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver {
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipeReceiver {
target := pf.s.ef.getEdge(ee)
if target == nil {
bklog.G(context.TODO()).
@@ -407,14 +409,14 @@ func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver
return nil, errdefs.Internal(errors.Errorf("failed to get edge: inconsistent graph state in edge %s %s %d", ee.Vertex.Name(), ee.Vertex.Digest(), ee.Index))
})
}
p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req})
p := pf.s.newPipe(target, pf.e, pipeRequest{Payload: req})
if pf.e.debug {
bklog.G(context.TODO()).Debugf("> newPipe %s %p desiredState=%s", ee.Vertex.Name(), p, req.desiredState)
}
return p.Receiver
}
func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipe.Receiver {
func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipeReceiver {
p := pf.s.newRequestWithFunc(pf.e, f)
if pf.e.debug {
bklog.G(context.TODO()).Debugf("> newFunc %p", p)
@@ -422,7 +424,7 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro
return p
}
func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) {
func debugSchedulerPreUnpark(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) {
log := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
@@ -438,7 +440,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().(*edgeRequest).desiredState
des = dep.req.Request().desiredState
}
log.
WithField("dep_index", i).
@@ -457,7 +459,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState).
WithField("incoming_desired_state", req.Payload.desiredState).
WithField("incoming_canceled", req.Canceled).
Debug("> incoming")
}
@@ -499,7 +501,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
}
}
func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) {
func debugSchedulerPostUnpark(e *edge, inc []pipeSender) {
log := bklog.G(context.TODO())
for i, in := range inc {
log.