Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Separate controller client concerns #4253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
cbandy merged 4 commits into CrunchyData:main from cbandy:client-concerns
Aug 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 15 additions & 52 deletions cmd/postgres-operator/main.go
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,23 @@ func main() {
must(manager.Add(registrar))
token, _ := registrar.CheckToken()

bridgeURL := os.Getenv("PGO_BRIDGE_URL")
bridgeClient := func() *bridge.Client {
client := bridge.NewClient(bridgeURL, versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

// add all PostgreSQL Operator controllers to the runtime manager
addControllersToManager(manager, log, registrar)
must(pgupgrade.ManagedReconciler(manager, registrar))
must(standalone_pgadmin.ManagedReconciler(manager))
must(crunchybridgecluster.ManagedReconciler(manager, func() bridge.ClientInterface {
return bridgeClient()
}))

if features.Enabled(feature.BridgeIdentifiers) {
url := os.Getenv("PGO_BRIDGE_URL")
constructor := func() *bridge.Client {
client := bridge.NewClient(url, versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

must(bridge.ManagedInstallationReconciler(manager, constructor))
must(bridge.ManagedInstallationReconciler(manager, bridgeClient))
}

// Enable upgrade checking
Expand Down Expand Up @@ -307,55 +312,13 @@ func main() {
func addControllersToManager(mgr runtime.Manager, log logging.Logger, reg registration.Registration) {
pgReconciler := &postgrescluster.Reconciler{
Client: mgr.GetClient(),
Owner: postgrescluster.ControllerName,
Recorder: mgr.GetEventRecorderFor(postgrescluster.ControllerName),
Owner: naming.ControllerPostgresCluster,
Recorder: mgr.GetEventRecorderFor(naming.ControllerPostgresCluster),
Registration: reg,
}

if err := pgReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PostgresCluster controller")
os.Exit(1)
}

upgradeReconciler := &pgupgrade.PGUpgradeReconciler{
Client: mgr.GetClient(),
Owner: "pgupgrade-controller",
Recorder: mgr.GetEventRecorderFor("pgupgrade-controller"),
Registration: reg,
}

if err := upgradeReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PGUpgrade controller")
os.Exit(1)
}

pgAdminReconciler := &standalone_pgadmin.PGAdminReconciler{
Client: mgr.GetClient(),
Owner: "pgadmin-controller",
Recorder: mgr.GetEventRecorderFor(naming.ControllerPGAdmin),
}

if err := pgAdminReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PGAdmin controller")
os.Exit(1)
}

constructor := func() bridge.ClientInterface {
client := bridge.NewClient(os.Getenv("PGO_BRIDGE_URL"), versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

crunchyBridgeClusterReconciler := &crunchybridgecluster.CrunchyBridgeClusterReconciler{
Client: mgr.GetClient(),
Owner: "crunchybridgecluster-controller",
// TODO(crunchybridgecluster): recorder?
// Recorder: mgr.GetEventRecorderFor(naming...),
NewClient: constructor,
}

if err := crunchyBridgeClusterReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create CrunchyBridgeCluster controller")
os.Exit(1)
}
}
20 changes: 3 additions & 17 deletions internal/bridge/crunchybridgecluster/apply.go
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// patch sends patch to object's endpoint in the Kubernetes API and updates
// object with any returned content. The fieldManager is set to r.Owner, but
// can be overridden in options.
// - https://docs.k8s.io/reference/using-api/server-side-apply/#managers
//
// NOTE: This function is duplicated from a version in the postgrescluster package
func (r *CrunchyBridgeClusterReconciler) patch(
ctx context.Context, object client.Object,
patch client.Patch, options ...client.PatchOption,
) error {
options = append([]client.PatchOption{r.Owner}, options...)
return r.Patch(ctx, object, patch, options...)
}

// apply sends an apply patch to object's endpoint in the Kubernetes API and
// updates object with any returned content. The fieldManager is set to
// r.Owner and the force parameter is true.
// updates object with any returned content. The fieldManager is set by
// r.Writer and the force parameter is true.
// - https://docs.k8s.io/reference/using-api/server-side-apply/#managers
// - https://docs.k8s.io/reference/using-api/server-side-apply/#conflicts
//
Expand All @@ -40,7 +26,7 @@ func (r *CrunchyBridgeClusterReconciler) apply(ctx context.Context, object clien

// Send the apply-patch with force=true.
if err == nil {
err = r.patch(ctx, object, apply, client.ForceOwnership)
err = r.Writer.Patch(ctx, object, apply, client.ForceOwnership)
}

return err
Expand Down
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,38 @@ import (

// CrunchyBridgeClusterReconciler reconciles a CrunchyBridgeCluster object
type CrunchyBridgeClusterReconciler struct {
client.Client

Owner client.FieldOwner

// For this iteration, we will only be setting conditions rather than
// setting conditions and emitting events. That may change in the future,
// so we're leaving this EventRecorder here for now.
// record.EventRecorder

// NewClient is called each time a new Client is needed.
// NewClient is called each time a new bridge.Client is needed.
NewClient func() bridge.ClientInterface

Reader interface {
Get(context.Context, client.ObjectKey, client.Object, ...client.GetOption) error
List(context.Context, client.ObjectList, ...client.ListOption) error
}
Writer interface {
Delete(context.Context, client.Object, ...client.DeleteOption) error
Patch(context.Context, client.Object, client.Patch, ...client.PatchOption) error
Update(context.Context, client.Object, ...client.UpdateOption) error
}
StatusWriter interface {
Patch(context.Context, client.Object, client.Patch, ...client.SubResourcePatchOption) error
}
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list,watch}
//+kubebuilder:rbac:groups="",resources="secrets",verbs={list,watch}

// SetupWithManager sets up the controller with the Manager.
func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
mgr ctrl.Manager,
) error {
return ctrl.NewControllerManagedBy(mgr).
// ManagedReconciler creates a [CrunchyBridgeClusterReconciler] and adds it to m.
func ManagedReconciler(m ctrl.Manager, newClient func() bridge.ClientInterface) error {
kubernetes := client.WithFieldOwner(m.GetClient(), naming.ControllerCrunchyBridgeCluster)

reconciler := &CrunchyBridgeClusterReconciler{
NewClient: newClient,
Reader: kubernetes,
StatusWriter: kubernetes.Status(),
Writer: kubernetes,
}

return ctrl.NewControllerManagedBy(m).
For(&v1beta1.CrunchyBridgeCluster{}).
Owns(&corev1.Secret{}).
// Wake periodically to check Bridge API for all CrunchyBridgeClusters.
Expand All @@ -63,7 +74,7 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
runtime.NewTickerImmediate(5*time.Minute, event.GenericEvent{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []ctrl.Request {
var list v1beta1.CrunchyBridgeClusterList
_ = r.List(ctx, &list)
_ = reconciler.Reader.List(ctx, &list)
return runtime.Requests(initialize.Pointers(list.Items...)...)
}),
),
Expand All @@ -72,10 +83,10 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
Watches(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
return runtime.Requests(r.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
return runtime.Requests(reconciler.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
}),
).
Complete(r)
Complete(reconciler)
}

// The owner reference created by controllerutil.SetControllerReference blocks
Expand All @@ -91,7 +102,7 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
func (r *CrunchyBridgeClusterReconciler) setControllerReference(
owner *v1beta1.CrunchyBridgeCluster, controlled client.Object,
) error {
return controllerutil.SetControllerReference(owner, controlled, r.Scheme())
return controllerutil.SetControllerReference(owner, controlled, runtime.Scheme)
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={get,patch,update}
Expand All @@ -113,14 +124,14 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// copy before returning from its cache.
// - https://github.com/kubernetes-sigs/controller-runtime/issues/1235
crunchybridgecluster := &v1beta1.CrunchyBridgeCluster{}
err := r.Get(ctx, req.NamespacedName, crunchybridgecluster)
err := r.Reader.Get(ctx, req.NamespacedName, crunchybridgecluster)

if err == nil {
// Write any changes to the crunchybridgecluster status on the way out.
before := crunchybridgecluster.DeepCopy()
defer func() {
if !equality.Semantic.DeepEqual(before.Status, crunchybridgecluster.Status) {
status := r.Status().Patch(ctx, crunchybridgecluster, client.MergeFrom(before), r.Owner)
status := r.StatusWriter.Patch(ctx, crunchybridgecluster, client.MergeFrom(before))

if err == nil && status != nil {
err = status
Expand Down Expand Up @@ -684,7 +695,7 @@ func (r *CrunchyBridgeClusterReconciler) GetSecretKeys(
}}

err := errors.WithStack(
r.Get(ctx, client.ObjectKeyFromObject(existing), existing))
r.Reader.Get(ctx, client.ObjectKeyFromObject(existing), existing))

if err == nil {
if existing.Data["key"] != nil && existing.Data["team"] != nil {
Expand All @@ -707,7 +718,7 @@ func (r *CrunchyBridgeClusterReconciler) deleteControlled(
version := object.GetResourceVersion()
exactly := client.Preconditions{UID: &uid, ResourceVersion: &version}

return r.Delete(ctx, object, exactly)
return r.Writer.Delete(ctx, object, exactly)
}

return nil
Expand Down
Loading
Loading

AltStyle によって変換されたページ (->オリジナル) /