Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 54 additions & 25 deletions internal/controller/argocdcommitstatus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ type appRevisionKey struct {

// ArgoCDCommitStatusReconciler reconciles a ArgoCDCommitStatus object
type ArgoCDCommitStatusReconciler struct {
Manager mcmanager.Manager
Recorder record.EventRecorder
SettingsMgr *settings.Manager
KubeConfigProvider *kubeconfig.Provider
localClient client.Client
watchLocalApplications bool
Manager mcmanager.Manager
Recorder record.EventRecorder
SettingsMgr *settings.Manager
KubeConfigProvider *kubeconfig.Provider
localClient client.Client
}

// URLTemplateData is the data passed to the URLTemplate in the ArgoCDCommitStatus.
Expand Down Expand Up @@ -154,9 +153,15 @@ func (r *ArgoCDCommitStatusReconciler) Reconcile(ctx context.Context, req mcreco
// TODO: we should setup a field index and only list apps related to the currently reconciled app
apps := []ApplicationsInEnvironment{}

// Get the current watchLocalApplications setting (hot-reloadable)
watchLocalApplications, err := r.SettingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplications(ctx)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get watchLocalApplications setting: %w", err)
}

// list clusters so we can query argocd applications from all clusters
clusters := r.KubeConfigProvider.ListClusters()
if r.watchLocalApplications {
if watchLocalApplications {
// The provider doesn't know about the local cluster, so we need to add it ourselves.
clusters = append(clusters, mcmanager.LocalCluster)
}
Expand Down Expand Up @@ -444,12 +449,36 @@ func getMostRecentLastTransitionTime(aggregateItem []*aggregate) *metav1.Time {
return mostRecentLastTransitionTime
}

func lookupArgoCDCommitStatusFromArgoCDApplication(mgr mcmanager.Manager) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
// filterAndLookupArgoCDCommitStatusFromApplication creates a handler that:
// 1. Filters events based on whether they come from local or provider clusters (hot-reloadable)
// 2. Maps Argo CD Application events to ArgoCDCommitStatus reconciliation requests
func filterAndLookupArgoCDCommitStatusFromApplication(mgr mcmanager.Manager, settingsMgr *settings.Manager) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, argoCDApplication client.Object) []mcreconcile.Request {
metrics.ApplicationWatchEventsHandled.Inc()

logger := log.FromContext(ctx)
isLocal := clusterName == ""

// Hot-reload the watchLocalApplications setting
// If we can't read it (e.g., during startup before cache is ready), default to true
// to be safe and process the event rather than silently drop it.
watchLocal := true // safe default
if val, err := settingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplications(ctx); err != nil {
logger.V(4).Info("failed to get watchLocalApplications setting, using default true", "error", err.Error())
} else {
watchLocal = val
}

// Filter based on cluster type
if isLocal && !watchLocal {
return nil
}
// Always watch provider clusters
watchProviders := true
if !isLocal && !watchProviders {
return nil
}

metrics.ApplicationWatchEventsHandled.Inc()

application := &argocd.Application{}

Expand Down Expand Up @@ -539,27 +568,27 @@ func (r *ArgoCDCommitStatusReconciler) SetupWithManager(ctx context.Context, mcM
return fmt.Errorf("failed to get ArgoCDCommitStatus max concurrent reconciles: %w", err)
}

// Get the controller configuration to check if local Applications should be watched
watchLocalApplications, err := r.SettingsMgr.GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect(ctx)
if err != nil {
return fmt.Errorf("failed to get controller configuration: %w", err)
}

r.watchLocalApplications = watchLocalApplications

err = mcbuilder.ControllerManagedBy(mcMgr).
For(&promoterv1alpha1.ArgoCDCommitStatus{},
mcbuilder.WithEngageWithLocalCluster(true),
mcbuilder.WithEngageWithProviderClusters(false),
mcbuilder.WithPredicates(predicate.GenerationChangedPredicate{}),
).
Named("argocdcommitstatus").
WithOptions(controller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
RateLimiter: rateLimiter,
UsePriorityQueue: ptr.To(true),
}).
Watches(&argocd.Application{}, lookupArgoCDCommitStatusFromArgoCDApplication(mcMgr),
mcbuilder.WithEngageWithLocalCluster(watchLocalApplications),
// Watch ArgoCDCommitStatus resources only in the local cluster.
// We use Watches() with OnlyLocalCluster() instead of For() due to a bug in
// multicluster-runtime that prevents setting both WithEngageWithLocalCluster(true)
// and WithEngageWithProviderClusters(false) on For().
// See: https://github.com/kubernetes-sigs/multicluster-runtime/issues/93
Watches(&promoterv1alpha1.ArgoCDCommitStatus{},
OnlyLocalCluster(),
mcbuilder.WithPredicates(predicate.GenerationChangedPredicate{})).
// Watch Applications using a custom handler that dynamically filters by cluster.
// We engage with ONLY provider clusters (not local), then the handler
// hot-reloads the watchLocalApplications setting and filters accordingly.
// This allows the configuration to be changed without restarting the controller.
Watches(&argocd.Application{},
filterAndLookupArgoCDCommitStatusFromApplication(mcMgr, r.SettingsMgr),
mcbuilder.WithEngageWithProviderClusters(true),
mcbuilder.WithPredicates(applicationPredicate())).
Complete(r)
Expand Down
112 changes: 112 additions & 0 deletions internal/controller/multicluster_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"strings"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
)

// OnlyLocalCluster creates a handler that only processes events from the local cluster.
// The local cluster is identified by the mcmanager.LocalCluster constant.
func OnlyLocalCluster() mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
// LocalCluster is represented by an empty string
if clusterName != "" {
return &noOpHandler{}
}
return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl)
}
}

