Add admission plugin that validates Job parallelism updates against gang-scheduled PodGroups

Signed-off-by: helayoty <heelayot@microsoft.com>
This commit is contained in:
helayoty
2026-03-18 14:42:02 +00:00
parent 68e30095de
commit 981a333927
4 changed files with 450 additions and 0 deletions

View File

@@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
"k8s.io/kubernetes/plugin/pkg/admission/job"
"k8s.io/kubernetes/plugin/pkg/admission/limitranger"
"k8s.io/kubernetes/plugin/pkg/admission/network/defaultingressclass"
"k8s.io/kubernetes/plugin/pkg/admission/nodedeclaredfeatures"
@@ -49,6 +50,7 @@ var intentionallyOffPlugins = sets.New[string](
podsecurity.PluginName, // PodSecurity
podtopologylabels.PluginName, // PodTopologyLabels
nodedeclaredfeatures.PluginName, // NodeDeclaredFeatures
job.PluginName, // JobValidation
podresize.PluginName, // PodResize
)

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/admission/extendedresourcetoleration"
"k8s.io/kubernetes/plugin/pkg/admission/gc"
"k8s.io/kubernetes/plugin/pkg/admission/imagepolicy"
jobadmission "k8s.io/kubernetes/plugin/pkg/admission/job"
"k8s.io/kubernetes/plugin/pkg/admission/limitranger"
"k8s.io/kubernetes/plugin/pkg/admission/namespace/autoprovision"
"k8s.io/kubernetes/plugin/pkg/admission/namespace/exists"
@@ -102,6 +103,7 @@ var AllOrderedPlugins = []string{
denyserviceexternalips.PluginName, // DenyServiceExternalIPs
podtopologylabels.PluginName, // PodTopologyLabels
nodedeclaredfeatures.PluginName, // NodeDeclaredFeatureValidator
jobadmission.PluginName, // JobValidation, only active when feature gate EnableWorkloadWithJob is enabled.
podresize.PluginName, // PodResizeValidator
// new admission plugins should generally be inserted above here
@@ -157,6 +159,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
certsubjectrestriction.Register(plugins)
podtopologylabels.Register(plugins)
nodedeclaredfeatures.Register(plugins)
jobadmission.Register(plugins)
podresize.Register(plugins)
}
@@ -187,6 +190,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] {
mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled
nodedeclaredfeatures.PluginName, // NodeDeclaredFeatureValidator, only active when feature gate NodeDeclaredFeatures is enabled
jobadmission.PluginName, // JobValidation, only active when feature gate EnableWorkloadWithJob is enabled
podresize.PluginName, // PodResizeValidator, only active when feature gate InPlacePodVerticalScaling is enabled
)

View File

@@ -0,0 +1,162 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
schedulingv1alpha2listers "k8s.io/client-go/listers/scheduling/v1alpha2"
"k8s.io/component-base/featuregate"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/ptr"
)
// PluginName is the name under which this admission plugin is registered.
const PluginName = "JobValidation"
// Register adds the JobValidation plugin factory to the given plugin registry
// under PluginName. The plugin takes no configuration, so the config reader
// handed to the factory is ignored.
func Register(plugins *admission.Plugins) {
	factory := func(_ io.Reader) (admission.Interface, error) {
		return NewPlugin(), nil
	}
	plugins.Register(PluginName, factory)
}
// Compile-time assertions that *Plugin satisfies every interface the
// admission machinery expects of it.
var (
	_ admission.Interface                                          = (*Plugin)(nil)
	_ admission.ValidationInterface                                = (*Plugin)(nil)
	_ genericadmissioninitializer.WantsExternalKubeInformerFactory = (*Plugin)(nil)
	_ genericadmissioninitializer.WantsFeatures                    = (*Plugin)(nil)
)
// Plugin is an admission controller that validates Job updates
// against gang-scheduled PodGroups.
type Plugin struct {
	*admission.Handler

	// pgLister reads PodGroups from the shared informer cache. It is only
	// populated by SetExternalKubeInformerFactory when GenericWorkload is
	// enabled, so it may be nil.
	pgLister schedulingv1alpha2listers.PodGroupLister

	// Feature gate state captured by InspectFeatureGates.
	enableWorkloadWithJobEnabled bool
	genericWorkloadEnabled       bool
	// inspectedFeatureGates records that InspectFeatureGates has run; it is
	// checked by ValidateInitialization.
	inspectedFeatureGates bool
}
// NewPlugin returns a JobValidation admission plugin whose handler is
// registered for Update operations only.
func NewPlugin() *Plugin {
	p := new(Plugin)
	p.Handler = admission.NewHandler(admission.Update)
	return p
}
// InspectFeatureGates records the state of the GenericWorkload and
// EnableWorkloadWithJob feature gates on the plugin and marks the gates as
// inspected so that ValidateInitialization can succeed.
func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
	workloadWithJob := featureGates.Enabled(features.EnableWorkloadWithJob)
	genericWorkload := featureGates.Enabled(features.GenericWorkload)

	p.enableWorkloadWithJobEnabled = workloadWithJob
	p.genericWorkloadEnabled = genericWorkload
	p.inspectedFeatureGates = true
}
// SetExternalKubeInformerFactory wires the PodGroup lister from the shared
// informer factory and makes plugin readiness depend on that informer having
// synced. Nothing is wired when the GenericWorkload gate is disabled, in
// which case the lister stays nil.
func (p *Plugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
	if p.genericWorkloadEnabled {
		podGroups := f.Scheduling().V1alpha2().PodGroups()
		p.pgLister = podGroups.Lister()
		p.SetReadyFunc(podGroups.Informer().HasSynced)
	}
}
// ValidateInitialization reports an error if the plugin has not yet inspected
// feature gates, or if GenericWorkload is enabled but no PodGroup lister has
// been provided.
func (p *Plugin) ValidateInitialization() error {
	switch {
	case !p.inspectedFeatureGates:
		return fmt.Errorf("%s has not inspected feature gates", PluginName)
	case p.genericWorkloadEnabled && p.pgLister == nil:
		return fmt.Errorf("missing PodGroup lister")
	default:
		return nil
	}
}
// Validate runs the admission checks for Job updates that need to consult
// other API objects. Requests that do not target the main "jobs" resource
// (other resources or any subresource), or whose new/old objects are not
// internal batch Jobs, are admitted without further checks.
func (p *Plugin) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
	targetsJobs := a.GetResource().GroupResource() == batch.Resource("jobs")
	if !targetsJobs || a.GetSubresource() != "" {
		return nil
	}

	updated, isJob := a.GetObject().(*batch.Job)
	if !isJob {
		return nil
	}
	previous, wasJob := a.GetOldObject().(*batch.Job)
	if !wasJob {
		return nil
	}

	return p.validateParallelismChange(a, updated, previous)
}
// validateParallelismChange rejects parallelism changes on Jobs whose
// PodGroup uses gang scheduling.
//
// The update is allowed without further checks when:
//   - both the GenericWorkload and EnableWorkloadWithJob gates are disabled;
//   - no PodGroup lister has been wired up (SetExternalKubeInformerFactory
//     only installs one when GenericWorkload is enabled);
//   - the parallelism value is unchanged; or
//   - the PodGroup lookup or list returns an error (for example the
//     referenced PodGroup does not exist), which is treated as "no gang
//     PodGroup found".
//
// Otherwise it returns a Forbidden error if the PodGroup named by the old
// Job's template SchedulingGroup is gang-scheduled or, when the template names
// no PodGroup, if any PodGroup in the Job's namespace that is controlled by
// the old Job is gang-scheduled.
func (p *Plugin) validateParallelismChange(a admission.Attributes, job, oldJob *batch.Job) error {
	if !p.genericWorkloadEnabled && !p.enableWorkloadWithJobEnabled {
		return nil
	}
	// The lister is only set when GenericWorkload is enabled. Without this
	// guard, enabling EnableWorkloadWithJob alone would call a method on a nil
	// lister and panic inside the admission chain.
	if p.pgLister == nil {
		return nil
	}
	if ptr.Equal(job.Spec.Parallelism, oldJob.Spec.Parallelism) {
		return nil
	}

	forbidden := func(podGroupName string) error {
		return admission.NewForbidden(a, fmt.Errorf(
			"cannot change parallelism for a Job referencing gang-scheduled PodGroup %q", podGroupName))
	}
	podGroups := p.pgLister.PodGroups(oldJob.Namespace)

	// When SchedulingGroup is set in the template, look up that PodGroup directly.
	if sg := oldJob.Spec.Template.Spec.SchedulingGroup; sg != nil && sg.PodGroupName != nil {
		pg, err := podGroups.Get(*sg.PodGroupName)
		if err != nil {
			// Lookup failures (e.g. PodGroup not found) do not block the update.
			return nil
		}
		if pg.Spec.SchedulingPolicy.Gang != nil {
			return forbidden(pg.Name)
		}
		return nil
	}

	// When SchedulingGroup is not in the template, scan PodGroups in the namespace owned by this Job.
	pgs, err := podGroups.List(labels.Everything())
	if err != nil {
		// List failures do not block the update.
		return nil
	}
	for _, pg := range pgs {
		if metav1.IsControlledBy(pg, oldJob) && pg.Spec.SchedulingPolicy.Gang != nil {
			return forbidden(pg.Name)
		}
	}
	return nil
}

View File

@@ -0,0 +1,282 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/ptr"
)
// TestGangSchedulingParallelism drives Plugin.Validate with Job updates and a
// pre-populated PodGroup informer store, checking when a parallelism change is
// admitted or rejected. Every old Job starts with parallelism 4; each case
// states the parallelism of the updated Job, the PodGroup (if any) named in
// the Job template, and the PodGroups present in the cache.
func TestGangSchedulingParallelism(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)

	const (
		jobName            = "test-job"
		initialParallelism = int32(4)
	)

	// Scheduling policy builders.
	gangPolicy := func() schedulingv1alpha2.PodGroupSchedulingPolicy {
		return schedulingv1alpha2.PodGroupSchedulingPolicy{
			Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: 4},
		}
	}
	basicPolicy := func() schedulingv1alpha2.PodGroupSchedulingPolicy {
		return schedulingv1alpha2.PodGroupSchedulingPolicy{
			Basic: &schedulingv1alpha2.BasicSchedulingPolicy{},
		}
	}

	// controllerRef builds a controlling owner reference to the Job with the
	// given name; the UID is "<name>-uid".
	controllerRef := func(ownerJob string) metav1.OwnerReference {
		return metav1.OwnerReference{
			APIVersion: "batch/v1",
			Kind:       "Job",
			Name:       ownerJob,
			UID:        types.UID(ownerJob + "-uid"),
			Controller: ptr.To(true),
		}
	}

	// newPodGroup builds a PodGroup in the default namespace with the given
	// policy and (optionally) owner references.
	newPodGroup := func(name string, policy schedulingv1alpha2.PodGroupSchedulingPolicy, owners ...metav1.OwnerReference) *schedulingv1alpha2.PodGroup {
		return &schedulingv1alpha2.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				Name:            name,
				Namespace:       metav1.NamespaceDefault,
				OwnerReferences: owners,
			},
			Spec: schedulingv1alpha2.PodGroupSpec{SchedulingPolicy: policy},
		}
	}

	// newJob builds the indexed test Job with the given parallelism. An empty
	// podGroupName leaves the template's SchedulingGroup unset.
	newJob := func(podGroupName string, parallelism int32) *batch.Job {
		j := &batch.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      jobName,
				Namespace: metav1.NamespaceDefault,
				UID:       types.UID(jobName + "-uid"),
			},
			Spec: batch.JobSpec{
				CompletionMode: ptr.To(batch.IndexedCompletion),
				Completions:    ptr.To[int32](4),
				Parallelism:    ptr.To(parallelism),
			},
		}
		if podGroupName != "" {
			j.Spec.Template.Spec.SchedulingGroup = &api.PodSchedulingGroup{
				PodGroupName: ptr.To(podGroupName),
			}
		}
		return j
	}

	testCases := []struct {
		name              string
		enableFeatureGate bool
		podGroupName      string
		newParallelism    int32
		podGroups         []*schedulingv1alpha2.PodGroup
		wantErr           bool
	}{
		{
			name:           "feature gate disabled: allows parallelism change",
			podGroupName:   "gang-pg",
			newParallelism: 2,
			podGroups:      []*schedulingv1alpha2.PodGroup{newPodGroup("gang-pg", gangPolicy())},
		},
		{
			name:              "no schedulingGroup: skips check (handled by validation)",
			enableFeatureGate: true,
			newParallelism:    2,
		},
		{
			name:              "parallelism unchanged: allows",
			enableFeatureGate: true,
			podGroupName:      "gang-pg",
			newParallelism:    initialParallelism,
			podGroups:         []*schedulingv1alpha2.PodGroup{newPodGroup("gang-pg", gangPolicy())},
		},
		{
			name:              "gang PodGroup: rejects parallelism change",
			enableFeatureGate: true,
			podGroupName:      "gang-pg",
			newParallelism:    2,
			podGroups:         []*schedulingv1alpha2.PodGroup{newPodGroup("gang-pg", gangPolicy())},
			wantErr:           true,
		},
		{
			name:              "basic PodGroup: allows parallelism change",
			enableFeatureGate: true,
			podGroupName:      "basic-pg",
			newParallelism:    2,
			podGroups:         []*schedulingv1alpha2.PodGroup{newPodGroup("basic-pg", basicPolicy())},
		},
		{
			name:              "PodGroup not found: allows parallelism change",
			enableFeatureGate: true,
			podGroupName:      "missing-pg",
			newParallelism:    2,
		},
		{
			name:              "controller-created gang PodGroup - rejects parallelism change",
			enableFeatureGate: true,
			newParallelism:    2,
			podGroups: []*schedulingv1alpha2.PodGroup{
				newPodGroup("controller-created-pg", gangPolicy(), controllerRef(jobName)),
			},
			wantErr: true,
		},
		{
			name:              "controller-created basic PodGroup - allows parallelism change",
			enableFeatureGate: true,
			newParallelism:    2,
			podGroups: []*schedulingv1alpha2.PodGroup{
				newPodGroup("controller-created-pg", basicPolicy(), controllerRef(jobName)),
			},
		},
		{
			name:              "gang PodGroup owned by different Job - allows parallelism change",
			enableFeatureGate: true,
			newParallelism:    2,
			podGroups: []*schedulingv1alpha2.PodGroup{
				newPodGroup("other-gang-pg", gangPolicy(), controllerRef("other-job")),
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Both gates are toggled together for every case.
			overrides := featuregatetesting.FeatureOverrides{
				features.GenericWorkload:       tc.enableFeatureGate,
				features.EnableWorkloadWithJob: tc.enableFeatureGate,
			}
			featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, overrides)

			plugin := NewPlugin()
			plugin.InspectFeatureGates(utilfeature.DefaultFeatureGate)

			factory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
			plugin.SetExternalKubeInformerFactory(factory)

			// Seed the informer cache directly; the factory is never started.
			store := factory.Scheduling().V1alpha2().PodGroups().Informer().GetStore()
			for _, pg := range tc.podGroups {
				if err := store.Add(pg); err != nil {
					t.Fatalf("failed to add PodGroup: %v", err)
				}
			}

			oldJob := newJob(tc.podGroupName, initialParallelism)
			updatedJob := newJob(tc.podGroupName, tc.newParallelism)
			attrs := admission.NewAttributesRecord(
				updatedJob, oldJob,
				schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"},
				metav1.NamespaceDefault, jobName,
				batch.Resource("jobs").WithVersion("v1"),
				"", admission.Update, &metav1.UpdateOptions{}, false, nil,
			)

			err := plugin.Validate(ctx, attrs, nil)
			switch {
			case tc.wantErr && err == nil:
				t.Error("expected error, got nil")
			case !tc.wantErr && err != nil:
				t.Errorf("unexpected error: %v", err)
			}
		})
	}
}