session flow control disallows delivery. The requester must keep its incoming-window large enough.
  • The broker’s AMQP 1.0 writer process cannot send replies fast enough to the requester.
  • If message loss is unacceptable, use classic queues instead of Direct Reply-To.

    Examples: AMQP 1.0

    String requestQueue = "request-queue";

    // create the responder
    Responder responder = connection.responderBuilder()
        .requestQueue(requestQueue)
        .handler((ctx, req) -> {
            // check whether the requester is still connected (optional)
            if (ctx.isRequesterAlive(req)) {
                String in = new String(req.body(), UTF_8);
                String out = "*** " + in + " ***";
                return ctx.message(out.getBytes(UTF_8));
            } else {
                return null;
            }
        }).build();

    // create the requester; it uses direct reply-to by default
    Requester requester = connection.requesterBuilder()
        .requestAddress().queue(requestQueue)
        .requester()
        .build();

    // create the request message
    Message request = requester.message("hello".getBytes(UTF_8));
    // send the request
    CompletableFuture<Message> responseFuture = requester.publish(request);
    // wait for the response
    Message response = responseFuture.get(10, TimeUnit.SECONDS);

    Usage in AMQP 0.9.1

    To use Direct Reply-To, a requester must:

    1. Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first (though the client may).
    2. Set the request message’s reply-to to amq.rabbitmq.reply-to.

    When forwarding the request, RabbitMQ transparently rewrites reply-to to amq.rabbitmq.reply-to.<opaque-suffix>, where <opaque-suffix> is not meaningful to clients. The responder then publishes the reply to the default exchange ("") using that value as the routing key.
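The responder's side of this rule can be sketched in Java as follows. This is only an illustration under stated assumptions: `Publisher` is a hypothetical stand-in for a client channel's publish operation, not an interface from any RabbitMQ library, and passing the request's ID back as the reply's correlation ID is a convention rather than something the protocol requires.

```java
import java.util.Objects;

final class DirectReplySketch {
    /** Hypothetical stand-in for a client channel's publish operation. */
    interface Publisher {
        void publish(String exchange, String routingKey, String correlationId, byte[] body);
    }

    /** The default exchange is addressed by the empty string. */
    static final String DEFAULT_EXCHANGE = "";

    /**
     * Sends a reply for a request whose reply-to was rewritten by the broker.
     * The reply-to value is used verbatim as the routing key; its suffix is
     * opaque and is never parsed or modified here.
     */
    static void reply(Publisher publisher, String replyTo, String requestId, byte[] body) {
        Objects.requireNonNull(replyTo, "request carries no reply-to");
        publisher.publish(DEFAULT_EXCHANGE, replyTo, requestId, body);
    }
}
```

The responder never needs to know whether the reply-to value names a real queue or a Direct Reply-To address; it publishes the same way in both cases.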

    If the responder is about to perform expensive work, it can first check whether the requester has gone away by passively declaring the generated reply queue name on a disposable channel (a failed declare closes the channel it was issued on). Even with passive=false there is no way to create the queue; the declare either succeeds (0 ready messages, 1 consumer) or fails.
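The liveness check might look like this in Java. `PassiveDeclare` is a hypothetical functional interface standing in for a passive queue declaration on a fresh, throwaway channel (in the RabbitMQ Java client the corresponding call is `Channel.queueDeclarePassive`); it is assumed to throw `IOException` when the declare fails.

```java
import java.io.IOException;

final class RequesterLiveness {
    /** Hypothetical stand-in: passively declares a queue on a disposable channel. */
    @FunctionalInterface
    interface PassiveDeclare {
        void declare(String queueName) throws IOException;
    }

    /** Returns true if the passive declare of the reply-to name succeeds. */
    static boolean isRequesterAlive(PassiveDeclare declare, String replyTo) {
        try {
            declare.declare(replyTo);
            return true;   // declare succeeded: the requester is still consuming
        } catch (IOException e) {
            return false;  // declare failed: the requester has gone away
        }
    }
}
```

A responder would call this just before starting the expensive work and skip the work when it returns false.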

    AMQP 0.9.1 Caveats and Limitations

    Examples: AMQP 0.9.1

    %% 1. Requester consumes from pseudo-queue in no-ack mode.
    amqp_channel:subscribe(RequesterChan,
                           #'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
                                            no_ack = true},
                           self()),
    CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
                    end,

    %% 2. Requester sends the request.
    amqp_channel:cast(
      RequesterChan,
      #'basic.publish'{routing_key = RequestQueue},
      #amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
                                   message_id = RpcId},
                payload = RequestPayload}),

    %% 3. Responder receives the request.
    {ReplyTo, RpcId} =
        receive {#'basic.deliver'{consumer_tag = CTagResponder},
                 #amqp_msg{payload = RequestPayload,
                           props = #'P_basic'{reply_to = ReplyTo0,
                                              message_id = RpcId0}}} ->
                {ReplyTo0, RpcId0}
        end,

    %% 4. Responder replies.
    amqp_channel:cast(
      ResponderChan,
      #'basic.publish'{routing_key = ReplyTo},
      #amqp_msg{props = #'P_basic'{correlation_id = RpcId},
                payload = ReplyPayload}),

    %% 5. Requester receives the reply.
    receive {#'basic.deliver'{consumer_tag = CTagRequester},
             #amqp_msg{payload = ReplyPayload,
                       props = #'P_basic'{correlation_id = RpcId}}} ->
            %% process reply here...
            ok
    end.