// OnlyProviderClusters creates a handler that only processes events from provider clusters (not local).
func OnlyProviderClusters() mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
// Filter OUT local cluster (empty string means local)
if clusterName == "" {
return &noOpHandler{}
}
// Enqueue for all provider clusters
return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl)
}
}

// FilterByPrefix creates a handler that only processes events from clusters
// matching the given provider prefix.
func FilterByPrefix(prefix string) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
if !strings.HasPrefix(clusterName, prefix+"#") {
return &noOpHandler{}
}
return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl)
}
}

// FilterByPrefixes creates a handler that only processes events from clusters
// matching any of the given provider prefixes.
func FilterByPrefixes(prefixes ...string) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
for _, prefix := range prefixes {
if strings.HasPrefix(clusterName, prefix+"#") {
return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl)
}
}
return &noOpHandler{}
}
}

// MaybeLocalOrProviderClusters creates a handler that processes events from local cluster
// (if watchLocal is true) and/or provider clusters (if watchProviders is true).
func MaybeLocalOrProviderClusters(watchLocal bool, watchProviders bool) mchandler.TypedEventHandlerFunc[client.Object, mcreconcile.Request] {
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, mcreconcile.Request] {
isLocal := clusterName == ""

if isLocal && !watchLocal {
return &noOpHandler{}
}

if !isLocal && !watchProviders {
return &noOpHandler{}
}

return mchandler.Lift(&handler.EnqueueRequestForObject{})(clusterName, cl)
}
}

// noOpHandler drops all events
type noOpHandler struct{}

func (h *noOpHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
}

func (h *noOpHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
}

func (h *noOpHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
}

func (h *noOpHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
}
17 changes: 17 additions & 0 deletions internal/settings/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ func (m *Manager) GetControllerNamespace() string {
return m.config.ControllerNamespace
}

// GetArgoCDCommitStatusControllersWatchLocalApplications retrieves the WatchLocalApplications setting from the ArgoCDCommitStatus configuration.
//
// This function uses the cached client and should be called during normal reconciliation operations.
// For setup-time configuration before the cache is started, use GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect instead.
//
// Parameters:
// - ctx: Context for the request, used for cancellation and deadlines
//
// Returns the configured WatchLocalApplications value, or an error if the configuration cannot be retrieved.
func (m *Manager) GetArgoCDCommitStatusControllersWatchLocalApplications(ctx context.Context) (bool, error) {
config, err := m.getControllerConfiguration(ctx)
if err != nil {
return false, fmt.Errorf("failed to get controller configuration: %w", err)
}
return config.Spec.ArgoCDCommitStatus.WatchLocalApplications, nil
}

// GetArgoCDCommitStatusControllersWatchLocalApplicationsDirect retrieves the WatchLocalApplications setting from the ArgoCDCommitStatus configuration
// using a non-cached read.
//
Expand Down
Loading