diff --git a/internal/controller/argocdcommitstatus_controller.go b/internal/controller/argocdcommitstatus_controller.go index 3ebbdb46d..c84e623f0 100644 --- a/internal/controller/argocdcommitstatus_controller.go +++ b/internal/controller/argocdcommitstatus_controller.go @@ -91,12 +91,11 @@ type appRevisionKey struct { // ArgoCDCommitStatusReconciler reconciles a ArgoCDCommitStatus object type ArgoCDCommitStatusReconciler struct { - Manager mcmanager.Manager - Recorder record.EventRecorder - SettingsMgr *settings.Manager - KubeConfigProvider *kubeconfig.Provider - localClient client.Client - watchLocalApplications bool + Manager mcmanager.Manager + Recorder record.EventRecorder + SettingsMgr *settings.Manager + KubeConfigProvider *kubeconfig.Provider + localClient client.Client } // URLTemplateData is the data passed to the URLTemplate in the ArgoCDCommitStatus. @@ -154,9 +153,15 @@ func (r *ArgoCDCommitStatusReconciler) Reconcile(ctx context.Context, req mcreco // TODO: we should setup a field index and only list apps related to the currently reconciled app apps := []ApplicationsInEnvironment{} + // Get the current watchLocalApplications setting (hot-reloadable) + watchLocalApplications, err := r.SettingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplications(ctx) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get watchLocalApplications setting: %w", err) + } + // list clusters so we can query argocd applications from all clusters clusters := r.KubeConfigProvider.ListClusters() - if r.watchLocalApplications { + if watchLocalApplications { // The provider doesn't know about the local cluster, so we need to add it ourselves. clusters = append(clusters, mcmanager.LocalCluster) } @@ -444,12 +449,36 @@ func getMostRecentLastTransitionTime(aggregateItem []*aggregate) *metav1.Time { return mostRecentLastTransitionTime } -func lookupArgoCDCommitStatusFromArgoCDApplication(mgr mcmanager.Manager) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { +// filterAndLookupArgoCDCommitStatusFromApplication creates a handler that: +// 1. Filters events based on whether they come from local or provider clusters (hot-reloadable) +// 2. Maps Argo CD Application events to ArgoCDCommitStatus reconciliation requests +func filterAndLookupArgoCDCommitStatusFromApplication(mgr mcmanager.Manager, settingsMgr *settings.Manager) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, argoCDApplication client.Object) []mcreconcile.Request { - metrics.ApplicationWatchEventsHandled.Inc() - logger := log.FromContext(ctx) + isLocal := clusterName == "" + + // Hot-reload the watchLocalApplications setting + // If we can't read it (e.g., during startup before cache is ready), default to true + // to be safe and process the event rather than silently drop it. + watchLocal := true // safe default + if val, err := settingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplications(ctx); err != nil { + logger.V(4).Info("failed to get watchLocalApplications setting, using default true", "error", err.Error()) + } else { + watchLocal = val + } + + // Filter based on cluster type + if isLocal && !watchLocal { + return nil + } + // Always watch provider clusters + watchProviders := true + if !isLocal && !watchProviders { + return nil + } + + metrics.ApplicationWatchEventsHandled.Inc() application := &argocd.Application{} @@ -539,27 +568,27 @@ func (r *ArgoCDCommitStatusReconciler) SetupWithManager(ctx context.Context, mcM return fmt.Errorf("failed to get ArgoCDCommitStatus max concurrent reconciles: %w", err) } - // Get the controller configuration to check if local Applications should be watched - watchLocalApplications, err := r.SettingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect(ctx) - if err != nil { - return fmt.Errorf("failed to get controller configuration: %w", err) - } - - r.watchLocalApplications = watchLocalApplications - err = mcbuilder.ControllerManagedBy(mcMgr). - For(&promoterv1alpha1.ArgoCDCommitStatus{}, - mcbuilder.WithEngageWithLocalCluster(true), - mcbuilder.WithEngageWithProviderClusters(false), - mcbuilder.WithPredicates(predicate.GenerationChangedPredicate{}), - ). + Named("argocdcommitstatus"). WithOptions(controller.Options{ MaxConcurrentReconciles: maxConcurrentReconciles, RateLimiter: rateLimiter, UsePriorityQueue: ptr.To(true), }). - Watches(&argocd.Application{}, lookupArgoCDCommitStatusFromArgoCDApplication(mcMgr), - mcbuilder.WithEngageWithLocalCluster(watchLocalApplications), + // Watch ArgoCDCommitStatus resources only in the local cluster. + // We use Watches() with OnlyLocalCluster() instead of For() due to a bug in + // multicluster-runtime that prevents setting both WithEngageWithLocalCluster(true) + // and WithEngageWithProviderClusters(false) on For(). + // See: https://github.com/kubernetes-sigs/multicluster-runtime/issues/93 + Watches(&promoterv1alpha1.ArgoCDCommitStatus{}, + OnlyLocalCluster(), + mcbuilder.WithPredicates(predicate.GenerationChangedPredicate{})). + // Watch Applications using a custom handler that dynamically filters by cluster. + // We engage with ONLY provider clusters (not local), then the handler + // hot-reloads the watchLocalApplications setting and filters accordingly. + // This allows the configuration to be changed without restarting the controller. + Watches(&argocd.Application{}, + filterAndLookupArgoCDCommitStatusFromApplication(mcMgr, r.SettingsMgr), mcbuilder.WithEngageWithProviderClusters(true), mcbuilder.WithPredicates(applicationPredicate())). Complete(r) diff --git a/internal/controller/multicluster_handlers.go b/internal/controller/multicluster_handlers.go new file mode 100644 index 000000000..0ce5e6c85 --- /dev/null +++ b/internal/controller/multicluster_handlers.go @@ -0,0 +1,112 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "strings" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" +) + +// OnlyLocalCluster creates a handler that only processes events from the local cluster. +// The local cluster is identified by the mcmanager.LocalCluster constant. +func OnlyLocalCluster() mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { + return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { + // LocalCluster is represented by an empty string + if clusterName != "" { + return &noOpHandler{} + } + return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl) + } +} + +// OnlyProviderClusters creates a handler that only processes events from provider clusters (not local). +func OnlyProviderClusters() mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { + return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { + // Filter OUT local cluster (empty string means local) + if clusterName == "" { + return &noOpHandler{} + } + // Enqueue for all provider clusters + return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl) + } +} + +// FilterByPrefix creates a handler that only processes events from clusters +// matching the given provider prefix. +func FilterByPrefix(prefix string) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { + return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { + if !strings.HasPrefix(clusterName, prefix+"#") { + return &noOpHandler{} + } + return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl) + } +} + +// FilterByPrefixes creates a handler that only processes events from clusters +// matching any of the given provider prefixes. +func FilterByPrefixes(prefixes ...string) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { + return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { + for _, prefix := range prefixes { + if strings.HasPrefix(clusterName, prefix+"#") { + return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl) + } + } + return &noOpHandler{} + } +} + +// MaybeLocalOrProviderClusters creates a handler that processes events from local cluster +// (if watchLocal is true) and/or provider clusters (if watchProviders is true). +func MaybeLocalOrProviderClusters(watchLocal bool, watchProviders bool) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] { + return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] { + isLocal := clusterName == "" + + if isLocal && !watchLocal { + return &noOpHandler{} + } + + if !isLocal && !watchProviders { + return &noOpHandler{} + } + + return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl) + } +} + +// noOpHandler drops all events +type noOpHandler struct{} + +func (h *noOpHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +} + +func (h *noOpHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +} + +func (h *noOpHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +} + +func (h *noOpHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +} diff --git a/internal/settings/manager.go b/internal/settings/manager.go index 3892636fa..26794f6dd 100644 --- a/internal/settings/manager.go +++ b/internal/settings/manager.go @@ -123,6 +123,23 @@ func (m *Manager) GetControllerNamespace() string { return m.config.ControllerNamespace } +// GetArgoCDCommitStatusControllersWatchLocalApplications retrieves the WatchLocalApplications setting from the ArgoCDCommitStatus configuration. +// +// This function uses the cached client and should be called during normal reconciliation operations. +// For setup-time configuration before the cache is started, use GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect instead. +// +// Parameters: +// - ctx: Context for the request, used for cancellation and deadlines +// +// Returns the configured WatchLocalApplications value, or an error if the configuration cannot be retrieved. +func (m *Manager) GetArgoCDCommitStatusControllersWatchLocalApplications(ctx context.Context) (bool, error) { + config, err := m.getControllerConfiguration(ctx) + if err != nil { + return false, fmt.Errorf("failed to get controller configuration: %w", err) + } + return config.Spec.ArgoCDCommitStatus.WatchLocalApplications, nil +} + // GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect retrieves the WatchLocalApplications setting from the ArgoCDCommitStatus configuration // using a non-cached read. //