Set up type hinting: add fixes in schema-registry module (mostly already typed) #2107


Open
fangnx wants to merge 30 commits into master from typehinting-sr-fix

Conversation

Member

@fangnx fangnx commented Oct 17, 2025

What

Follow-up PR after #2041: adding missing types + correcting existing types, according to mypy static checker, in schema-registry module

What's left are functions that might require refactoring and more thorough investigation to get the types right:

  • common/avro.py, json_schema.py, protobuf.py: schemas are defined as unions, and we need to add guards to verify types accordingly during transformations
  • schema_registry_client: handling None return types for several functions
  • rules/cel: adding types for CEL operations
  • rules/encryption: handling None values propagated through functions. Need to understand the logic better

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA: https://confluentinc.atlassian.net/browse/DGS-22076

Test & Review

Open questions / Follow-ups


🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.


@fangnx fangnx changed the title from "WIP: add type-related fixes in schema-registry module" to "Set up type hinting: add fixes in schema-registry module (mostly already typed)" Oct 20, 2025
@fangnx fangnx marked this pull request as ready for review October 20, 2025 22:15
@fangnx fangnx requested review from a team and MSeal as code owners October 20, 2025 22:15


def __init__(
self, key_uri: Optional[str], token: Optional[str], ns: Optional[str] = None,
self, key_uri: str, token: Optional[str], ns: Optional[str] = None,
Member Author

I removed the optional part as we are already checking emptiness for key_url in https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py#L53, and I think HcVaultKmsClient is supposed to be only created with a specified key_uri, according to the function doc

Not sure if this is considered a breaking change (if customers initialize HcVaultKmsClient directly in their code). @rayokota would love to hear your thoughts on this


def __init__(
self, key_uri: Optional[str], credentials: TokenCredential
self, key_uri: str, credentials: TokenCredential
Member Author

Similar change as the one in hcvault_client.py

return parsed_schema

named_schemas = _resolve_named_schema(schema, self._registry)
if schema.schema_str is None:
Member Author

In practice the schema_str field should never be empty, and even if it is, I think it makes sense to raise the error here to fail early

Contributor

@MSeal MSeal left a comment

A couple questions on the PR. We are introducing a lot of ignore comments in places like avro that feel off but maybe not worth tackling in this pass anyway.

referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)
ref_named_schemas = await _resolve_named_schema(referenced_schema.schema, schema_registry_client)
# References in registered schemas are validated by server to be complete
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True) # type: ignore[arg-type]
Contributor

Why is there a need for type ignoring here? Can we set the ref/referenced_schema type to avoid this?

Member Author

In practice, subject and version of SchemaReference are never None, but probably for historical reasons (or tech debt) they have been typed as optional. I think updating the types would be a breaking change

Here, get_version() requires a non-empty subject and version, which is why I added the type ignore there. Alternatively we can do:

    if ref.subject is None or ref.version is None:
        # maybe log something
        continue
    referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)

schema_name = parsed_schema.get("name", schema_dict.get("type"))
if schema.schema_str is not None:
schema_dict = json.loads(schema.schema_str)
schema_name = parsed_schema.get("name", schema_dict.get("type")) # type: ignore[union-attr]
Contributor

Maybe add a comment that 'type' in the dict is for the schema type and not any language type hinting? Seeing it might be confusing without context right next to a type ignore call.

fangnx reacted with thumbs up emoji
def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
transform(rule_ctx, parsed_schema, msg, field_transform))
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,  # type: ignore[arg-type]
Contributor

I think we should spread this function's args out one per line if we're type commenting one. It reads oddly like this, and I think it is prone to refactoring bugs

buffer = fo.getvalue()

if latest_schema is not None:
if latest_schema is not None and ctx is not None and subject is not None:
Contributor

This changed the logic here. Are we certain it's a correct / tested change?

Member Author

_execute_rules_with_phase() already requires SerializationContext and subject to build RuleContext

Base automatically changed from typehinting-kafka to master October 22, 2025 22:06
Copilot AI review requested due to automatic review settings October 23, 2025 04:15
Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.


Failed

  • 75.50% Coverage on New Code (is less than 80.00%)

Analysis Details

42 Issues

Coverage and Duplications

  • Coverage 75.50% Coverage (67.00% Estimated after merge)
  • Duplications No duplication information (5.00% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

ValueError: If ctx is None.
"""
if ctx is None:
Member Author

I refactored the name_strategy functions a bit: they have to follow the same function signature but SerializationContext is only required for some

self.bearer_field_provider = _StaticFieldProvider(static_token, logical_cluster, identity_pool)
if not isinstance(static_token, string_type):
raise TypeError("bearer.auth.token must be a str, not " + str(type(static_token)))
if self.bearer_auth_credentials_source == 'OAUTHBEARER':
Member Author

Moving the indentation is intended to address the type issue of logical_cluster and identity_pool for building _AsyncOAuthClient: they must be non-empty, which we already check in line 280 and 284

This doesn't affect code logic: we check self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'} for the outer block, and those if-else branches are for OAUTHBEARER and STATIC_TOKEN respectively
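
The narrowing argument above can be illustrated with a small sketch. It uses hypothetical names (`build_auth_label`, `source`) rather than the client's real constructor, and only shows the general mypy behaviour: when the `None` checks and the code that consumes the values sit in the same branch, `logical_cluster` and `identity_pool` are treated as `str` at the point of use.

```python
from typing import Optional


def build_auth_label(
    source: str,
    logical_cluster: Optional[str],
    identity_pool: Optional[str],
) -> str:
    """Hypothetical stand-in for the branch that builds the OAuth client."""
    if source == "OAUTHBEARER":
        if logical_cluster is None:
            raise ValueError("logical cluster is required")
        if identity_pool is None:
            raise ValueError("identity pool is required")
        # Both values are narrowed from Optional[str] to str here, because
        # the checks above sit in the same branch as this use.
        return logical_cluster + "/" + identity_pool
    # STATIC_TOKEN path: the optional values are not needed.
    return "static"
```

If the consuming call lived in a separate, later `if source == "OAUTHBEARER":` block, the checker would no longer carry the narrowing over, which is presumably what the indentation change avoids.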



def get_type(schema: JsonSchema) -> FieldType:
if isinstance(schema, list):
Member Author

JsonSchema union type is either bool or dict. This was likely copied from avro.py

@fangnx fangnx requested review from MSeal and Copilot October 24, 2025 19:34
Copilot AI left a comment

Pull Request Overview

Copilot reviewed 42 out of 42 changed files in this pull request and generated 7 comments.

await self.generate_access_token()

if self.token is None:
raise ValueError("Token is not set after the at")
Copilot AI Oct 24, 2025

Corrected incomplete error message from 'after the at' to 'after the attempt to generate it'.

Suggested change
raise ValueError("Token is not set after the at")
raise ValueError("Token is not set after the attempt to generate it")

self.token_endpoint, logical_cluster, identity_pool,
self.max_retries, self.retries_wait_ms,
self.retries_max_wait_ms)
else: # STATIC_TOKEN
Copilot AI Oct 24, 2025

The elif condition check has been replaced with else without verifying all possible values. This assumes only 'OAUTHBEARER' and 'STATIC_TOKEN' are valid values for bearer_auth_credentials_source. Consider adding an explicit elif condition for 'STATIC_TOKEN' to make the code more maintainable and guard against unexpected values.

Member Author

It's already the outer block if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}:

self.token_endpoint, logical_cluster, identity_pool,
self.max_retries, self.retries_wait_ms,
self.retries_max_wait_ms)
else: # STATIC_TOKEN
Copilot AI Oct 24, 2025

The elif condition check has been replaced with else without verifying all possible values. This assumes only 'OAUTHBEARER' and 'STATIC_TOKEN' are valid values for bearer_auth_credentials_source. Consider adding an explicit elif condition for 'STATIC_TOKEN' to make the code more maintainable and guard against unexpected values.

Member Author

It's already the outer block if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}: (line 273)

if subject_name is not None:
query['subject'] = subject_name
query: dict[str, Any] = {'offset': offset, 'limit': limit}
if subject_name is not None: query['subject'] = subject_name
Copilot AI Oct 24, 2025

The indentation is incorrect on line 867. The 'query['subject'] = subject_name' statement should be on its own line with proper indentation.

Suggested change
if subject_name is not None: query['subject'] = subject_name
if subject_name is not None:
query['subject'] = subject_name

if subject_name is not None:
query['subject'] = subject_name
query: dict[str, Any] = {'offset': offset, 'limit': limit}
if subject_name is not None: query['subject'] = subject_name
Copilot AI Oct 24, 2025

The indentation is incorrect on line 868. The 'query['subject'] = subject_name' statement should be on its own line with proper indentation.

Suggested change
if subject_name is not None: query['subject'] = subject_name
if subject_name is not None:
query['subject'] = subject_name

Comment on lines 83 to 84
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
Copilot AI Oct 24, 2025

Using a mutable default argument (empty list) is dangerous as it will be shared across all instances. Use None as default and initialize inside the function instead.

Suggested change
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
self.topic_partitions = topic_partitions if topic_partitions is not None else []
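
For readers unfamiliar with the pitfall, here is a standalone sketch of how a mutable default can be shared between calls. The function names `append_shared` and `append_fresh` are made up for illustration and are not part of this PR.

```python
from typing import List, Optional


def append_shared(item: int, bucket: List[int] = []) -> List[int]:
    # The default list is created once, when the function is defined,
    # and reused by every call that omits `bucket`.
    bucket.append(item)
    return bucket


def append_fresh(item: int, bucket: Optional[List[int]] = None) -> List[int]:
    # A new list is created on each call that omits `bucket`.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket


first = append_shared(1)
second = append_shared(2)  # same list object as `first`, now [1, 2]
```

Note that in the quoted constructor, `topic_partitions or []` replaces the empty default with a new list, so the sharing shown here would not occur for the default case; the `None` default mainly makes the intent explicit.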

inline_tags = get_inline_tags(reader_schema) if reader_schema is not None else None
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
reader_schema_raw, obj_dict,
inline_tags,field_transformer)
Copilot AI Oct 24, 2025

Missing space after comma between 'inline_tags' and 'field_transformer' arguments.

Suggested change
inline_tags,field_transformer)
inline_tags, field_transformer)

@fangnx fangnx requested a review from Copilot October 30, 2025 17:15
Copilot AI left a comment

Pull Request Overview

Copilot reviewed 37 out of 38 changed files in this pull request and generated 5 comments.

Comment on lines +295 to +298
if dek is None or dek.version is None:
new_version = 1
else:
new_version = dek.version + 1 if is_expired else 1
Copilot AI Oct 30, 2025

The logic for calculating new_version is confusing. When is_expired is False, new_version should be 1, but when is_expired is True, it should be dek.version + 1. The current conditional expression has this backwards - it increments when is_expired is True but sets to 1 when False. Consider: new_version = (dek.version + 1) if is_expired else 1
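
As a point of reference, Python parses `a + 1 if cond else 1` as `(a + 1) if cond else 1`, so the parenthesized form proposed above behaves the same as the quoted line. A quick check, using a hypothetical `next_version` helper that is not part of this PR:

```python
import ast


def next_version(version: int, is_expired: bool) -> int:
    # Same shape as the quoted expression, without parentheses.
    return version + 1 if is_expired else 1


# The top-level node of the expression is the conditional, which means
# the `+ 1` belongs to the true branch only.
tree = ast.parse("version + 1 if is_expired else 1", mode="eval")
top_level_is_conditional = isinstance(tree.body, ast.IfExp)
```

Adding the parentheses is therefore a readability change rather than a behaviour change.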

Comment on lines +212 to +215
if isinstance(schema, bool):
return FieldType.COMBINED
elif isinstance(schema, dict):
schema_type = schema.get("type")
else:
# string schemas; this could be either a named schema or a primitive type
schema_type = schema

schema_type = schema.get("type")
Copilot AI Oct 30, 2025

Calling .get() on a boolean value will raise AttributeError. After checking isinstance(schema, bool), the function should return immediately before attempting to call schema.get(). The early return is missing on line 213.
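
A minimal sketch of the guard order being discussed, assuming a `JsonSchema` alias of `bool` or `dict` as described earlier in this thread. `schema_type_of` is a hypothetical helper, not the module's `get_type`; it only shows that returning from the `bool` branch first keeps `.get()` from ever being called on a boolean.

```python
from typing import Any, Dict, Optional, Union

# Assumed alias, following the description "either bool or dict".
JsonSchema = Union[bool, Dict[str, Any]]


def schema_type_of(schema: JsonSchema) -> Optional[Any]:
    """Hypothetical helper: return the raw "type" entry, or None for bool schemas."""
    if isinstance(schema, bool):
        # Boolean schemas carry no "type" key; return before touching .get().
        return None
    # The checker has narrowed `schema` to a dict at this point.
    return schema.get("type")
```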

Comment on lines +89 to +93
if ctx is None:
raise ValueError(
"SerializationContext is required for topic_subject_name_strategy. "
"Either provide a SerializationContext or use record_subject_name_strategy."
)
Copilot AI Oct 30, 2025

Adding a ValueError when ctx is None changes the API behavior. Previously, these functions accepted Optional[SerializationContext], and None would result in an AttributeError at line 94. The new explicit check is clearer, but this is a breaking change that could affect existing code that catches AttributeError. Consider documenting this as a breaking change.
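
The behavioural difference can be sketched in isolation. `Ctx`, `subject_without_guard`, and `subject_with_guard` below are hypothetical stand-ins for `SerializationContext` and the name strategy functions, written only to show which exception a caller would see for `ctx=None` before and after an explicit check.

```python
from typing import Optional


class Ctx:
    """Hypothetical stand-in for SerializationContext."""

    def __init__(self, topic: str, field: str) -> None:
        self.topic = topic
        self.field = field


def subject_without_guard(ctx: Optional[Ctx]) -> str:
    # With ctx=None, the attribute access raises AttributeError.
    return ctx.topic + "-" + ctx.field  # type: ignore[union-attr]


def subject_with_guard(ctx: Optional[Ctx]) -> str:
    # With ctx=None, the explicit check raises ValueError instead.
    if ctx is None:
        raise ValueError("SerializationContext is required")
    return ctx.topic + "-" + ctx.field
```

Code that wraps the call in `except AttributeError` would stop catching the failure once the guard is added, which is the compatibility concern raised here.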

if not isinstance(cache_capacity, (int, float)):
raise TypeError("cache.capacity must be a number, not " + str(type(cache_capacity)))
self.cache_capacity = cache_capacity
self.cache_capacity = int(cache_capacity)
Copilot AI Oct 30, 2025

[nitpick] Converting float to int with int() truncates the decimal. If cache_capacity is 100.9, it becomes 100. Consider using round() instead for more intuitive behavior, or document that fractional values are truncated.

Suggested change
self.cache_capacity = int(cache_capacity)
self.cache_capacity = round(cache_capacity)
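
One detail worth weighing before choosing between the two: Python's built-in `round()` rounds exact halves to the nearest even integer, so it is not always the "round half up" behaviour a reader might expect. A short illustration with made-up capacity values:

```python
capacities = [100.9, 100.5, 101.5]

# int() drops the fractional part.
truncated = [int(c) for c in capacities]

# round() with no ndigits returns an int and rounds exact halves to even.
rounded = [round(c) for c in capacities]
```

Here `truncated` is `[100, 100, 101]` and `rounded` is `[101, 100, 102]`, so either choice probably deserves a line in the docstring.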

Comment on lines +125 to +128
def _handle_partial_failures(
self,
batch_messages: List[Dict[str, Any]]
) -> None:
Copilot AI Oct 30, 2025

[nitpick] This appears to be a pure formatting change (moving parameters to separate lines) unrelated to type hinting. Such formatting changes should typically be in a separate commit to keep type hinting changes focused.

Suggested change
def _handle_partial_failures(
self,
batch_messages: List[Dict[str, Any]]
) -> None:
def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:

