Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e3d6c66

Browse files
Return stream frame header binary in dispatch chunk callback
This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192
1 parent c7f6cad commit e3d6c66

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

‎deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3571,12 +3571,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35713571
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35723572

35733573
send_file_callback(?VERSION_1,
3574-
Transport,
35753574
_Log,
35763575
#consumer{configuration =
3577-
#consumer_configuration{socket = S,
3578-
subscription_id =
3579-
SubscriptionId,
3576+
#consumer_configuration{subscription_id = SubId,
35803577
counters = Counters}},
35813578
Counter) ->
35823579
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3587,19 +3584,16 @@ send_file_callback(?VERSION_1,
35873584
?REQUEST:1,
35883585
?COMMAND_DELIVER:15,
35893586
?VERSION_1:16,
3590-
SubscriptionId:8/unsigned>>,
3591-
Transport:send(S, FrameBeginning),
3587+
SubId:8/unsigned>>,
35923588
atomics:add(Counter, 1, Size),
35933589
increase_messages_consumed(Counters, NumEntries),
3594-
set_consumer_offset(Counters, FirstOffsetInChunk)
3590+
set_consumer_offset(Counters, FirstOffsetInChunk),
3591+
FrameBeginning
35953592
end;
35963593
send_file_callback(?VERSION_2,
3597-
Transport,
35983594
Log,
35993595
#consumer{configuration =
3600-
#consumer_configuration{socket = S,
3601-
subscription_id =
3602-
SubscriptionId,
3596+
#consumer_configuration{subscription_id = SubId,
36033597
counters = Counters}},
36043598
Counter) ->
36053599
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3611,12 +3605,12 @@ send_file_callback(?VERSION_2,
36113605
?REQUEST:1,
36123606
?COMMAND_DELIVER:15,
36133607
?VERSION_2:16,
3614-
SubscriptionId:8/unsigned,
3608+
SubId:8/unsigned,
36153609
CommittedChunkId:64>>,
3616-
Transport:send(S, FrameBeginning),
36173610
atomics:add(Counter, 1, Size),
36183611
increase_messages_consumed(Counters, NumEntries),
3619-
set_consumer_offset(Counters, FirstOffsetInChunk)
3612+
set_consumer_offset(Counters, FirstOffsetInChunk),
3613+
FrameBeginning
36203614
end.
36213615

36223616
send_chunks(DeliverVersion,
@@ -3686,9 +3680,7 @@ send_chunks(DeliverVersion,
36863680
Retry,
36873681
Counter) ->
36883682
case osiris_log:send_file(Socket, Log,
3689-
send_file_callback(DeliverVersion,
3690-
Transport,
3691-
Log,
3683+
send_file_callback(DeliverVersion, Log,
36923684
Consumer,
36933685
Counter))
36943686
of

‎rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
52+
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.0
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /