Add Kafka Cluster Monitoring #21736


Draft · piochelepiotr wants to merge 12 commits into master from piotr-wolski/add-kafka-integration

Conversation

piochelepiotr (Contributor) commented Oct 24, 2025 (edited)

What does this PR do?

Adds Kafka cluster monitoring capabilities to the `kafka_consumer` integration (preview feature). When `enable_cluster_monitoring: true` is set, the integration collects (see the configuration sketch after this list):

  • Broker metrics: count, leader count, partition count, and configurations
  • Topic & partition metrics: sizes, offsets, replication status, message rates
  • Consumer group metrics: member count, state, and details
  • Schema Registry metrics: subjects, versions, and full schemas (if URL provided)
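
For illustration, a minimal instance configuration might look like the following sketch, expressed as the Python dict a test would pass to the check. The broker and Schema Registry addresses are placeholders; `kafka_connect_str` is an option of the existing integration, while `enable_cluster_monitoring` and `schema_registry_url` are the options described in this PR.

```python
# Hedged sketch of a kafka_consumer instance enabling the preview feature.
# Addresses are placeholders.
instance = {
    "kafka_connect_str": "localhost:9092",           # existing option: broker(s) to connect to
    "enable_cluster_monitoring": True,               # preview flag added by this PR
    "schema_registry_url": "http://localhost:8081",  # optional: enables schema collection
}
```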

Motivation

While the existing kafka_consumer integration provides consumer lag monitoring, customers need deeper visibility into their Kafka clusters without relying solely on JMX-based monitoring. This feature enables:

  • Proactive capacity planning: Track topic/partition growth and broker load distribution
  • Configuration auditing: Monitor broker and topic configurations with automatic change detection
  • Schema management: Track schema evolution and usage across topics
  • Operational insights: Consumer group health, under-replicated partitions, and offline detection

This complements the existing JMX-based kafka integration by providing Admin API-based metadata collection.

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Add the `qa/skip-qa` label if the PR doesn't need to be tested during QA.
  • If you need to backport this PR to another branch, add the `backport/<branch-name>` label to the PR; a backport PR will be opened automatically once this one is merged.

codecov bot commented Oct 24, 2025 (edited)

Codecov Report

❌ Patch coverage is 79.60644% with 114 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.00%. Comparing base (bcd706c) to head (03ccd9c).
⚠️ Report is 27 commits behind head on master.


- **Consumer group metadata**: Member details and group state
- **Schema registry**: Schema information (if schema_registry_url is provided)

All cluster monitoring metrics are tagged with `kafka_cluster_id` for easy filtering.
Member commented:

Why not broker id as well? Cardinality? No info available? Something else?

piochelepiotr (Contributor, Author) commented Oct 28, 2025:

Most metrics are cluster wide metrics, so not specific to only one broker. I will make sure that metrics specific to one broker are tagged with broker id.

labbati reacted with thumbs up emoji
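
For illustration, a hedged sketch of that tagging scheme (not the PR's actual code; the metric names and the `broker` object's attributes are placeholders):

```python
# Cluster-wide metrics carry only kafka_cluster_id; broker-specific metrics
# additionally carry broker_id, as described above. Names are illustrative.
def submit_cluster_metrics(check, cluster_id, brokers):
    cluster_tags = [f"kafka_cluster_id:{cluster_id}"]
    # One value for the whole cluster: no broker_id tag.
    check.gauge("kafka.cluster.broker_count", len(brokers), tags=cluster_tags)
    for broker in brokers:
        # Per-broker value: tagged with both the cluster id and the broker id.
        broker_tags = cluster_tags + [f"broker_id:{broker.id}"]
        check.gauge("kafka.broker.partition_count", broker.partition_count, tags=broker_tags)
```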
Comment on lines +90 to +92
self._collect_broker_metadata = enable_cluster_monitoring
self._collect_topic_metadata = enable_cluster_monitoring
self._collect_consumer_group_metadata = enable_cluster_monitoring
Member commented:

Unsure why we are splitting this into three flags, since (1) they all have the same value, and (2) in the only usages I can see (link1, link2) they are used in an OR condition or in sequential IFs.

Unless you have in mind a future usage where you can use them independently.
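
A minimal sketch of the suggested collapse, assuming the three toggles are never meant to vary independently (the class and attribute names here are illustrative, not the PR's):

```python
# One internal flag instead of three that always share a value.
class ClusterMonitoringConfig:
    def __init__(self, enable_cluster_monitoring: bool):
        # Split back into separate flags only if independent toggles
        # become a real requirement.
        self._collect_cluster_metadata = enable_cluster_monitoring
```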

Comment on lines +33 to +51
try:
    # Collect broker information
    if self.config._collect_broker_metadata:
        self._collect_broker_metadata()

    # Collect topic metadata
    if self.config._collect_topic_metadata:
        self._collect_topic_metadata()

    # Collect consumer group metadata
    if self.config._collect_consumer_group_metadata:
        self._collect_consumer_group_metadata()

    # Collect schema registry information
    if self.config._collect_schema_registry:
        self._collect_schema_registry_info()

except Exception as e:
    self.log.error("Error collecting cluster metadata: %s", e)
Member commented:

Is it intentional to not collect, for example, topic metadata if collecting broker metadata failed?
I would suggest splitting this into four try/excepts (each logging its own error; see the sketch after this list) unless:

  • we deliberately want to have no topic data if broker collection fails (and so on); or
  • it is deterministically known that if one fails the others will also fail; or
  • we do not want half-collected data
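
A minimal sketch of the suggested split, reusing the flag and method names from the snippet under review (its placement inside the check's collection method is assumed):

```python
# Each collector gets its own try/except so a failure in one does not
# abort the others.
collectors = [
    ("broker metadata", self.config._collect_broker_metadata, self._collect_broker_metadata),
    ("topic metadata", self.config._collect_topic_metadata, self._collect_topic_metadata),
    ("consumer group metadata", self.config._collect_consumer_group_metadata, self._collect_consumer_group_metadata),
    ("schema registry info", self.config._collect_schema_registry, self._collect_schema_registry_info),
]
for name, enabled, collect in collectors:
    if not enabled:
        continue
    try:
        collect()
    except Exception as e:
        self.log.error("Error collecting %s: %s", name, e)
```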


Reviewers

labbati left review comments

At least 1 approving review is required to merge this pull request.