session flow control disallows delivery.
The requester must keep its incoming-window large enough.
If message loss is unacceptable, use classic queues instead of Direct Reply-To.
String requestQueue ="request-queue";
// create the responder
Responder responder = connection.responderBuilder()
.requestQueue(requestQueue)
.handler((ctx, req)->{
// check whether the requester is still connected (optional)
if(ctx.isRequesterAlive(req)){
String in =newString(req.body(),UTF_8);
String out ="*** "+ in +" ***";
return ctx.message(out.getBytes(UTF_8));
}else{
returnnull;
}
}).build();
// create the requester, it uses direct reply-to by default
Requester requester = connection.requesterBuilder()
.requestAddress().queue(requestQueue)
.requester()
.build();
// create the request message
Message request = requester.message("hello".getBytes(UTF_8));
// send the request
CompletableFuture<Message> responseFuture = requester.publish(request);
// wait for the response
Message response = responseFuture.get(10,TimeUnit.SECONDS);
A complete example is available in the RabbitMQ AMQP 1.0 .NET client repository.
const string requestQueue = "amqp10.net-request-queue";

// Create the responder. The handler logs each incoming request and answers
// with a fixed reply message.
IResponder responder = await connection.ResponderBuilder()
    .RequestQueue(requestQueue)
    .Handler((context, message) =>
    {
        // "message" parameter is the incoming message
        Trace.WriteLine(TraceLevel.Information, $"[Responder] Message received: {message.BodyAsString()} ");
        // Create a reply message.
        IMessage reply = context.Message("reply message");
        // The handler returns a Task, so wrap the already-built reply.
        return Task.FromResult(reply);
    })
    .BuildAsync();

// Create the requester; it uses direct reply-to by default.
IRequester requester = await connection.RequesterBuilder()
    .RequestAddress()
    .Queue(requestQueue)
    .Requester()
    .BuildAsync();

// Publish the request and await the response message.
IMessage response = await requester.PublishAsync(
    new AmqpMessage("Hello"));
Trace.WriteLine(TraceLevel.Information, $"[Requester] Response received: {response.BodyAsString()}");
%% 1. Requester attaches its receiving link.
%% NOTE(review): notify_with_performative is presumably what makes the
%% `attached` event below carry the attach performative - confirm.
OpnConfRequester = OpnConfRequester0#{notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
%% Dynamic source without an address, tagged with the volatile-queue capability.
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver requester">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, AttachArgs),
%% Requester learns the broker-generated reply address.
Addr = receive
           {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
               #'v1_0.attach'{
                  source = #'v1_0.source'{
                              address = {utf8, Addr0}}} = Attach,
               Addr0
       end,
%% Requester must grant link credit before sending the first request.
ok = amqp10_client:flow_link_credit(ReceiverRequester, 1000, 500),

%% 2. Requester sends the request, carrying the reply address in reply_to.
ok = amqp10_client:send_msg(
       SenderRequester,
       amqp10_msg:set_properties(
         #{message_id => RpcId,
           reply_to => Addr},
         amqp10_msg:new(DeliveryTag, RequestPayload, true))),

%% 3. Responder receives the request and reads relevant properties.
%% ... (code that receives RequestMsg is elided)
#{message_id := RpcId,
  reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
%% Optionally, the responder checks whether the requester is still connected.
{ok, #{queue := ReplyQueue}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
case rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQueue) of
    {ok, #{}} ->
        %% requester is still there
        ok;
    _ ->
        throw(requester_absent)
end,

%% 4. Responder replies (attached to the anonymous terminus).
%% The request's message_id is echoed back as correlation_id.
ok = amqp10_client:send_msg(
       SenderResponder,
       amqp10_msg:set_properties(
         #{to => ReplyToAddr,
           correlation_id => RpcId},
         amqp10_msg:new(Tag, ReplyPayload, true))),

