Add close() to producer #2039
Conversation
Pull Request Overview
Adds a close() method to the Kafka Producer class to enable explicit cleanup of producer resources. This change supports proper resource management, live credential rotation use cases, and prevents memory leaks by allowing users to explicitly destroy producer instances.
- Implements Producer_close() function with proper resource cleanup
- Adds method definition to Producer_methods array with documentation
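To make the intended usage concrete, here is a minimal, hypothetical sketch of how an application could rotate credentials with the proposed API; the exact signature of close() (timeout argument, return value) evolved over the course of this review:

```python
from confluent_kafka import Producer


def rotate_producer(conf, old_producer):
    # Deliver (or time out) any pending messages before tearing the instance down.
    old_producer.flush(10)
    # Explicit, on-demand cleanup instead of waiting for garbage collection.
    old_producer.close()
    # Recreate the producer, e.g. with freshly rotated credentials in `conf`.
    return Producer(conf)
```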
Hi @peter-lucia thanks for the PR and the checks.
A producer close() isn't strictly necessary as long as there are no network calls to make before destroying the instance. You can flush before deallocating the producer, and if you don't, pending messages are simply discarded, so there were no remaining calls to make; however, we have to check the recently introduced telemetry push.
Going through the comments:
The RD_KAFKA_DESTROY_F_IMMEDIATE doesn't seem a problem as it's not used in Producer_dealloc. Yes, it leaks some memory, but it should be used only when you have to terminate an application immediately.
`PyObject_GC_UnTrack(self)` isn't disabling the GC but it's removing `self` from GC tracking, so it avoids cycles that can prevent the GC.
librdkafka internally keeps a reference count for some C objects, so even if you called destroy in a different place and references were left it would hang; we would have any of those problems before any API change.
rkid is ever increasing and that's OK, it's meant to tell a producer instance from a different one in the logs.
rd_kafka_global_cnt should reach zero if you force the GC
I tried reproducing it with your example but couldn't, this is the code I tried:
```python
import gc
import sys
import json
from time import sleep

from confluent_kafka import Producer


class Kafka:
    def __init__(self, conf):
        def error_cb(*args, **kwargs):
            print(args, kwargs)

        self.conf = {
            'socket.timeout.ms': 10,
            'error_cb': error_cb,
            'message.timeout.ms': 10,
            **conf
        }
        self.producer = Producer(self.conf)

    def recreate_producer(self):
        print('Create new producer')
        prior_producer = self.producer
        prior_producer.flush(10)
        # prior_producer.close()  # This would be preferred
        del prior_producer
        gc.collect()  # technically redundant, since garbage collection will happen after the method returns
        self.producer = Producer(self.conf)


while True:
    handler = Kafka({"bootstrap.servers": sys.argv[1]})
    msg = {"test": "test"}
    handler.producer.produce(json.dumps(msg))
    handler.recreate_producer()
    sleep(0.5)
```
A simple and reliable way to see if there are active librdkafka instances is to check the process threads and whether they keep growing; after many iterations they stay the same:
ps -T -p $(pgrep -f reproduce)
PID SPID TTY TIME CMD
3739260 3739260 pts/13 00:00:18 python
3739260 3819989 pts/13 00:00:00 rdk:main
3739260 3819990 pts/13 00:00:00 rdk:broker-1
3739260 3819992 pts/13 00:00:00 rdk:broker0
If you find changes to the reproducer code that make it happen, or if it needs to run for a longer time, please tell us.
Hi @emasab,
Thank you for taking a look at this PR and going through the investigation details. The early investigatory notes about librdkafka internals were included to give background on what's happening under the hood in librdkafka and how that relates to the Python package as we dug into it. They were intended to document possible reasons why producer resources could be living on longer than they should, how memory leaks could happen, and how Python garbage collection of the producer object is adjusted.
Your explanations and comments on our investigation notes generally match our latest understanding as we are digging into this. Going through them:
The RD_KAFKA_DESTROY_F_IMMEDIATE doesn't seem a problem as it's not used in Producer_dealloc. Yes, it leaks some memory, but it should be used only when you have to terminate an application immediately.
Yes, initially this was more of a concern, but given that it doesn't appear to be used by the dealloc, we're in agreement: while it leaks memory, as long as it is not used now or in the future, it should be okay.
`PyObject_GC_UnTrack(self)` isn't disabling the GC but it's removing `self` from GC tracking, so it avoids cycles that can prevent the GC.
Yes, we didn't mean that it was disabling the GC entirely (if so, we'd have bigger issues!). We're aligned on this: GC tracking is removed for the Producer object (self).
Following up on this: the dealloc method doesn't confirm that the Kafka objects are destroyed. Waiting for that confirmation can block up to the provided timeout, so we can see why it wasn't done there; deallocs generally aren't given timeouts, and a default would have to be chosen that might not suit all use cases. For the most certainty during app-initiated cleanup where timing matters, confirmation that the cleanup actually succeeded is helpful, which is why we added it to Producer.close().
rkid is ever increasing and that's OK, it's meant to tell a producer instance from a different one in the logs.
Yes, makes sense.
rd_kafka_global_cnt should reach zero if you force the GC
The newly added unit test test_producer_close exposes the issue we're seeing when relying on GC alone. It will fail when there are lingering producer resources not cleaned up by other tests. This calls into question when/if the dealloc is actually being called, and whether it's running successfully. More deterministic behavior here would be helpful, which motivates adding producer.close().
Can you reproduce this by running the unit tests but commenting out the p.close() calls we've added in all other unit tests except for test_producer_close? This will cause test_producer_close to fail. You might have to run a few times to reproduce the failure. We had to run a couple times today to reproduce it again. Our adjustments to the other tests in this PR ensure test_producer_close passes more reliably, but we might want to look at adding p.close() in even more of the producer unit tests to be 100% sure the resources are always cleaned up.
Expanding on this:
One of the additions to producer.close() is a call to rd_kafka_wait_destroyed, which blocks until all the underlying Kafka objects are confirmed destroyed or until the default (5s) or provided timeout is reached. This, combined with being able to clean up the producer on demand, provides more deterministic behavior, especially for context managers (e.g. an __exit__ clause), when the producer needs to be fully deleted because credentials are expiring, or when there is uncertainty about recent changes (e.g. the telemetry push) that could cause the producer to live on. It gives the producer the same capability as a Consumer, which also has a close() method, with added clarity on whether or not all underlying Kafka objects were actually destroyed.
rd_kafka_wait_destroyed returns 0 if all Kafka objects are destroyed. This causes producer.close() to return True.
rd_kafka_wait_destroyed returns -1 if not all Kafka objects are destroyed by the time the timeout is reached. This causes producer.close() to return False.
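As a sketch of those semantics as proposed at this point in the review (the use of rd_kafka_wait_destroyed was later removed from the PR, as discussed below), a caller could check the result like this:

```python
import logging

log = logging.getLogger(__name__)


def close_with_confirmation(producer, timeout=5.0):
    """Close a producer and report whether destruction was confirmed in time."""
    producer.flush(10)
    if producer.close(timeout=timeout):      # rd_kafka_wait_destroyed returned 0
        return True
    # rd_kafka_wait_destroyed returned -1: timeout reached with objects remaining.
    log.warning("close() timed out before all Kafka objects were destroyed")
    return False
```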
Please let us know if you can reproduce our findings on your end through the unit testing or if you see any discrepancies. We will look into the additional mechanisms of detecting lingering librdkafka instances but believe the rd_kafka_wait_destroyed method should also be sufficient.
Some observations:
- which version are you testing? There are some fixes that could block client destruction
- there's a need for a test to reproduce it before any change
- you cannot call rd_kafka_wait_destroyed because it waits until all instances are destroyed, and that's not what's needed: you could have a consumer and a producer, deallocate only the producer and create a new one while the consumer stays active (see the sketch below)
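To make that scenario concrete, here is a minimal, hypothetical sketch (not code from this PR) of a process that keeps a consumer alive while rotating its producer; a close() built on rd_kafka_wait_destroyed could block for its full timeout here, because the live consumer is also a librdkafka instance:

```python
from confluent_kafka import Consumer, Producer

conf = {"bootstrap.servers": "localhost:9092"}    # illustrative address
consumer = Consumer({**conf, "group.id": "g1"})   # stays active the whole time

producer = Producer(conf)
# Hypothetical close() built on rd_kafka_wait_destroyed: it waits for *all*
# librdkafka instances to be destroyed, but the consumer above never is, so
# this call can only return once its timeout expires.
producer.close()
producer = Producer(conf)                         # rotate the producer

consumer.poll(0.1)                                # consumer is still in use
```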
- which version are you testing? There are some fixes that could block client destruction
We're testing with 2.11.1 on Python 3.11.
- there's a need for a test to reproduce it before any change
Yes, the unit tests are the easiest way to do that with the changes in this branch. Here's what I was describing:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gc
import json
import pytest

from confluent_kafka import Producer, KafkaError, KafkaException, \
    TopicPartition, libversion
from struct import pack
from time import sleep

from tests.common import TestConsumer


def error_cb(err):
    print('error_cb', err)


class MockHandler:
    def __init__(self, *args, **kwargs):
        self.conf = {
            'debug': 'all',
            'socket.timeout.ms': 10,
            'error_cb': error_cb,
            'message.timeout.ms': 10
        }
        self.producer = Producer(self.conf)

    def recreate_producer(self, timeout: float) -> bool:
        prior_producer = self.producer
        prior_producer.flush(10)
        destroyed = prior_producer.close(timeout=timeout)
        self.producer = Producer(self.conf)
        return destroyed


def test_basic_api():
    """ Basic API tests, these wont really do anything since there is no
        broker configured. """

    with pytest.raises(TypeError) as ex:
        p = Producer()
    assert ex.match('expected configuration dict')

    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    p.produce('mytopic')
    p.produce('mytopic', value='somedata', key='a key')

    def on_delivery(err, msg):
        print('delivery', err, msg)
        # Since there is no broker, produced messages should time out.
        assert err.code() == KafkaError._MSG_TIMED_OUT
        print('message latency', msg.latency())

    p.produce(topic='another_topic', value='testing', partition=9,
              callback=on_delivery)

    p.poll(0.001)

    p.flush(0.002)
    p.flush()

    try:
        p.list_topics(timeout=0.2)
    except KafkaException as e:
        assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)

    # assert p.close(), "Failed to validate that producer was closed."


def test_produce_timestamp():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    # Requires librdkafka >=v0.9.4
    try:
        p.produce('mytopic', timestamp=1234567)
    except NotImplementedError:
        # Should only fail on non-supporting librdkafka
        if libversion()[1] >= 0x00090400:
            raise

    p.flush()


# Should be updated to 0.11.4 when it is released
@pytest.mark.skipif(libversion()[1] < 0x000b0400,
                    reason="requires librdkafka >=0.11.4")
def test_produce_headers():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    binval = pack('hhl', 1, 2, 3)

    headers_to_test = [
        [('headerkey', 'headervalue')],
        [('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
        [('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
        [('key_with_null_value', None)],
        [('binaryval', binval)],
        [('alreadyutf8', u'Småland'.encode('utf-8'))],
        [('isunicode', 'Jämtland')],

        {'headerkey': 'headervalue'},
        {'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'},  # noqa: F601
        {'dupkey': 'dupvalue', 'dupkey': 'diffvalue'},  # noqa: F601
        {'key_with_null_value': None},
        {'binaryval': binval},
        {'alreadyutf8': u'Småland'.encode('utf-8')},
        {'isunicode': 'Jämtland'}
    ]

    for headers in headers_to_test:
        print('headers', type(headers), headers)
        p.produce('mytopic', value='somedata', key='a key', headers=headers)
        p.produce('mytopic', value='somedata', headers=headers)

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', key='a key', headers=('a', 'b'))

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])

    with pytest.raises(TypeError):
        p.produce('mytopic', value='somedata', headers={'anint': 1234})

    p.flush()


# Should be updated to 0.11.4 when it is released
@pytest.mark.skipif(libversion()[1] >= 0x000b0400,
                    reason="Old versions should fail when using headers")
def test_produce_headers_should_fail():
    """ Test produce() with timestamp arg """
    p = Producer({'socket.timeout.ms': 10,
                  'error_cb': error_cb,
                  'message.timeout.ms': 10})

    with pytest.raises(NotImplementedError) as ex:
        p.produce('mytopic', value='somedata', key='a key',
                  headers=[('headerkey', 'headervalue')])
    assert ex.match('Producer message headers requires confluent-kafka-python '
                    'built for librdkafka version >=v0.11.4')


def test_subclassing():
    class SubProducer(Producer):
        def __init__(self, conf, topic):
            super(SubProducer, self).__init__(conf)
            self.topic = topic

        def produce_hi(self):
            super(SubProducer, self).produce(self.topic, value='hi')

    sp = SubProducer(dict(), 'atopic')
    assert isinstance(sp, SubProducer)

    # Invalid config should fail
    with pytest.raises(KafkaException):
        sp = SubProducer({'should.fail': False}, 'mytopic')

    sp = SubProducer({'log.thread.name': True}, 'mytopic')
    sp.produce('someother', value='not hello')
    sp.produce_hi()


def test_dr_msg_errstr():
    """ Test that the error string for failed messages works (issue #129).
        The underlying problem is that librdkafka reuses the message payload
        for error value on Consumer messages, but on Producer messages the
        payload is the original payload and no rich error string exists. """
    p = Producer({"message.timeout.ms": 10})

    def handle_dr(err, msg):
        # Neither message payloads must not affect the error string.
        assert err is not None
        assert err.code() == KafkaError._MSG_TIMED_OUT
        assert "Message timed out" in err.str()

    # Unicode safe string
    p.produce('mytopic', "This is the message payload", on_delivery=handle_dr)

    # Invalid unicode sequence
    p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)

    # p.flush()


def test_set_partitioner_murmur2():
    """ Test ability to set built-in partitioner type murmur """
    Producer({'partitioner': 'murmur2'})


def test_set_partitioner_murmur2_random():
    """ Test ability to set built-in partitioner type murmur2_random """
    Producer({'partitioner': 'murmur2_random'})


def test_set_invalid_partitioner_murmur():
    """ Assert invalid partitioner raises KafkaException """
    with pytest.raises(KafkaException) as ex:
        Producer({'partitioner': 'murmur'})
    assert ex.match('Invalid value for configuration property "partitioner": murmur')


def test_transaction_api():
    """ Excercise the transactional API """
    p = Producer({"transactional.id": "test"})

    with pytest.raises(KafkaException) as ex:
        p.init_transactions(0.5)
    assert ex.value.args[0].code() == KafkaError._TIMED_OUT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    # Any subsequent APIs will fail since init did not succeed.
    with pytest.raises(KafkaException) as ex:
        p.begin_transaction()
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    consumer = TestConsumer({"group.id": "testgroup"})
    group_metadata = consumer.consumer_group_metadata()
    consumer.close()

    with pytest.raises(KafkaException) as ex:
        p.send_offsets_to_transaction([TopicPartition("topic", 0, 123)],
                                      group_metadata)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    with pytest.raises(KafkaException) as ex:
        p.commit_transaction(0.5)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    with pytest.raises(KafkaException) as ex:
        p.abort_transaction(0.5)
    assert ex.value.args[0].code() == KafkaError._CONFLICT
    assert ex.value.args[0].retriable() is True
    assert ex.value.args[0].fatal() is False
    assert ex.value.args[0].txn_requires_abort() is False

    # assert p.close(), "The producer was not closed"


def test_purge():
    """
    Verify that when we have a higher message.timeout.ms timeout, we can use purge()
    to stop waiting for messages and get delivery reports
    """
    p = Producer(
        {"socket.timeout.ms": 10, "error_cb": error_cb, "message.timeout.ms": 30000}
    )  # 30 seconds

    # Hack to detect on_delivery was called because inner functions can modify nonlocal objects.
    # When python2 support is dropped, we can use the "nonlocal" keyword instead
    cb_detector = {"on_delivery_called": False}

    def on_delivery(err, msg):
        cb_detector["on_delivery_called"] = True
        # Because we are purging messages, we should see a PURGE_QUEUE kafka error
        assert err.code() == KafkaError._PURGE_QUEUE

    # Our message won't be delivered, but also won't timeout yet because our timeout is 30s.
    p.produce(topic="some_topic", value="testing", partition=9,
              callback=on_delivery)
    p.flush(0.002)
    assert not cb_detector["on_delivery_called"]

    # When in_queue set to false, we won't purge the message and get delivery callback
    p.purge(in_queue=False)
    p.flush(0.002)
    assert not cb_detector["on_delivery_called"]

    # When we purge including the queue, the message should have delivered a delivery report
    # with a PURGE_QUEUE error
    p.purge()
    p.flush(0.002)
    assert cb_detector["on_delivery_called"]

    # assert p.close(), "The producer was not closed"


def test_producer_bool_value():
    """
    Make sure producer has a truth-y bool value
    See https://github.com/confluentinc/confluent-kafka-python/issues/1427
    """
    p = Producer({})
    assert bool(p)
    # assert p.close(), "The producer was not fully closed"


@pytest.mark.parametrize("timeout,destroyed_expected,exception_expected", [
    (10.0, True, None),
    (5.0, True, None),
    (-100, None, ValueError),
    ("wrong", None, ValueError)
])
def test_producer_close(timeout, destroyed_expected, exception_expected):
    """
    Ensures the producer is fully cleaned up when closed before being recreated
    """
    handler = MockHandler()
    msg = {"test": "test"}
    handler.producer.produce(json.dumps(msg))
    if exception_expected is not None:
        with pytest.raises(exception_expected):
            _ = handler.recreate_producer(timeout)
    else:
        destroyed_actual = handler.recreate_producer(timeout)
        assert destroyed_actual == destroyed_expected
```
We have reproduced this on multiple workstations at this point. It should be reproducible after also updating your Producer.c with what's added in this PR. The two failing tests indicate that the global ref count did not reach 0 within the provided timeout (10s and 5s) when producer.close() is not called in the other unit tests (you can see it's commented out).
- you cannot call rd_kafka_wait_destroyed because it waits until all instances are destroyed, and that's not what's needed: you could have a consumer and a producer, deallocate only the producer and create a new one while the consumer stays active
If an overall shutdown of both consumers and producers is required in order to call rd_kafka_wait_destroyed, where would be a good place for such a combined consumer+producer shutdown function?
I checked the failing tests and they're failing because the new code calls rd_kafka_wait_destroyed, which awaits termination of all other instances. That means that until the GC runs, the close call cannot terminate.
There's no need to call rd_kafka_wait_destroyed (which is deprecated), because when rd_kafka_destroy returns, all librdkafka threads from that instance are joined and all memory is freed.
So if you don't call rd_kafka_wait_destroyed in producer.close(), or if you call gc.collect() or gc.set_threshold(1, 1, 1) at the beginning of test_producer_close, or if you don't call producer.close() before replacing the producer, the tests pass.
That doesn't mean rd_kafka_destroy cannot block for other reasons, but in that case it's something to look for in the librdkafka code; that's why I'm asking for a test that reproduces the missing instance destruction without new code.
About the telemetry push in rd_kafka_destroy, we have to investigate more as it can last up to 1s; but if an rd_kafka_producer_close() is to be added, it should go into librdkafka before confluent-kafka-python. The Producer_dealloc code releases the GIL, so it shouldn't block other Python threads from continuing, only the garbage collection.
I checked the failing tests and they're failing because the new code calls rd_kafka_wait_destroyed, which awaits termination of all other instances. That means that until the GC runs, the close call cannot terminate.
Let's dig into this a bit more. The close() call returns once all underlying kafka objects are destroyed or if the timeout (default=5s) is reached before that happens. In the two failing tests, the timeout was reached first, indicating that either there were lingering producers still running or the current number of active producers was greater than 0 (not cleaned up).
GC will run right after the completion of the other test functions. GC should be handling the cleanup of any lingering consumers or producers left over from other tests before the start of new tests. The fact that these two tests fail fairly consistently when we don't call producer.close() at the end of the other tests suggests that the GC alone is not cleaning up all producers.
Relying on the GC alone may be allowing Python to move on and asynchronously create new producers before the previous ones have been cleared. This may explain why GC alone doesn't clean up all underlying producers.
GC should be handling the cleanup of any lingering consumers or producers left over from other tests before the start of new tests.
That isn't happening, for sure. The cyclic garbage collector isn't triggered automatically there, nor at specific time intervals; it runs when a certain number of allocations minus deallocations is reached. That's why it isn't invoked while close() is waiting, and if you call it explicitly with gc.collect(), or reduce the generation thresholds with gc.set_threshold(1, 1, 1), it doesn't block and the test passes.
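For reference, these triggers are exposed by the standard-library gc module; this is plain CPython behavior, independent of confluent-kafka-python:

```python
import gc

# The cyclic collector runs on allocation/deallocation counts, not on a timer.
print(gc.get_threshold())   # default (700, 10, 10): gen-0 runs after ~700 net allocations
print(gc.get_count())       # current per-generation allocation counters

gc.collect()                # force a full collection immediately
gc.set_threshold(1, 1, 1)   # or make collections trigger on almost every allocation
```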
rd_kafka_wait_destroyed isn't currently used in confluent-kafka-python, and it shouldn't be, as it waits for all other existing instances and causes these deadlocks when the GC didn't run before it.
But there's one additional thing: the cyclic GC is needed because the producer isn't automatically destroyed when its reference count reaches zero. I'll check if there's some cycle that can prevent destruction by refcount, or some dangling reference that prevents the count from reaching zero and the destructor from being called immediately after the previous tests return.
That could make the producer survive longer, until collection happens.
Yes, I think our understandings align here: the GC isn't happening, and as you say, it isn't triggered automatically after the previous tests complete.
the cyclic GC is needed because the producer isn't automatically destroyed when its reference count reaches zero
Yes
I'll check if there's some cycle that can prevent destruction by refcount, or some dangling reference that prevents the count from reaching zero and the destructor from being called immediately after the previous tests return.
That could make the producer survive longer, until collection happens.
Did your checks reveal anything or is this still an open question?
I have updated the PR to no longer include the rd_kafka_wait_destroyed. This will enable the producer to be closed on demand, which is the core of what we're after here.
@peter-lucia I finished my checks and found that in the failing tests what was preventing the immediate garbage collection was the exceptions raised in test_basic_api and test_transaction_api, since they keep a traceback with all local variables, including the producer. If you delete the local variable with del ex, the tests that were failing with rd_kafka_wait_destroyed (kept just for debugging) pass.
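This is easy to demonstrate with plain CPython, independent of Kafka; in this illustrative snippet the weakref-tracked Resource stands in for the producer kept alive by the pytest.raises result:

```python
import gc
import weakref


class Resource:
    pass


def make_and_fail():
    r = Resource()                  # local, like the producer inside the test
    ref = weakref.ref(r)
    try:
        raise ValueError("boom")
    except ValueError as e:
        err = e                     # keeping the exception keeps its traceback...
    return ref, err                 # ...and the traceback's frame keeps 'r' alive


ref, err = make_and_fail()
print(ref() is None)                # False: still reachable via err.__traceback__
del err                             # analogous to `del ex` in the tests
gc.collect()                        # the frame/traceback cycle needs the cyclic GC
print(ref() is None)                # True: the Resource is finally collected
```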
I understand there's a need for more predictable producer disposal; it's just that, this way, after calling close() all other methods like closed_producer.poll() cause a segmentation fault. We have to add null checks to the API, like for the consumer:
```c
if (!self->rk) {
    PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
    return NULL;
}
```
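From the Python side, a guarded producer would behave like the existing closed Consumer does; the snippet below is a sketch of that intended behavior (the exact exception message is illustrative), not what the current code does today:

```python
from confluent_kafka import Producer

p = Producer({"socket.timeout.ms": 10})
p.close()
try:
    p.poll(0)               # segfaults today; should raise once the null checks land
except RuntimeError as e:
    print(e)                # e.g. "Producer closed"
```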
If you prefer we can continue and finish these changes.
If you prefer we can continue and finish these changes.
That is fine by me, go ahead!
What
Before this change:
No producer.close() method existed for producers.
After this change:
Added producer.close() to enable the producer to be shut down on demand.
Checklist
Test & Review
Other Information