I'm struggling to (integration) test a component that makes use of an infinite generator.
Background: At a really high level, this component is essentially a "middleman" in a kafka workflow. It subscribes to a kafka topic, consumes messages, does some operations on the messages, and re-publishes them on a different topic.
Our integration tests run in (very close to) a production-like environment: We bring up a docker compose stack, and then exec
into various containers to execute tests (with pytest). We just avoid actually executing the startup scripts. So the tests in question here are executing against a real kafka cluster, (and real database, but the tests use a magic session that runs everything in a transaction that is rolled back at the end of a test). Similarly, we use the real kafka cluster, but we don't turn on our producers/consumers (we just let specific tests use the kafka cluster for the test). I think the best way to describe the philosophy is that we want production-like connections available to be tested, but we still try to preserve test-independence. The tests don't depend on pre-loaded data, and (attempt to) clean up after themselves so as to not leave behind data that could interfere with another test. Application code is tested using pytest and calling application functions/methods directly.
Note: We do have a higher-level integration test setup that tests "fully-externally" only using available API endpoints, etc.
High-level implementation (Python):
from typing import Callable, Iterable

def startup():
    config = get_config()
    kafka_consumer = Consumer(config.kafka, INPUT_TOPIC)
    kafka_producer = Producer(config.kafka, OUTPUT_TOPIC)
    consume(kafka_consumer.get_messages(), kafka_producer.publish)

def consume(messages: Iterable, publish: Callable):
    for message in messages:
        payload = transform(message)
        publish(payload)
Now, testing consume is easy: I can pass it any iterable of messages (as Python objects) and any callable. All the business-logic stuff is easy to test. (It's even unit-testable rather than only integration-testable.)
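For example, a unit test for consume can use a plain list and a recording callable; this sketch assumes the transform function from the implementation above:

def test_consume_transforms_and_publishes():
    messages = ["raw-1", "raw-2"]  # any iterable works; no Kafka needed
    published = []

    consume(messages, published.append)

    # every message should be transformed and republished, in order
    assert published == [transform("raw-1"), transform("raw-2")]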
However, we would still like to have some tests that verify the integration. ie. that the component can connect to the kafka cluster, uses the correct topics, etc.
Our current solution for this is:
get_messages
accepts parameters fornum_messages
andtimeout
(both default to sentinel values indicating "infinity/no timeout")- the
get_messages
generator exits afternum_messages
or aftertimeout
startup
also accepts these parameters/defaults, and passes them on.- running production code uses the defaults
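Roughly, the limited generator looks like the sketch below. It would be a method on our Consumer class; the internal _poll call and the sentinel handling are simplified stand-ins rather than our real code:

import time

_NO_LIMIT = object()  # sentinel meaning "no limit / no timeout"

def get_messages(self, num_messages=_NO_LIMIT, timeout=_NO_LIMIT):
    """Yield messages until num_messages have been consumed or timeout seconds elapse."""
    consumed = 0
    deadline = None if timeout is _NO_LIMIT else time.monotonic() + timeout
    while num_messages is _NO_LIMIT or consumed < num_messages:
        if deadline is not None and time.monotonic() >= deadline:
            return
        message = self._poll(timeout=1.0)  # hypothetical blocking poll with a short timeout
        if message is None:
            continue
        consumed += 1
        yield message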
Then: integration tests take the form
def test_startup(kafka_producer):  # kafka_producer is a pytest fixture
    test_message = "..."
    kafka_producer.publish(test_message)

    startup(num_messages=1, timeout=5)
    # startup exits after one message is processed
    # if an error causes the message not to be processed, startup exits after 5 seconds
    # this prevents tests from running forever if there are problems

    # assert things
As you can see, all of the complicated "message limit" checking that goes on in the production code is just to make sure that tests will actually fail instead of hanging forever.
This feels like a lot of complexity added just to enable testing. Also, this still doesn't really test the infinite case: the num_messages/timeout checking takes different branches than when those parameters are left at their defaults and unused.
I don't think this problem is necessarily specific to kafka, but more to infinite generators in general. How can I test the code that configures an infinite generator?
Things I've tried:
- Leaving get_messages as an (always) infinite generator. I can manually step (next(generator)) or manually close (generator.close()) the generator, which is great for testing get_messages directly, or for testing anything that get_messages is passed to as a parameter (see the sketch after this list). But I can't see how to extend this to testing startup, because that is where the generator is created and the test does not have access to it. This feels close to what I want, as if it should be possible, but I can't quite see how to connect the dots.
- Dependency injection: pass the generator into startup. This makes startup testable, but it just moves the issue to a different place. Something, somewhere has to create and start the generator, and I would like to test it.
- Minimizing the code that lacks testing: originally, the consume function was part of startup. Extracting it out and accepting the generator as a parameter enabled it to be tested more easily. It's an improvement, but as with dependency injection, it still avoids the issue.
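To illustrate the first approach: a test of get_messages against the real cluster can step the infinite generator manually. The fixtures here are illustrative stand-ins for the ones in our suite:

def test_get_messages_yields_published_message(kafka_producer, kafka_consumer):
    kafka_producer.publish("hello")

    messages = kafka_consumer.get_messages()  # still an unbounded generator
    try:
        # pull exactly one message; note there is no timeout here, so a broken
        # consumer would hang rather than fail
        first = next(messages)
        assert "hello" in str(first)  # exact shape of the message object depends on our Consumer
    finally:
        messages.close()  # raises GeneratorExit inside the generator so it shuts down cleanly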
1 Answer
Removing the message and time limit from production code involves reorienting your view of this integration test. Ideally the test should connect to the Kafka queue that the System Under Test is reading messages from (kafka_consumer). The test publishes a pre-determined message to that queue, and then begins listening for an appropriate response from the Kafka queue that the System Under Test should be publishing to (kafka_producer). You want to test the input and output, but not the bits in between.
Now, your test will still need a timeout, but that timeout will exist in the test, because it will be waiting for a message to come in from the kafka_producer queue.
You haven't gone into a lot of the specifics of the messages, but your test will want to include some sort of value in its initial message so it can detect the proper message being published to the other queue. This will ensure you don't get a false negative, or worse yet, a false positive.
Essentially your test becomes:
# The test takes the opposite roles from the System Under Test: it publishes to the
# topic the SUT consumes from (INPUT_TOPIC) and listens on the topic the SUT
# publishes to (OUTPUT_TOPIC).
test_producer = Producer(config.kafka, INPUT_TOPIC)
test_consumer = Consumer(config.kafka, OUTPUT_TOPIC)

test_producer.publish(test_message)
# 1. Subscribe to OUTPUT_TOPIC (via test_consumer) and wait for the transformed message
# 2. But only wait for X number of seconds (or milliseconds) before failing the test
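A fuller pytest-style sketch of that idea might look like the following. The poll method on the test-side consumer and the make_test_message helper are assumptions rather than part of the question's API, and the 10-second deadline is arbitrary:

import time
import uuid

def test_component_republishes_transformed_message():
    config = get_config()
    # The test produces to the topic the component consumes from,
    # and consumes from the topic the component publishes to.
    test_producer = Producer(config.kafka, INPUT_TOPIC)
    test_consumer = Consumer(config.kafka, OUTPUT_TOPIC)

    marker = str(uuid.uuid4())  # unique value so we only match our own message
    test_producer.publish(make_test_message(marker))  # make_test_message is illustrative

    deadline = time.monotonic() + 10  # the timeout lives in the test, not in production code
    while time.monotonic() < deadline:
        message = test_consumer.poll(timeout=1.0)  # hypothetical poll-with-timeout API
        if message is not None and marker in str(message):
            return  # success: the component consumed, transformed, and republished our message
    raise AssertionError("no matching message appeared on OUTPUT_TOPIC within 10 seconds")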
- Hmm, this would imply then that the test is expecting to be run against a running service, right? This would be a pretty big shift for us, since our tests currently all run by invoking the code under test directly. I agree it feels a lot cleaner though. Will definitely take this under consideration. – Luke Nelson, Jan 31, 2024 at 22:05
- @LukeNelson, can you clarify what you mean by "Our integration tests run in (very close to) a production-like environment" in your question? That made me think you had real services and message queues running. Is that not true? – Greg Burghardt, Jan 31, 2024 at 22:34
- I updated the main post with more info about our test setup. Does that help? – Luke Nelson, Jan 31, 2024 at 23:09
- … startup method". Either way, @GregBurghardt has the right answer. If testing "the component", then we should let it run and talk to it as a client (Greg's answer). If testing "the code", then I think letting the test run startup in a separate process (multiprocessing), so it can be terminated afterwards, is probably the way to do this (inspired by Greg's answer).
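A rough sketch of that multiprocessing idea, assuming the startup function and kafka_producer fixture from the question (the wait-for-output step is elided):

import multiprocessing

def test_startup_end_to_end(kafka_producer):
    # run the unmodified, infinite startup() in its own process
    proc = multiprocessing.Process(target=startup, daemon=True)
    proc.start()
    try:
        kafka_producer.publish("...")  # test message, as in the question
        # ... wait (with a deadline) for the transformed message on OUTPUT_TOPIC,
        #     e.g. using a test-side consumer as in the answer above ...
    finally:
        proc.terminate()       # stop the infinite loop once assertions are done
        proc.join(timeout=5)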