%% 5. Requester receives the reply.
receive
    {amqp10_msg, ReceiverRequester, ReplyMsg} ->
        %% process reply here...
        ok
end.
A complete example is available in the client repo
A complete example with the Azure AMQP client is available in the tutorials repository.
const requestQueue = "go-amqp1.0-request-queue"

// Create the responder; this handler returns the request unchanged as the reply.
responder, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
	RequestQueue: requestQueue,
	Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
		return request, nil
	},
})

// Create the requester on the same request queue.
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
	RequestQueueName: requestQueue,
	// the option to enable the DirectReplyTo feature
	DirectReplyTo: true,
})

// Publish the request; the reply is read from the returned channel.
replies, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte("hello")))
reply, ok := <-replies
if !ok {
	// A closed channel is reported as a timeout.
	fmt.Println("timed out waiting for response")
	continue
}
fmt.Printf("response: %s\n", reply.GetData())
To use Direct Reply-To, a requester must:
1. Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode.
There is no need to declare this "queue" first (though the client may).
2. Set the reply-to property of the request message to amq.rabbitmq.reply-to.
When forwarding the request, RabbitMQ transparently rewrites reply-to to amq.rabbitmq.reply-to.<opaque-suffix>, where <opaque-suffix> is not meaningful to clients.
The responder then publishes the reply to the default exchange ("") using that value as the routing key.
If the responder will perform expensive work, it can check whether the client has gone away by passively declaring the generated reply queue name on a disposable channel.
Even with passive=false there is no way to create it; the declare either succeeds (0 ready messages, 1 consumer) or fails.
The requester must use the same channel to consume from amq.rabbitmq.reply-to and to publish the request.
The name amq.rabbitmq.reply-to is used in basic.consume and the reply-to property as if it were a queue; however it is not.
It cannot be deleted and does not appear in the management plugin or rabbitmqctl list_queues.
If the responder publishes with the mandatory flag, amq.rabbitmq.reply-to.* is treated as a queue for routing.
Whether the requester is still present is not checked at routing time.
In other words, a message routed solely to this name is considered "routed", and RabbitMQ will not send a basic.return.
At most one Direct Reply-To consumer (basic.consume) is allowed per channel.
%% 1. Requester consumes from pseudo-queue in no-ack mode.
%% Subscribe the calling process to the pseudo-queue. Note the space before
%% `<<`: without it, `=<<` is tokenized as `=<` followed by `<`.
amqp_channel:subscribe(
  RequesterChan,
  #'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
                   no_ack = true},
  self()),
%% Remember the consumer tag so the reply delivery can be matched in step 5.
CTagRequester = receive
                    #'basic.consume_ok'{consumer_tag = CTag} -> CTag
                end,

%% 2. Requester sends the request.
%% reply_to is set to the pseudo-queue name; message_id identifies the call.
amqp_channel:cast(
  RequesterChan,
  #'basic.publish'{routing_key = RequestQueue},
  #amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
                               message_id = RpcId},
            payload = RequestPayload}),

%% 3. Responder receives the request.
{ReplyTo, RpcId} =
    receive
        {#'basic.deliver'{consumer_tag = CTagResponder},
         #amqp_msg{payload = RequestPayload,
                   props = #'P_basic'{reply_to = ReplyTo0,
                                      message_id = RpcId0}}} ->
            {ReplyTo0, RpcId0}
    end,

%% 4. Responder replies.
%% The reply is routed by the received reply_to value, and the request's
%% message_id is echoed back as correlation_id.
amqp_channel:cast(
  ResponderChan,
  #'basic.publish'{routing_key = ReplyTo},
  #amqp_msg{props = #'P_basic'{correlation_id = RpcId},
            payload = ReplyPayload}),

%% 5. Requester receives the reply
receive
    {#'basic.deliver'{consumer_tag = CTagRequester},
     #amqp_msg{payload = ReplyPayload,
               props = #'P_basic'{correlation_id = RpcId}}} ->
        %% process reply here...
        ok
end.