求助: activemq中的topic,有时会丢失,消息接收端收不到消息
发布于 6 年前 作者 zhilongyan 4805 次浏览 来自 问答

业务逻辑

一共3个项目.其中一个是和wx相关的.他在收到wx发过来的支付成功消息之后,将信息广播出去,另外两个项目监听这个topic,去做对应的处理.

遇到的问题:

另外2个项目会出现接收不到topic的情况.但是不是一直没有,是偶尔出现. 并且这两个项目丢失的情况不是同一个topic丢失了,而是有的topic一个项目监听到了,另外一个没有收到,每个项目都有大约1/5的几率收不到信息,

这种的不知道该如何下手去处理.

/**
 * activemq
 *
 * [stompit](https://github.com/gdaws/node-stomp)
 */
"use strict";
const {activemq} = require('../config/settings').mq;
const stompit = require('stompit');
const connectOptions = {
 'host' : activemq.host,
 'port' : 61613,
 'connectHeaders': {
 'host' : '/',
 'login' : activemq.user,
 'passcode' : activemq.passwd,
 'heart-beat': '5000,5000'
 }
};
const connectionManager = new stompit.ConnectFailover();
connectionManager.addServer(connectOptions);
// alwarsConnected:
const channel = new stompit.Channel(connectionManager); // 使用这种的会自动重连
// 将任务添加到队列中
exports.publishQueue = publishQueue;
function publishQueue(name, message) {
 const sendHeaders = {
 'destination' : '/queue/' + name,
 'content-type': 'text/plain'
 };
 
 log.silly('添加任务: ', name);
 channel.send(sendHeaders, message, function (err) {
 log.silly('添加任务成功: ', name);
 if (err) {
 log.error('amq -> publishQueue: ' + err)
 }
 });
}
// 发送广播
exports.publishTopic = publishTopic;
function publishTopic(name, message) {
 
 const sendHeaders = {
 'destination' : '/topic/' + name,
 'content-type': 'text/plain'
 };
 
 channel.send(sendHeaders, message, function (err) {
 log.silly('发送广播成功:', name, message);
 if (err) {
 log.error('amq -> publishTopic: ' + err)
 }
 });
}
// 接受广播
exports.subscribeTopic = subscribeTopic;
async function subscribeTopic(name, cb) {
 
 const subscribeHeaders = {
 'destination': '/topic/' + name,
 'ack' : 'client-individual'
 };
 
 channel.subscribe(subscribeHeaders, function (error, message) {
 log.silly('接收广播成功: ', name);
 if (error) {
 return log.error('activemq, subscribeTopic:', error);
 }
 message.readString('utf-8', function (error, body) {
 if (error) {
 return log.error('activemq, subscribeTopic: readString: ', error);
 }
 // channel.ack(message);
 typeof cb === 'function' && cb(body);
 channel.ack(message);
 });
 })
}
// 读取任务
exports.subscribeQueue = subscribeQueue;
async function subscribeQueue(name, cb) {
 const subscribeHeaders = {
 'destination': '/queue/' + name,
 'ack' : 'client-individual'
 };
 
 channel.subscribe(subscribeHeaders, function (error, message) {
 log.silly('读取任务成功: ', name);
 if (error) {
 return log.error('activemq, subscribeQueue:', error, message);
 }
 message.readString('utf-8', function (error, body) {
 if (error) {
 return log.error('activemq, subscribeQueue readString:', error, body);
 }
 // channel.ack(message);
 typeof cb === 'function' && cb(body, channel, message);
 // channel.ack(message); // 这个在每个处理完队列中的任务的时候都必须执行,否则系统会认为这个任务没有执行完成,下次重启的时候会重新读出来,并且堆积的也有数量限制
 });
 })
}
回到顶部

AltStyle によって変換されたページ (->オリジナル) /