Skip to content

Commit f4f5ee9

Browse files
authored
fix: missing jobs and cronjobs controllers and mutations (#40)
1 parent 74de784 commit f4f5ee9

File tree

2 files changed

+44
-6
lines changed

2 files changed

+44
-6
lines changed

pkg/kubernetes/apiserver/webhooks.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/rs/zerolog/log"
1515
admissionv1 "k8s.io/api/admission/v1"
1616
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
17+
batchv1 "k8s.io/api/batch/v1"
1718
corev1 "k8s.io/api/core/v1"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1920
"k8s.io/apimachinery/pkg/runtime"
@@ -105,6 +106,8 @@ func (w *webhoook) serveMutate(resp http.ResponseWriter, req *http.Request) {
105106
patches = w.processPodMutation(admissionReview)
106107
case "PersistentVolumeClaim":
107108
patches = w.processPVCMutation(admissionReview)
109+
case "Job":
110+
patches = w.processJobMutation(admissionReview)
108111
}
109112

110113
w.sendResponse(resp, admissionReview, patches)
@@ -229,6 +232,37 @@ func (w *webhoook) processPVCMutation(admissionReview *admissionv1.AdmissionRevi
229232
return []map[string]interface{}{patch}
230233
}
231234

235+
// processJobMutation processes the job mutation
236+
func (w *webhoook) processJobMutation(admissionReview *admissionv1.AdmissionReview) []map[string]interface{} {
237+
var job batchv1.Job
238+
if err := json.Unmarshal(admissionReview.Request.Object.Raw, &job); err != nil {
239+
log.Error().Str("component", "webhook").Err(err).Msg("failed to unmarshal job")
240+
return nil
241+
}
242+
243+
log.Debug().Str("component", "webhook").
244+
Str("job", job.Name).
245+
Str("namespace", job.Namespace).
246+
Msg("processing job")
247+
248+
// Add node selector to ensure the job's pods run on our node
249+
patch := map[string]interface{}{
250+
"op": "add",
251+
"path": "/spec/template/spec/nodeSelector",
252+
"value": map[string]string{
253+
"kubernetes.io/hostname": w.nodeName,
254+
},
255+
}
256+
257+
log.Info().Str("component", "webhook").
258+
Str("job", job.Name).
259+
Str("namespace", job.Namespace).
260+
Str("node", w.nodeName).
261+
Msg("setting node selector for job")
262+
263+
return []map[string]interface{}{patch}
264+
}
265+
232266
// sendResponse sends the response to the admission review
233267
func (w *webhoook) sendResponse(resp http.ResponseWriter, admissionReview *admissionv1.AdmissionReview, patches []map[string]interface{}) {
234268
admissionResponse := w.createAdmissionResponse(admissionReview, patches)
@@ -305,9 +339,9 @@ func (w *webhoook) createConfiguration() (*admissionregistrationv1.MutatingWebho
305339
admissionregistrationv1.Create,
306340
},
307341
Rule: admissionregistrationv1.Rule{
308-
APIGroups: []string{"", "apps"},
342+
APIGroups: []string{"", "apps", "batch"},
309343
APIVersions: []string{"v1"},
310-
Resources: []string{"pods", "persistentvolumeclaims"},
344+
Resources: []string{"pods", "persistentvolumeclaims", "jobs"},
311345
},
312346
},
313347
},
@@ -324,7 +358,7 @@ func (w *webhoook) createConfiguration() (*admissionregistrationv1.MutatingWebho
324358

325359
// createOrUpdateConfig creates or updates the webhook configuration
326360
func (w *webhoook) createOrUpdateConfig(webhookConfig *admissionregistrationv1.MutatingWebhookConfiguration) error {
327-
_, err := w.clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(
361+
existingConfig, err := w.clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(
328362
context.Background(),
329363
types.DefaultWebhookName,
330364
metav1.GetOptions{},
@@ -333,12 +367,16 @@ func (w *webhoook) createOrUpdateConfig(webhookConfig *admissionregistrationv1.M
333367
if err != nil {
334368
if k8serrors.IsNotFound(err) {
335369
return w.createConfig(webhookConfig)
336-
} else if k8serrors.IsAlreadyExists(err) {
337-
return w.updateConfig(webhookConfig)
338370
}
339371
return fmt.Errorf("failed to get webhook configuration: %v", err)
340372
}
341373

374+
webhookConfig.ResourceVersion = existingConfig.ResourceVersion
375+
err = w.updateConfig(webhookConfig)
376+
if err != nil {
377+
return err
378+
}
379+
342380
log.Info().Str("component", "webhook").Msgf("webhook %s registered with API server", types.DefaultWebhookName)
343381
return nil
344382
}

pkg/kubernetes/controller/flags.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func (s *service) configureControllerManagerFlags(command *cobra.Command) {
1616
_ = flags.Set("tls-cert-file", s.controllerManagerCertFile)
1717
_ = flags.Set("tls-private-key-file", s.controllerManagerKeyFile)
1818
_ = flags.Set("leader-elect", "false")
19-
_ = flags.Set("controllers", "deployment,replicaset,service,serviceaccount,namespace,attachdetach,endpoint,daemonset,statefulset,root-ca-certificate-publisher-controller,serviceaccount-token-controller,node-ipam-controller,endpointslice-controller,garbage-collector-controller,ttl-after-finished-controller,persistentvolume-binder-controller")
19+
_ = flags.Set("controllers", "deployment,replicaset,service,serviceaccount,namespace,attachdetach,endpoint,daemonset,statefulset,root-ca-certificate-publisher-controller,serviceaccount-token-controller,node-ipam-controller,endpointslice-controller,garbage-collector-controller,ttl-after-finished-controller,persistentvolume-binder-controller,job-controller,cronjob-controller")
2020
_ = flags.Set("profiling", "false")
2121
_ = flags.Set("use-service-account-credentials", "true")
2222
_ = flags.Set("bind-address", "0.0.0.0")

0 commit comments

Comments
 (0)