Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e191a89

Browse files
committed
add example for offsetsForTimes
1 parent a09aee0 commit e191a89

File tree

1 file changed

+36
-0
lines changed

1 file changed

+36
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
use SimpleKafkaClient\Configuration;
4+
use SimpleKafkaClient\Message;
5+
use SimpleKafkaClient\Consumer;
6+
use SimpleKafkaClient\Producer;
7+
use SimpleKafkaClient\TopicPartition;
8+
9+
error_reporting(E_ALL);
10+
11+
$conf = new Configuration();
12+
$conf->set('client.id', 'pure-php-producer');
13+
$conf->set('metadata.broker.list', 'kafka:9096');
14+
$conf->set('compression.codec', 'snappy');
15+
$conf->set('message.timeout.ms', '5000');
16+
17+
$producer = new Producer($conf);
18+
$topic = $producer->getTopicHandle('pure-php-test-topic-offsets');
19+
$time = time();
20+
$topic->producev(
21+
RD_KAFKA_PARTITION_UA,
22+
RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
23+
'special-message',
24+
'special-key',
25+
[
26+
'special-header' => 'awesome'
27+
]
28+
);
29+
$result = $producer->flush(20000);
30+
31+
$topicPartition = new TopicPartition('pure-php-test-topic-offsets', 0, $time);
32+
$result = $producer->offsetsForTimes([$topicPartition], 10000);
33+
var_dump($result[0]->getTopicName());
34+
var_dump($result[0]->getPartition());
35+
var_dump($result[0]->getOffset());
36+

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /