Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 2994535

Browse files
committed
revert debug code
1 parent 7379ed5 commit 2994535

File tree

3 files changed

+13
-29
lines changed

3 files changed

+13
-29
lines changed

‎src/ext-php-rdkafka/php-kafka-lib/highLevelConsumer.php

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@
3838
)
3939
->withAdditionalBroker('kafka:9096')
4040
->withConsumerGroup('php-kafka-lib-high-level-consumer')
41-
->withSubscription('pure-php-test-topic')
41+
->withSubscription('php-kafka-lib-test-topic')
4242
->build();
4343

4444
$consumer->subscribe();
45-
$c = 0;
45+
4646
while (true) {
4747
try {
4848
$message = $consumer->consume(10000);
49-
++$c;
5049
} catch (KafkaConsumerTimeoutException|KafkaConsumerEndOfPartitionException $e) {
5150
echo 'Didn\'t receive any messages, waiting for more...' . PHP_EOL;
5251
continue;
@@ -55,18 +54,15 @@
5554
continue;
5655
}
5756

58-
if($c%1000 === 0) {
59-
echo sprintf('%d messages consumed', $c) . PHP_EOL;
60-
}
61-
/*echo sprintf(
57+
echo sprintf(
6258
'Read message with key:%s payload:%s topic:%s partition:%d offset:%d headers:%s',
6359
$message->getKey(),
6460
$message->getBody(),
6561
$message->getTopicName(),
6662
$message->getPartition(),
6763
$message->getOffset(),
6864
implode(',', $message->getHeaders())
69-
) . PHP_EOL;*/
65+
) . PHP_EOL;
7066

71-
//$consumer->commit($message);
67+
$consumer->commit($message);
7268
}

‎src/ext-php-rdkafka/pure-php/highLevelConsumer.php

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,10 @@
3838

3939
// Subscribe to one or multiple topics
4040
$consumer->subscribe(['pure-php-test-topic']);
41-
$c = 0;
41+
4242
while (true) {
4343
// Try to consume messages for the given timout (20s)
4444
$message = $consumer->consume(20000);
45-
$c++;
4645

4746
if (RD_KAFKA_RESP_ERR__PARTITION_EOF === $message->err) {
4847
echo 'Reached end of partition, waiting for more messages...' . PHP_EOL;
@@ -55,23 +54,19 @@
5554
continue;
5655
}
5756

58-
if($c%1000 === 0) {
59-
echo sprintf('%d messages consumed', $c) . PHP_EOL;
60-
}
61-
62-
/*echo sprintf(
57+
echo sprintf(
6358
'Read message with key:%s payload:%s topic:%s partition:%d offset:%d',
6459
$message->key,
6560
$message->payload,
6661
$message->topic_name,
6762
$message->partition,
6863
$message->offset
69-
) . PHP_EOL;*/
64+
) . PHP_EOL;
7065
// Here is where you do your business logic to process your message
7166
// after you have done so, commit the message offset to the broker
7267

7368
// commit the message(s) offset synchronous back to the broker
74-
//$consumer->commit($message);
69+
$consumer->commit($message);
7570

7671
// you can also commit the message(s) offset in an async manner, which is slightly faster
7772
// but poses of course the challenge of handling errors in an async manner as well

‎src/ext-php-rdkafka/pure-php/producer.php

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
$conf->set('compression.codec', 'snappy');
1717
// set timeout, producer will retry for 5s
1818
$conf->set('message.timeout.ms', '5000');
19-
$conf->set('linger.ms', '1000');
2019
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
2120
//$conf->set('enable.idempotence', 'true');
2221

@@ -29,7 +28,7 @@
2928
echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL;
3029
} else {
3130
// message successfully delivered
32-
//echo sprintf('Message sent SUCCESSFULLY with payload => %s', 'x') . PHP_EOL;
31+
echo sprintf('Message sent SUCCESSFULLY with payload => %s', $message->payload) . PHP_EOL;
3332
}
3433
});
3534

@@ -58,7 +57,7 @@
5857
// initialize producer topic
5958
$topic = $producer->newTopic('pure-php-test-topic');
6059
// Produce 10 test messages
61-
$amountTestMessages = 1000000;
60+
$amountTestMessages = 10;
6261

6362
// Loop to produce some test messages
6463
for ($i = 0; $i < $amountTestMessages; ++$i) {
@@ -70,19 +69,13 @@
7069
$topic->producev(
7170
$partition,
7271
RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
73-
//sprintf('test message-%d',$i),
74-
str_repeat('12312313123213123',10000),
72+
sprintf('test message-%d',$i),
7573
sprintf('test-key-%d', $i),
7674
[
7775
'some' => sprintf('header value %d', $i)
7876
]
7977
);
80-
if ($i%1000 === 0) {
81-
echo sprintf('Queued message number: %d', $i) . PHP_EOL;
82-
}
83-
if ($i%5000 === 0) {
84-
$producer->flush(10000);
85-
}
78+
echo sprintf('Queued message number: %d', $i) . PHP_EOL;
8679

8780
// Poll for events e.g. producer callbacks, to handle errors, etc.
8881
// 0 = non-blocking

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /