开发者社区 云原生 云消息队列 文章 正文

使用阿里云消息队列

2016年05月11日 9617
版权
版权声明:
本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《 阿里云开发者社区用户服务协议》和 《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写 侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
简介:

使用阿里云消息队列

控制台地址:http://ons.console.aliyun.com/#/home/topic

(1)生成Producer ID

点击"申请发布"

示例代码:

Java代码 收藏代码
  1. package com.alibaba.ons.demo;
  2. import java.util.Properties;
  3. import com.aliyun.openservices.ons.api.Message;
  4. import com.aliyun.openservices.ons.api.ONSFactory;
  5. import com.aliyun.openservices.ons.api.Producer;
  6. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  7. import com.aliyun.openservices.ons.api.SendResult;
  8. public class ProducerClient {
  9. public static void main(String[] args) {
  10. Properties properties = new Properties();
  11. properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
  12. properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
  13. properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
  14. Producer producer = ONSFactory.createProducer(properties);
  15. //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
  16. producer.start();
  17. Message msg = new Message(
  18. //Message Topic
  19. "com_hbjltv",
  20. //Message Tag,
  21. //可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
  22. "TagA",
  23. //Message Body
  24. //任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
  25. "Hello ONS".getBytes()
  26. );
  27. // 设置代表消息的业务关键属性,请尽可能全局唯一。
  28. // 以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。
  29. // 注意:不设置也不会影响消息正常收发
  30. msg.setKey("ORDERID_100");
  31. //发送消息,只要不抛异常就是成功
  32. SendResult sendResult = producer.send(msg);
  33. System.out.println(sendResult);
  34. // 在应用退出前,销毁Producer对象
  35. // 注意:如果不销毁也没有问题
  36. producer.shutdown();
  37. }
  38. }

(2)生成Consumer ID

点击"申请订阅"

示例代码:

Java代码 收藏代码
  1. public class ConsumerTest {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. properties.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
  5. properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
  6. properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
  7. Consumer consumer = ONSFactory.createConsumer(properties);
  8. consumer.subscribe("com_hbjltv", "*", new MessageListener() {
  9. public Action consume(Message message, ConsumeContext context) {
  10. System.out.println("Receive: " + message);
  11. return Action.CommitMessage;
  12. }
  13. });
  14. consumer.start();
  15. System.out.println("Consumer Started");
  16. }
  17. }

(3) clientId 的限制

阿里云消息队列对clientId的名称有严格限制:

