Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add close() to producer #2039

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
peter-lucia wants to merge 18 commits into confluentinc:master
base: master
Choose a base branch
Loading
from peter-lucia:add-producer-close
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5a075d0
Add close() to producer
peter-lucia Aug 27, 2025
7ee433b
Updated Producer.c
peter-lucia Aug 29, 2025
1694aee
Merge branch 'master' of github.com:confluentinc/confluent-kafka-pyth...
peter-lucia Aug 30, 2025
aae0436
Merge branch 'master' of github.com:confluentinc/confluent-kafka-pyth...
peter-lucia Aug 30, 2025
fa7cda2
Update tests, log warning
peter-lucia Aug 30, 2025
391f15f
Updated logging
peter-lucia Aug 30, 2025
e24bdfc
WIP
peter-lucia Sep 2, 2025
69f0aff
Updated Producer.c
peter-lucia Sep 3, 2025
0effb1d
Updated tests
peter-lucia Sep 3, 2025
6e72ad8
Restore pyproject.toml
peter-lucia Sep 3, 2025
a0fdc7c
Cleanup
peter-lucia Sep 3, 2025
01ef6d7
Delete uv.lock
peter-lucia Sep 3, 2025
b05538d
Merge branch 'master' of github.com:confluentinc/confluent-kafka-pyth...
peter-lucia Sep 3, 2025
c35b4e2
Updated test_Producer.py
peter-lucia Sep 3, 2025
af2771f
Merge branch 'master' of github.com:confluentinc/confluent-kafka-pyth...
peter-lucia Sep 5, 2025
bca8b16
Updated Producer.c and test_Producer.py
peter-lucia Sep 9, 2025
12e2e1d
Updated test_Producer.py
peter-lucia Sep 9, 2025
f33b96e
Merge branch 'master' into add-producer-close
peter-lucia Sep 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions DEVELOPER.md
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ If librdkafka is installed in a non-standard location provide the include and li
C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
```

On MacOS, If you installed librdkafka with brew, you can use the following
```bash
export C_INCLUDE_PATH=$(brew --prefix librdkafka)/include
export LIBRARY_PATH=$(brew --prefix librdkafka)/lib
```

4. **Install confluent-kafka-python with optional dependencies**
```bash
pip3 install -e .[dev,tests,docs]
Expand Down Expand Up @@ -87,6 +93,26 @@ python3 tools/unasync.py --check
If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counter parts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the --check flag and fail the build.


## Local Setup with UV

Tested with python 3.11

```bash
# Modify pyproject.toml to require python version >=3.11
# This fixes the cel-python dependency conflict
uv venv --python 3.11
source .venv/bin/activate

uv sync --extra dev --extra tests
uv pip install trivup setuptools
pytest tests/

# When making changes, change project.version in pyproject.toml before re-running:
uv sync --extra dev --extra tests

```


## Tests


Expand Down
46 changes: 45 additions & 1 deletion src/confluent_kafka/src/Producer.c
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,42 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
return cfl_PyInt_FromInt(qlen);
}


static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {

CallState cs;

if (!self->rk)
Py_RETURN_TRUE;

CallState_begin(self, &cs);

/* Warn if there are pending messages */
int outq_len = rd_kafka_outq_len(self->rk);
if (outq_len > 0) {
const char msg[150];
sprintf(msg, "There are %d message(s) still in producer queue! "
"Use flush() or wait for delivery.", outq_len);
rd_kafka_log_print(
self->rk,
CK_LOG_WARNING,
"CLOSWARN",
msg
);
}
rd_kafka_destroy(self->rk);
rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF", "Producer destroy requested");

self->rk = NULL;

if (!CallState_end(self, &cs))
return NULL;

Py_RETURN_TRUE;

}


static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
CallState cs;
rd_kafka_error_t *error;
Expand Down Expand Up @@ -609,7 +645,15 @@ static PyMethodDef Producer_methods[] = {
" :rtype: int\n"
"\n"
},

{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
".. py:function:: close()\n"
"\n"
" Request to close the producer on demand.\n"
"\n"
" :rtype: bool\n"
" :returns: True if producer close requested successfully, False otherwise\n"
"\n"
},
{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
".. py:function:: flush([timeout])\n"
"\n"
Expand Down
10 changes: 10 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@
#endif


#define CK_LOG_EMERG 0
#define CK_LOG_ALERT 1
#define CK_LOG_CRIT 2
#define CK_LOG_ERR 3
#define CK_LOG_WARNING 4
#define CK_LOG_NOTICE 5
#define CK_LOG_INFO 6
#define CK_LOG_DEBUG 7



/****************************************************************************
*
Expand Down
28 changes: 26 additions & 2 deletions tests/test_Producer.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import json
from struct import pack

import pytest

from confluent_kafka import Producer, KafkaError, KafkaException, \
TopicPartition, libversion

from tests.common import TestConsumer


Expand Down Expand Up @@ -47,6 +48,8 @@ def on_delivery(err, msg):
except KafkaException as e:
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)

assert p.close(), "Failed to validate that producer was closed."


def test_produce_timestamp():
""" Test produce() with timestamp arg """
Expand Down Expand Up @@ -239,6 +242,8 @@ def test_transaction_api():
assert ex.value.args[0].fatal() is False
assert ex.value.args[0].txn_requires_abort() is False

assert p.close(), "The producer was not closed"


def test_purge():
"""
Expand Down Expand Up @@ -274,6 +279,8 @@ def on_delivery(err, msg):
p.flush(0.002)
assert cb_detector["on_delivery_called"]

assert p.close(), "The producer was not closed"


def test_producer_bool_value():
"""
Expand All @@ -283,3 +290,20 @@ def test_producer_bool_value():

p = Producer({})
assert bool(p)
assert p.close(), "The producer was not fully closed"


def test_producer_close():
"""
Ensures the producer close can be requested on demand
"""
conf = {
'debug': 'all',
'socket.timeout.ms': 10,
'error_cb': error_cb,
'message.timeout.ms': 10
}
producer = Producer(conf)
msg = {"test": "test"}
producer.produce(json.dumps(msg))
assert producer.close(), "The producer could nto be closed on demand"

AltStyle によって変換されたページ (->オリジナル) /