开发者社区 云原生 云消息队列 正文

RocketMQ SimpleConsumer异步订阅消息demo

RocketMQ SimpleConsumer异步订阅消息demo

展开
收起
嘟嘟嘟嘟嘟嘟 2024年08月28日 08:12:43 226 分享 版权

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

3 条回答
写回答
取消 提交回答
  • 根据您的问题,您希望了解如何使用RocketMQ的SimpleConsumer实现异步订阅消息的示例代码。虽然提供的知识内容没有直接包含异步订阅的Demo代码,但我们可以基于RocketMQ的API文档和常规实践来构建一个简单的异步订阅消息的示例。下面是一个简化的Java示例代码,展示如何使用SimpleConsumer(更准确地说,应使用org.apache.rocketmq.client.consumer.DefaultMQPushConsumer,因为SimpleConsumer更多指的是低级API,而异步订阅通常推荐使用PushConsumer模式)实现异步消费消息的功能。
    异步订阅消息的示例代码
    首先,确保您已添加了Apache RocketMQ客户端依赖到您的项目中。如果您使用Maven,可以在pom.xml中添加如下依赖:

    org.apache.rocketmq
    rocketmq-client
    4.9.3 
    

    接下来是异步消费消息的Java代码示例:
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;
    public class AsyncConsumerExample {
    public static void main(String[] args) throws Exception {
    // 实例化消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
    // 设置NameServer地址
    consumer.setNamesrvAddr("your_nameserver_addr");
    // 订阅主题和Tag
    consumer.subscribe("your_topic", "*");
    // 设置消费模式,如从第一条开始消费
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    // 打印每个消息的属性和内容
    for (MessageExt msg : msgs) {
    System.out.printf("Received message: %s %n", new String(msg.getBody()));
    }
    // 返回消费状态,这里简单示例为成功消费
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    // 启动消费者
    consumer.start();
    System.out.println("Async Consumer Started.");
    }
    }
    此回答整理自钉群"群2-Apache RocketMQ 中国开发者钉钉群"

    2024年08月28日 10:45:05
    赞同 22 展开评论
  • import com.aliyun.openservices.ons.api.*;
    import com.aliyun.openservices.ons.api.consumer.MessageListener;
    public class SimpleConsumerDemo {
     public static void main(String[] args) {
     //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。
     ClientServiceProvider provider = ClientServiceProvider.loadService();
     String topic = "Your Topic";
     FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
     SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
     //设置消费者分组。
     .setConsumerGroup("Your ConsumerGroup")
     //设置接入点。
     .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
     //设置预绑定的订阅关系。
     .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
     .build();
     List<MessageView> messageViewList = null;
     try {
     //SimpleConsumer需要主动获取消息,并处理。
     messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
     messageViewList.forEach(messageView -> {
     System.out.println(messageView);
     //消费处理完成后,需要主动调用ACK提交消费结果。
     try {
     simpleConsumer.ack(messageView);
     } catch (ClientException e) {
     e.printStackTrace();
     }
     });
     } catch (ClientException e) {
     //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
     e.printStackTrace();
     } 
    }
    }
    

    image.png
    参考链接
    https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-types
    回答不易请采纳

    2024年08月28日 10:14:34
    赞同 32 展开评论
  • SimpleConsumer
    SimpleConsumer是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

    使用方式
    SimpleConsumer的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:

     ClientServiceProvider provider = ClientServiceProvider.loadService();
     String topic = "Your Topic";
     FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
     SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
     //设置消费者分组。
     .setConsumerGroup("Your ConsumerGroup")
     //设置接入点。
     .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
     //设置预绑定的订阅关系。
     .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
     .build();
     List<MessageView> messageViewList = null;
     try {
     //SimpleConsumer需要主动获取消息,并处理。
     messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
     messageViewList.forEach(messageView -> {
     System.out.println(messageView);
     //消费处理完成后,需要主动调用ACK提交消费结果。
     try {
     simpleConsumer.ack(messageView);
     } catch (ClientException e) {
     e.printStackTrace();
     }
     });
     } catch (ClientException e) {
     //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
     e.printStackTrace();
     }
    

    image.png
    参考文档https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-types?spm=a2c4g.11186623.0.i28

    2024年08月28日 08:47:15
    赞同 26 展开评论
相关问答

AltStyle によって変換されたページ (->オリジナル) /

云原生

云消息队列

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系列产品 Serverless 化。RocketMQ 中文社区:https://rocketmq-learning.com/

我要提问

相关文章

  • 别再迷信离线了:流 + 在线模型,才是实时推荐的正解
  • 库存保卫战:电商系统防超卖的5把利刃与Java实战
  • 2 核16G云服务器多少钱?2026年阿里云 2 核 16G 云服务器:价格、性能与适用场景解析
  • GEO时代新基建:用"意图词"在AI流量入口为企业精准卡位
  • 别再被 Exactly-Once 忽悠了:端到端一致性到底是怎么落地的?
  • 热门讨论

    热门文章

    还有其他疑问?
    咨询AI助理