Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ab09f28

Browse files
committed
Add SR serialization to the Azure IMDS producer example
1 parent 6df4cce commit ab09f28

File tree

2 files changed

+104
-17
lines changed

2 files changed

+104
-17
lines changed

‎examples/oauth_oidc_ccloud_azure_imds_producer.py‎

Lines changed: 103 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,52 @@
2222
import logging
2323
import argparse
2424
from confluent_kafka import Producer
25-
from confluent_kafka.serialization import StringSerializer
25+
from confluent_kafka.schema_registry.json_schema import JSONSerializer
26+
from confluent_kafka.serialization import (StringSerializer,
27+
SerializationContext, MessageField)
28+
from confluent_kafka.schema_registry import SchemaRegistryClient
29+
30+
31+
class User(object):
32+
"""
33+
User record
34+
35+
Args:
36+
name (str): User's name
37+
38+
favorite_number (int): User's favorite number
39+
40+
favorite_color (str): User's favorite color
41+
42+
address(str): User's address; confidential
43+
"""
44+
45+
def __init__(self, name, address, favorite_number, favorite_color):
46+
self.name = name
47+
self.favorite_number = favorite_number
48+
self.favorite_color = favorite_color
49+
# address should not be serialized, see user_to_dict()
50+
self._address = address
51+
52+
53+
def user_to_dict(user, ctx):
54+
"""
55+
Returns a dict representation of a User instance for serialization.
56+
57+
Args:
58+
user (User): User instance.
59+
60+
ctx (SerializationContext): Metadata pertaining to the serialization
61+
operation.
62+
63+
Returns:
64+
dict: Dict populated with user attributes to be serialized.
65+
"""
66+
67+
# User._address must not be serialized; omit from dict
68+
return dict(name=user.name,
69+
favorite_number=user.favorite_number,
70+
favorite_color=user.favorite_color)
2671

2772

2873
def producer_config(args):
@@ -45,6 +90,21 @@ def producer_config(args):
4590
return params
4691

4792

93+
def schema_registry_config(args):
94+
params = {
95+
'url': args.schema_registry,
96+
'bearer.auth.credentials.source': 'OAUTHBEARER_AZURE_IMDS',
97+
'bearer.auth.issuer.endpoint.query': args.query,
98+
}
99+
# These two parameters are only applicable when producing to
100+
# confluent cloud where some sasl extensions are required.
101+
if args.logical_cluster and args.identity_pool_id:
102+
params['bearer.auth.logical.cluster'] = args.logical_cluster
103+
params['bearer.auth.identity.pool.id'] = args.identity_pool_id
104+
105+
return params
106+
107+
48108
def delivery_report(err, msg):
49109
"""
50110
Reports the failure or success of a message delivery.
@@ -72,27 +132,54 @@ def delivery_report(err, msg):
72132

73133
def main(args):
74134
topic = args.topic
75-
delimiter = args.delimiter
76135
producer_conf = producer_config(args)
77136
producer = Producer(producer_conf)
78-
serializer = StringSerializer('utf_8')
137+
string_serializer = StringSerializer('utf_8')
138+
schema_str = """
139+
{
140+
"$schema": "http://json-schema.org/draft-07/schema#",
141+
"title": "User",
142+
"description": "A Confluent Kafka Python User",
143+
"type": "object",
144+
"properties": {
145+
"name": {
146+
"description": "User's name",
147+
"type": "string"
148+
},
149+
"favorite_number": {
150+
"description": "User's favorite number",
151+
"type": "number",
152+
"exclusiveMinimum": 0
153+
},
154+
"favorite_color": {
155+
"description": "User's favorite color",
156+
"type": "string"
157+
}
158+
},
159+
"required": [ "name", "favorite_number", "favorite_color" ]
160+
}
161+
"""
162+
schema_registry_conf = schema_registry_config(args)
163+
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
164+
165+
string_serializer = StringSerializer('utf_8')
166+
json_serializer = JSONSerializer(schema_str, schema_registry_client, user_to_dict)
79167

80168
print('Producing records to topic {}. ^C to exit.'.format(topic))
81169
while True:
82170
# Serve on_delivery callbacks from previous calls to produce()
83171
producer.poll(0.0)
84172
try:
85-
msg_data = input(">")
86-
msg = msg_data.split(delimiter)
87-
if len(msg) == 2:
88-
producer.produce(topic=topic,
89-
key=serializer(msg[0]),
90-
value=serializer(msg[1]),
91-
on_delivery=delivery_report)
92-
else:
93-
producer.produce(topic=topic,
94-
value=serializer(msg[0]),
95-
on_delivery=delivery_report)
173+
name = input(">")
174+
user = User(name=name,
175+
address="NA",
176+
favorite_color="blue",
177+
favorite_number=7)
178+
serialized_user = json_serializer(user, SerializationContext(topic, MessageField.VALUE))
179+
producer.produce(topic=topic,
180+
key=string_serializer(name),
181+
value=serialized_user,
182+
on_delivery=delivery_report)
96183
except KeyboardInterrupt:
97184
break
98185

@@ -106,8 +193,8 @@ def main(args):
106193
help="Bootstrap broker(s) (host[:port])")
107194
parser.add_argument('-t', dest="topic", default="example_producer_oauth",
108195
help="Topic name")
109-
parser.add_argument('-d', dest="delimiter", default="|",
110-
help="Key-Value delimiter. Defaults to '|'"),
196+
parser.add_argument('-s', dest="schema_registry", required=True,
197+
help="Schema Registry (http(s)://host[:port]")
111198
parser.add_argument('--query', dest="query", required=True,
112199
help="Query parameters for Azure IMDS token endpoint")
113200
parser.add_argument('--logical-cluster', dest="logical_cluster", required=False, help="Logical Cluster.")

‎src/confluent_kafka/schema_registry/common/_oauthbearer.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _validate(self):
141141
self.token_endpoint = urlunparse(parsed_token_endpoint)
142142
elif not token_endpoint_override:
143143
raise ValueError("bearer.auth.issuer.endpoint.query must be provided "
144-
"when bearer.auth.issuer.endpoint.url isn overridden")
144+
"when bearer.auth.issuer.endpoint.url isn't overridden")
145145

146146

147147
class _StaticFieldProvider(_BearerFieldProvider):

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /