I am testing RocketMQ from C# using NewLife.RocketMQ. I started RocketMQ on my PC, and the command window opens and prints messages, so the broker appears to be running.

So I wrote a simple C# program to test this:

This is the WPF window:

<Window x:Class="WpfApp1.MainWindow"
 xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
 xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
 xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
 xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
 xmlns:local="clr-namespace:WpfApp1"
 mc:Ignorable="d"
 Title="MainWindow" Height="450" Width="800" >
<DockPanel >
 <TextBox x:Name="txtProduce" Text="message 1" DockPanel.Dock="Top" Height="30" FontSize="16"/>
 <Button x:Name="btnProduce" Content="Produce" HorizontalAlignment="Left" VerticalAlignment="Top" Width="75" Click="BtnProduce_Click" DockPanel.Dock="Top"/>
 <DataGrid x:Name="dgProduce" Height="100" DockPanel.Dock="Top" IsReadOnly="True"/>
 <Label Content="Consumer" DockPanel.Dock="Top"/>
 <DataGrid x:Name="dgConsume" IsReadOnly="True"/>
</DockPanel>
</Window>

And now the C# code:

public partial class MainWindow : Window
{
 Producer _producer;
 Consumer _consumer;
 ObservableCollection<SendResult> _results = new ObservableCollection<SendResult>();
 object _resultsLock = new object();
 ObservableCollection<MessageExt> _messages = new ObservableCollection<MessageExt>();
 object _messagesLock = new object();
 public MainWindow()
 {
 InitializeComponent();
 dgProduce.ItemsSource = _results;
 BindingOperations.EnableCollectionSynchronization(_results, _resultsLock);
 dgConsume.ItemsSource = _messages;
 BindingOperations.EnableCollectionSynchronization(_messages, _messagesLock);
 _producer = new Producer
 {
 Topic = "TopicTest",
 NameServerAddress = "localhost:9876",
 RetryTimesWhenSendFailed = 10
 };
 _producer.Start();
 _consumer = new Consumer
 {
 Topic = "TopicTest",
 NameServerAddress = "localhost:9876"
 };
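 _consumer.OnConsume = (q, ms) =>
 {
 // This callback is not invoked on the UI thread. EnableCollectionSynchronization
 // expects writers to hold the same lock that was registered for the collection.
 lock (_messagesLock)
 {
 foreach(var item in ms.ToList())
 _messages.Add(item);
 }
 // Returning true reports the batch as consumed.
 return true;
 };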
 _consumer.Start();
 }
 private int _i = 1;
 private void BtnProduce_Click(object sender, RoutedEventArgs e)
 {
 try
 {
 var content = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss.fff") + ":" + txtProduce.Text;
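 // Capture the counter before incrementing so that Keys and Tags describe the same number.
 var key = _i++;
 var message = new Message()
 {
 Body = System.Text.Encoding.Default.GetBytes(content),
 Keys = key.ToString(),
 Tags = key % 2 == 0 ? "even" : "odd",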
 Flag = 0,
 WaitStoreMsgOK = true
 };
 var sr = _producer.Publish(message);
 _results.Add(sr);
 txtProduce.Text = "message " + _i;
 }
 catch (Exception ex)
 {
 MessageBox.Show(ex.Message);
 }
 }
}

Now I see two problems:

  1. "message 3" appears twice in the consumer message list.
  2. When I start two instances of my application and send a message, it is consumed by either instance 1 or instance 2, but not by both.

Why does this happen?

asked Nov 30, 2022 at 7:08

1 Answer

Question 1: once your consume callback returns true, the message is treated as consumed and will not be delivered to you again unless the commit fails.
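Because a failed commit can lead to the same message arriving again, one common safeguard is to make the consumer tolerate duplicates. The following is only a minimal sketch of that idea: `SeenMessages` and `TryMarkSeen` are hypothetical names, not part of RocketMQ or NewLife.RocketMQ, and which message property you use as the unique ID (a key or a message ID) is an assumption you would need to check against your client library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of RocketMQ or NewLife.RocketMQ.
public sealed class SeenMessages
{
    private readonly HashSet<string> _ids = new HashSet<string>();
    private readonly object _lock = new object();

    // Returns true the first time an ID is seen and false for a redelivery.
    public bool TryMarkSeen(string id)
    {
        lock (_lock)
        {
            return _ids.Add(id);
        }
    }
}

public static class DedupDemo
{
    public static void Main()
    {
        var seen = new SeenMessages();

        // "3" arrives twice, as in the question.
        foreach (var id in new[] { "1", "2", "3", "3" })
        {
            if (seen.TryMarkSeen(id))
                Console.WriteLine($"handle message {id}");
            else
                Console.WriteLine($"skip duplicate {id}");
        }
    }
}
```

In a consume callback you would call `TryMarkSeen` before adding the message to the list and still return true for a duplicate, so that it is not delivered yet again. This set grows without bound, so a long-running application would need some form of expiry.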

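Question 2: RocketMQ has two consumption modes, cluster and broadcast.

For example, with 2 consumer instances and 100 messages sent:

Cluster: the messages are split between the instances, e.g. 50 for consumer instance 1 and the other 50 for consumer instance 2.

Broadcast: all 100 go to consumer instance 1, and the same 100 go to consumer instance 2.

What you see with two application instances matches cluster mode, which is why each message shows up in only one of them.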

Consumer documentation: https://rocketmq.apache.org/docs/4.x/consumer/02push/#cluster-and-broadcast-mode
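The difference between the two modes can be illustrated without a broker. The sketch below is a plain simulation, not RocketMQ code: `ConsumeModeSketch`, `Clustering` and `Broadcasting` are hypothetical names, and the per-message round-robin split is a simplifying assumption (a real broker assigns queues to instances, so the split is only roughly even).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation; nothing here is part of RocketMQ or NewLife.RocketMQ.
public static class ConsumeModeSketch
{
    // Cluster mode: the instances share the messages, so each message
    // goes to exactly one instance (round-robin here for simplicity).
    public static Dictionary<string, List<int>> Clustering(
        IReadOnlyList<string> instances, IEnumerable<int> messages)
    {
        var delivered = instances.ToDictionary(name => name, _ => new List<int>());
        var next = 0;
        foreach (var message in messages)
        {
            delivered[instances[next]].Add(message);
            next = (next + 1) % instances.Count;
        }
        return delivered;
    }

    // Broadcast mode: every instance receives its own copy of every message.
    public static Dictionary<string, List<int>> Broadcasting(
        IReadOnlyList<string> instances, IEnumerable<int> messages)
    {
        var all = messages.ToList();
        return instances.ToDictionary(name => name, _ => new List<int>(all));
    }

    public static void Main()
    {
        var instances = new[] { "instance 1", "instance 2" };
        var messages = Enumerable.Range(1, 100);

        foreach (var pair in Clustering(instances, messages))
            Console.WriteLine($"cluster:   {pair.Key} received {pair.Value.Count} messages");
        foreach (var pair in Broadcasting(instances, messages))
            Console.WriteLine($"broadcast: {pair.Key} received {pair.Value.Count} messages");
    }
}
```

With these inputs the cluster simulation gives each instance 50 messages and the broadcast simulation gives each instance all 100. How the mode is selected on a real consumer depends on the client library, so check its consumer options rather than relying on this sketch.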

answered Jan 31, 2023 at 14:28