mirror of
https://github.com/containerd/containerd.git
synced 2026-06-30 19:58:29 +00:00
cri: reset pull progress timer on idle→active transition
The pull progress reporter resets lastSeenTimestamp on every tick where activeReqs == 0, but never on the transition to a non-zero count. When a pull is held in content.OpenWriter (idle in HTTP terms) and then unblocks, the next request can be cancelled less than `timeout` after it was actually issued — its first byte must arrive within whatever fraction of `timeout` remains on the timer captured during the previous idle tick. Track the previous tick's activeReqs and reset the timer on the zero→non-zero (idle→active) transition so a newly-issued request always gets a full timeout window to produce its first byte. This deflakes TestCRIImagePullTimeout/HoldingContentOpenWriterWithLocalPull, which hits ghcr.io directly and can exceed the shrunken window during auth handshakes in CI. Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
@@ -695,6 +695,7 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
|
||||
|
||||
lastSeenBytesRead = uint64(0)
|
||||
lastSeenTimestamp = time.Now()
|
||||
prevActiveReqs = int32(0)
|
||||
)
|
||||
|
||||
// check progress more frequently if timeout < default interval
|
||||
@@ -718,11 +719,19 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
|
||||
WithField("reportInterval", reportInterval).
|
||||
Debugf("progress for image pull")
|
||||
|
||||
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
|
||||
// Reset the no-progress timer when idle, when bytes were
|
||||
// read, or when a request has just started after an idle
|
||||
// period. Without the idle→active reset, a request can be
|
||||
// cancelled less than `timeout` after it was issued,
|
||||
// because lastSeenTimestamp was captured on an earlier
|
||||
// tick while activeReqs was still 0.
|
||||
if activeReqs == 0 || bytesRead > lastSeenBytesRead || prevActiveReqs == 0 {
|
||||
lastSeenBytesRead = bytesRead
|
||||
lastSeenTimestamp = time.Now()
|
||||
prevActiveReqs = activeReqs
|
||||
continue
|
||||
}
|
||||
prevActiveReqs = activeReqs
|
||||
|
||||
if time.Since(lastSeenTimestamp) > reporter.timeout {
|
||||
log.G(ctx).Errorf("cancel pulling image %s because of no progress in %v", reporter.ref, reporter.timeout)
|
||||
|
||||
@@ -757,3 +757,80 @@ func TestTransferProgressReporter(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestPullProgressReporter covers the core no-progress cancellation
|
||||
// behavior of pullProgressReporter: a stuck request (active, no bytes)
|
||||
// is eventually cancelled, while a progressing request is not.
|
||||
//
|
||||
// The flaky failure in TestCRIImagePullTimeout/HoldingContentOpenWriterWithLocalPull
|
||||
// — which this fix addresses — is a timing race that's not cleanly
|
||||
// expressible as a unit test: the boundary between the buggy and fixed
|
||||
// cancel times coincides at 1.5*timeout, so any check near that
|
||||
// threshold is scheduler-jitter prone. The semantic regression is
|
||||
// covered by the existing integration test.
|
||||
func TestPullProgressReporter(t *testing.T) {
|
||||
t.Run("StuckRequestStillGetsCancelled", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cancelCalled := make(chan struct{})
|
||||
reporter := newPullProgressReporter("test-image:latest", func() {
|
||||
select {
|
||||
case <-cancelCalled:
|
||||
default:
|
||||
close(cancelCalled)
|
||||
}
|
||||
}, 200*time.Millisecond)
|
||||
|
||||
// Start a request immediately (no idle period) and never produce
|
||||
// bytes. The reporter must cancel after timeout elapses.
|
||||
reporter.reqReporter.incRequest()
|
||||
reporter.start(ctx)
|
||||
|
||||
select {
|
||||
case <-cancelCalled:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("stuck request was not cancelled")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ProgressingRequestIsNotCancelled", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cancelCalled := make(chan struct{})
|
||||
reporter := newPullProgressReporter("test-image:latest", func() {
|
||||
select {
|
||||
case <-cancelCalled:
|
||||
default:
|
||||
close(cancelCalled)
|
||||
}
|
||||
}, 200*time.Millisecond)
|
||||
|
||||
reporter.reqReporter.incRequest()
|
||||
reporter.start(ctx)
|
||||
|
||||
// Advance bytes faster than timeout so the reporter keeps
|
||||
// refreshing lastSeenBytesRead.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
ticker := time.NewTicker(50 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for i := 0; i < 10; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
reporter.reqReporter.incByteRead(1024)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-cancelCalled:
|
||||
t.Fatal("pull was cancelled despite making byte progress")
|
||||
case <-done:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user