(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,

例如:

CID_tv_mobile@@@86458fd 是合法的

CID_tv_mobile@@86458fd 是非法的,因为只有两个@

(b)总长度不能超过23个字符

例如

CID_tv_mobile@@@86458_A是合法的

CID_tv_mobile@@@86458_Ab是非法的,因为超过了23个字符



(4)在手机端(客户端)增加订阅逻辑

Java代码 收藏代码
  1. package com.service;
  2. import java.security.InvalidKeyException;
  3. import java.security.NoSuchAlgorithmException;
  4. import org.eclipse.paho.client.mqttv3.MqttClient;
  5. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  6. import org.eclipse.paho.client.mqttv3.MqttException;
  7. import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
  8. import android.app.Service;
  9. import android.content.Context;
  10. import android.content.Intent;
  11. import android.content.SharedPreferences;
  12. import android.os.IBinder;
  13. import com.common.util.SystemHWUtil;
  14. import com.dict.Constants3;
  15. import com.jianli.R;
  16. import com.push.PushCallback;
  17. import com.string.widget.util.ValueWidget;
  18. import com.util.MacSignature;
  19. import com.util.ShopUtil;
  20. /**
  21. * @author Dominik Obermaier
  22. */
  23. public class MQTTService extends Service {
  24. // public static final String BROKER_URL =
  25. // "tcp://broker.mqttdashboard.com:1883";
  26. // public static String BROKER_URL = "tcp://172.16.15.50:1883";
  27. public static String BROKER_URL_FORMAT = "tcp://%s:%s";
  28. // public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";
  29. /*
  30. * In a real application, you should get an Unique Client ID of the device
  31. * and use this, see
  32. * http://android-developers.blogspot.de/2011/03/identifying
  33. * -app-installations.html
  34. */
  35. public static String clientId = null;
  36. /**
  37. * 不能含有英文句点,可以包含下划线
  38. */
  39. public static final String TOPIC = "com_hbjltv";
  40. private MqttClient mqttClient;
  41. // private String ip="182.92.80.122";
  42. /***
  43. * 是否连接上activeMQ
  44. */
  45. private boolean online = false;
  46. boolean isAliyun=false;
  47. public IBinder onBind(Intent intent) {
  48. return null;
  49. }
  50. @Override
  51. public void onCreate() {
  52. super.onCreate();
  53. }
  54. private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{
  55. return new MqttClient(serverURL, clientId,
  56. new MemoryPersistence());
  57. }
  58. /***
  59. *
  60. * @param serverURL
  61. * @param clientId
  62. * : 最大长度:23
  63. * @param isAllowOffline
  64. * @param username
  65. * @param password
  66. * @throws MqttException
  67. */
  68. private void connectAndSubscribe(String serverURL, String clientId,
  69. /* String topicFilter, */boolean isAllowOffline, String username,
  70. String password) throws MqttException {
  71. if(isAliyun){
  72. if(!ShopUtil.validateClientId(getApplicationContext(), clientId)){
  73. return;
  74. }
  75. }
  76. mqttClient = createMqttClient(serverURL, clientId);
  77. MqttConnectOptions options = new MqttConnectOptions();
  78. options.setCleanSession(!isAllowOffline);// mqtt receive offline message
  79. if (ValueWidget.isNullOrEmpty(username)) {
  80. username = null;
  81. }
  82. String sign=null;
  83. if(isAliyun){
  84. try {
  85. sign = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);
  86. password=sign;
  87. } catch (InvalidKeyException e) {
  88. e.printStackTrace();
  89. } catch (NoSuchAlgorithmException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. if (ValueWidget.isNullOrEmpty(password)) {
  94. password = null;
  95. } else {
  96. options.setPassword(password.toCharArray());
  97. }
  98. options.setUserName(username);
  99. options.setConnectionTimeout(10);
  100. options.setKeepAliveInterval(10);
  101. if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
  102. mqttClient = createMqttClient(serverURL, clientId);
  103. }
  104. mqttClient.setCallback(new PushCallback(this));
  105. boolean isSuccess=false;
  106. mqttClient.connect(options);
  107. isSuccess=true;
  108. // Subscribe to all subtopics of homeautomation
  109. // mqttClient.subscribe(topicFilter);
  110. if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
  111. mqttClient = createMqttClient(serverURL, clientId);
  112. }
  113. if(isAliyun){
  114. final String p2ptopic = TOPIC+"/p2p/";
  115. //同时订阅两个topic,一个是基于标准mqtt协议的发布订阅模式,一个是扩展的点对点推送模式
  116. final String[] topicFilters=new String[]{TOPIC,p2ptopic};
  117. mqttClient.subscribe(topicFilters);
  118. }else{
  119. mqttClient.subscribe(new String[] { TOPIC, clientId });
  120. }
  121. }
  122. @Override
  123. public void onStart(Intent intent, int startId) {
  124. final boolean isRestart=intent.getBooleanExtra("isRestart", false);
  125. ShopUtil.logger2("restart MQTT service:"+isRestart);
  126. // super.onStart(intent, startId);
  127. // if (intent == null) {//重启服务时intent 确实为空
  128. // Log.d(Constants.LOG_TAG, "intent is null");
  129. // return;
  130. // }
  131. Context context = getApplicationContext();
  132. clientId = ShopUtil.getIMEI(context);
  133. // Bundle bundle=intent.getExtras();
  134. // String ip=bundle.getString(Constants.ACTIVEMQ_IP);
  135. // final String ip = context.getString(R.string.pushserver_ip);
  136. SharedPreferences preferences = getApplicationContext()
  137. .getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,
  138. Context.MODE_PRIVATE);
  139. final String ip ="mqtt.ons.aliyun.com";// preferences.getString("pushserver_ip", context.getString(R.string.pushserver_ip));
  140. final String port = preferences.getString("pushserver_port", "1883");
  141. isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS", "false"));
  142. // String topic=bundle.getString(Constants.ACTIVEMQ_TOPIC);
  143. System.out.println("push ip:"+ip);
  144. new Thread(new Runnable() {
  145. /****
  146. * 尝试连接的次数,为什么要尝试连接几次那?
  147. * (1)无wifi时启动,则肯定连接失败,所以尝试连接三次,只要在这个期间启动wifi就可以连接上activeMQ;<br />
  148. * (2)之前连接上,然后断开wifi,然后又启动wifi,<br />
  149. * 这时容易报 "Broker unavailable"异常,暂时不清楚原因,所以也需要继续尝试连接;<br />
  150. *
  151. */
  152. private int tryTime = 5;
  153. @Override
  154. public void run() {
  155. System.out.println(tryTime+","+mqttClient+","+isOnline() );
  156. while (tryTime > 0
  157. && (!isOnline() || mqttClient == null || (!mqttClient
  158. .isConnected())||isRestart)) {
  159. try {
  160. ShopUtil.logger2("start push service");
  161. ShopUtil.logger2("push server:"+ip);
  162. String prefix=Constants3.CONSUMER_ID_TV+"@@@";
  163. int remainingLength=23-prefix.length();
  164. String suffix=null;
  165. if(clientId.length()>remainingLength){
  166. suffix=clientId.substring(0,remainingLength);
  167. }else{
  168. suffix=clientId;
  169. }
  170. String clientId2=prefix+suffix;
  171. connectAndSubscribe(String.format(
  172. MQTTService.BROKER_URL_FORMAT, ip, port),
  173. clientId2, /* topic, */true, ""/*自己申请的access key*/, ""/*secret*/);
  174. ShopUtil.logger2("clientId:" + clientId2);
  175. ShopUtil.logger2("succeed to connect to activeMQ");
  176. setOnline(true);
  177. } catch (MqttException e) {
  178. setOnline(false);
  179. mqttClient=null;
  180. e.printStackTrace();
  181. ShopUtil.logger2("抛异常:"+e.getMessage());
  182. ShopUtil.logger2("ip:" + ip + " ,port:" + port);
  183. try {
  184. Thread.sleep(10000);
  185. } catch (InterruptedException e1) {
  186. e1.printStackTrace();
  187. }
  188. }
  189. tryTime--;
  190. }
  191. }
  192. }).start();
  193. // new Thread(new Runnable() {
  194. // @Override
  195. // public void run() {
  196. // System.out.println("start:"+System.currentTimeMillis());
  197. // try {
  198. // Thread.sleep(10000);
  199. // } catch (InterruptedException e) {
  200. // e.printStackTrace();
  201. // }
  202. // while(true){
  203. // try {
  204. // Thread.sleep(10000);
  205. // if(mqttClient!=null&& !mqttClient.isConnected()){
  206. // System.out.println("disConnected:"+System.currentTimeMillis());
  207. // }
  208. // } catch (InterruptedException e) {
  209. // e.printStackTrace();
  210. // }
  211. // }
  212. // }
  213. // }).start();
  214. }
  215. @Override
  216. public void onDestroy() {
  217. setOnline(false);
  218. try {
  219. ShopUtil.logger2("MQTTService destory");
  220. mqttClient.disconnect(0);
  221. } catch (MqttException e) {
  222. // Toast.makeText(getApplicationContext(),
  223. // "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG)
  224. // .show();
  225. e.printStackTrace();
  226. }
  227. mqttClient = null;
  228. stopForeground(true);
  229. Intent intent = new Intent("com.dbjtech.waiqin.destroy");
  230. sendBroadcast(intent);
  231. }
  232. public boolean isOnline() {
  233. return online;
  234. }
  235. public void setOnline(boolean online) {
  236. this.online = online;
  237. }
  238. @Override
  239. public int onStartCommand(Intent intent, int flags, int startId) {
  240. flags = START_STICKY;
  241. return super.onStartCommand(intent, flags, startId);
  242. }
  243. }
目录
热门文章
最新文章

AltStyle によって変換されたページ (->オリジナル) /