Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

ohun/stomp-client

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

10 Commits

Repository files navigation

A stomp client base on netty.

Subscribe:

public class StompSubscribeTest {
 @Test
 public void testSub() throws Exception {
 StompClientManager stompClientManager = new StompClientManager();
 stompClientManager.connect("stomp://username:password@host:port");
 stompClientManager.createConsumer("/topic/logon")
 .id("wqteam_test")
 .ackMode(AckMode.AUTO)
 .handler(new MessageHandler() {
 @Override
 public void onMessage(Message message) {
 System.out.println(message);
 }
 })
 .subscribe();
 LockSupport.park();
 }
}

Sender:

public class StompSenderTest {
 @Test
 public void testSend() throws Exception {
 StompClientManager stompClientManager = new StompClientManager();
 stompClientManager.connect("stomp://username:password@10.232.136.85:61613");
 MessageProducer producer = stompClientManager.createProducer("/topic/logon3");
 for (int i = 0; i < 100; i++) {
 if (i > 5) {
 StompTransaction tx = producer.begin();
 Thread.sleep(1000);
 tx.send(i + "风格的歌");
 tx.abort();
 } else {
 producer.send(i + "大苏打");
 }
 try {
 ReceiptFuture future = producer.sendW((i + "大苏打").getBytes("UTF-8"));
 future.await();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 LockSupport.park();
 }
}

For Spring:

<bean id="stompClientManager" class="com.ohun.stomp.StompClientManager"
 init-method="start" destroy-method="stop">
 <!--<property name="uri" 
 value="stomp://username:password@10.232.136.85:61613?connectTimeout=3000"/>
 两种配置方式是一样的,看个人喜好-->
 <property name="config">
 <bean class="com.ohun.stomp.common.StompConfig">
 <property name="host" value="${wangxin.mq.stomp.host}"/>
 <property name="port" value="${wangxin.mq.stomp.port}"/>
 <property name="login" value="${wangxin.stomp.username}"/>
 <property name="pass" value="${wangxin.stomp.password}"/>
 <property name="connectTimeout" value="3000"/>
 <property name="heartbeatX" value="600000"/>
 <property name="heartbeatY" value="100000"/>
 <property name="monitorPeriod" value="60"/>
 <property name="connectCountPreHost" value="1"/>
 </bean>
 </property>
</bean>
<bean id="wx2PubMsgListener" class="com.ohun.test.WX2PublicMsgListener"
 init-method="init" destroy-method="destroy">
 <property name="topic" value="/topic/pamsgfromwx"/>
 <property name="clientId" value="wxadmin"/>
</bean>
public class WX2PublicMsgListener implements MessageHandler {
 @Resource
 private StompClientManager stompClientManager;
 public void init() {
 this.executor = newExecutor();
 MessageConsumer consumer = stompClientManager.createConsumer(topic);
 consumer.id(clientId).executor(executor).handler(this).subscribe();
 }
 @Override
 public void onMessage(final Message message) {
 logger.error(message.getTextBody())
 }
 public void destroy() throws Exception {
 executor.shutdown();
 }
 private ThreadPoolExecutor newExecutor() {
 final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
 .daemon(true).namingPattern("wx-2-pub-%d").build();
 return new ThreadPoolExecutor(2, poolSize, 5L, TimeUnit.MINUTES,
 new LinkedBlockingQueue<Runnable>(queueSize),
 threadFactory,
 new RejectedExecutionHandler() {
 @Override
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 logger.warn("one message task was rejected total="
 + rejectedCount.incrementAndGet()
 + ",poolStatus=" + poolStatus());
 }
 });
 }
 private final Logger logger = LoggerFactory.getLogger(WX2PublicMsgListener.class);
 private AtomicInteger rejectedCount = new AtomicInteger(0);
 private int poolSize = 10, queueSize = 100;
 private ThreadPoolExecutor executor;
 private String topic;
 private String clientId;
}

About

stomp client for java base netty

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors

Languages

AltStyle によって変換されたページ (->オリジナル) /