Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 1070e00

Browse files
committed
chore: rollback catching Throwable in callback proxies
1 parent 57d26fe commit 1070e00

9 files changed

+53
-93
lines changed

‎src/RdKafka/FFI/ConsumeCallbackProxy.php

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,9 @@ class ConsumeCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $nativeMessage, ?CData $opaque = null): void
1313
{
14-
try {
15-
($this->callback)(
16-
new Message($nativeMessage),
17-
OpaqueMap::get($opaque)
18-
);
19-
} catch (\Throwable $exception) {
20-
error_log($exception->getMessage(), E_ERROR);
21-
}
14+
($this->callback)(
15+
new Message($nativeMessage),
16+
OpaqueMap::get($opaque)
17+
);
2218
}
2319
}

‎src/RdKafka/FFI/DrMsgCallbackProxy.php

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@ class DrMsgCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $producer, CData $nativeMessage, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($producer),
18-
new Message($nativeMessage),
19-
OpaqueMap::get($opaque)
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($producer),
17+
new Message($nativeMessage),
18+
OpaqueMap::get($opaque)
19+
);
2420
}
2521
}

‎src/RdKafka/FFI/ErrorCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@ class ErrorCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $consumerOrProducer, int $err, string $reason, ?CData $opaque = null): void
1313
{
14-
try {
15-
($this->callback)(
16-
RdKafka::resolveFromCData($consumerOrProducer),
17-
$err,
18-
$reason,
19-
OpaqueMap::get($opaque)
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
14+
($this->callback)(
15+
RdKafka::resolveFromCData($consumerOrProducer),
16+
$err,
17+
$reason,
18+
OpaqueMap::get($opaque)
19+
);
2420
}
2521
}

‎src/RdKafka/FFI/LogCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@ class LogCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $rdkafka, int $level, string $facility, string $message): void
1313
{
14-
try {
15-
($this->callback)(
16-
RdKafka::resolveFromCData($rdkafka),
17-
$level,
18-
$facility,
19-
$message
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
14+
($this->callback)(
15+
RdKafka::resolveFromCData($rdkafka),
16+
$level,
17+
$facility,
18+
$message
19+
);
2420
}
2521
}

‎src/RdKafka/FFI/NativePartitionerCallbackProxy.php

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,14 @@ public function __invoke(
2525
?CData $topic_opaque = null,
2626
?CData $msg_opaque = null
2727
): int {
28-
try {
29-
return (int) Library::{$this->partitionerMethod}(
30-
$topic,
31-
$keydata,
32-
$keylen,
33-
$partition_cnt,
34-
OpaqueMap::get($topic_opaque),
35-
OpaqueMap::get($msg_opaque)
36-
);
37-
} catch (\Throwable $exception) {
38-
error_log($exception->getMessage(), E_ERROR);
39-
}
40-
41-
return RD_KAFKA_PARTITION_UA;
28+
return (int) Library::{$this->partitionerMethod}(
29+
$topic,
30+
$keydata,
31+
$keylen,
32+
$partition_cnt,
33+
OpaqueMap::get($topic_opaque),
34+
OpaqueMap::get($msg_opaque)
35+
);
4236
}
4337

4438
public static function create(string $partitionerMethod): Closure

‎src/RdKafka/FFI/OffsetCommitCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@ class OffsetCommitCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumer),
18-
$err,
19-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumer),
17+
$err,
18+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19+
OpaqueMap::get($opaque)
20+
);
2521
}
2622
}

‎src/RdKafka/FFI/PartitionerCallbackProxy.php

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,11 @@ public function __invoke(
1717
?CData $topic_opaque = null,
1818
?CData $msg_opaque = null
1919
): int {
20-
try {
21-
return (int) ($this->callback)(
22-
$keydata === null ? null : FFI::string($keydata, $keylen),
23-
$partition_cnt,
24-
OpaqueMap::get($topic_opaque),
25-
OpaqueMap::get($msg_opaque)
26-
);
27-
} catch (\Throwable $exception) {
28-
error_log($exception->getMessage(), E_ERROR);
29-
}
30-
31-
return RD_KAFKA_PARTITION_UA;
20+
return (int) ($this->callback)(
21+
$keydata === null ? null : FFI::string($keydata, $keylen),
22+
$partition_cnt,
23+
OpaqueMap::get($topic_opaque),
24+
OpaqueMap::get($msg_opaque)
25+
);
3226
}
3327
}

‎src/RdKafka/FFI/RebalanceCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@ class RebalanceCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumer),
18-
$err,
19-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumer),
17+
$err,
18+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19+
OpaqueMap::get($opaque)
20+
);
2521
}
2622
}

‎src/RdKafka/FFI/StatsCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,12 @@ class StatsCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumerOrProducer, CData $json, int $json_len, ?CData $opaque = null): int
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumerOrProducer),
18-
FFI::string($json, $json_len),
19-
$json_len,
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumerOrProducer),
17+
FFI::string($json, $json_len),
18+
$json_len,
19+
OpaqueMap::get($opaque)
20+
);
2521

2622
return 0;
2723
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /