\$\begingroup\$

After learning the basics of the Actor Model (isolated state, communication and computation), I wrote my own implementation and I'm ready for a thorough code review.

As you can see in the test code, I create 2 tasks that simultaneously send multiple messages to an instance of the Actor class.

What I'm wondering about most is whether using a thread inside the actor is the correct way to process incoming messages, or whether I should process messages without a dedicated thread.

Full code:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ActorModel
{
    /// <summary>
    /// Client/Test Code
    /// </summary>
    public static class Program
    {
        private static void Main()
        {
            var actor = new Actor<string>(x => Console.WriteLine(x.ToUpper())).Start();
            var task1 = Task.Factory.StartNew(() => SendMessagesToActor(actor));
            var task2 = Task.Factory.StartNew(() => SendMessagesToActor(actor));
            Task.WaitAll(new[] {task1, task2});
            Console.WriteLine();
            Console.WriteLine();
            Console.WriteLine("PRESS ENTER TO STOP THE ACTOR");
            Console.ReadLine();
            actor.Stop();
        }
        private static void SendMessagesToActor(Actor<string> actor)
        {
            var counter = 0;
            while (counter < 5)
            {
                actor.Send(String.Format("message #[{0}] from thread #[{1}]", counter,
                    Thread.CurrentThread.ManagedThreadId));
                Thread.Sleep(100); // To avoid of OutOfMemory issues
                counter++;
            }
        }
    }
    /// <summary>
    /// Actor-Based Class
    /// </summary>
    /// <remarks>basics: Immutability, Communication and Computation</remarks>
    /// <typeparam name="TMessage"></typeparam>
    public class Actor<TMessage> where TMessage : class
    {
        private volatile bool _started;
        private readonly Action<TMessage> _messageHandler;
        private readonly ConcurrentQueue<TMessage> _messagesQueue; // Consider replace with TPL workflow
        private readonly Task _processingTask;
        private readonly CancellationTokenSource _source;
        public Actor(Action<TMessage> messageHandler)
        {
            if (messageHandler == null)
            {
                throw new ArgumentNullException("messageHandler");
            }
            _messagesQueue = new ConcurrentQueue<TMessage>();
            _messageHandler = messageHandler;
            _source = new CancellationTokenSource();
            _processingTask = new Task(() => ProcessMessages(_source.Token), _source.Token, TaskCreationOptions.LongRunning);
        }
        //---------------------------------------------------------------------------------------------------------------------------------------------------
        public Actor<TMessage> Start()
        {
            if (!_started)
            {
                _processingTask.Start();
                _started = true;
            }
            return this;
        }
        public void Stop()
        {
            Console.WriteLine("PROCESSING STOP REQUESTED");
            _source.Cancel();
        }
        //---------------------------------------------------------------------------------------------------------------------------------------------------
        public void Send(TMessage message)
        {
            _messagesQueue.Enqueue(message); // any capacity bounding is required here?
        }
        //---------------------------------------------------------------------------------------------------------------------------------------------------
        private void ProcessMessages(CancellationToken ct)
        {
            while (true)
            {
                if (_messagesQueue.Count > 0)
                {
                    TMessage message;
                    var hasRemoved = _messagesQueue.TryDequeue(out message);
                    if (hasRemoved)
                    {
                        _messageHandler(message);
                    }
                    continue;
                }
                if (ct.IsCancellationRequested)
                {
                    Console.WriteLine("PROCESSING STOPED");
                    return;
                }
            }
        }
    }
}
asked Feb 19, 2016 at 12:00
\$\endgroup\$
  • \$\begingroup\$ Are you aware of Akka.net? \$\endgroup\$ Commented Feb 28, 2016 at 1:10

1 Answer

\$\begingroup\$

There is one drawback(?) in your actor: in some cases it can't be interrupted.
The actor can't be stopped while there are still messages in its inner queue, because the continue skips the cancellation check, which is only reached when the queue is empty.
So if outer threads enqueue new messages a little bit faster than the actor can process them, you won't be able to stop your actor.
If that's an intended feature of the actor, OK; otherwise you can fix it by removing the continue or by placing the cancellation check at the top of the loop. Another way to handle this case is to change the behaviour of the Send method.
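
A minimal sketch of both suggestions, assuming a trimmed-down copy of the class from the question. The names StoppableActor and WaitForStop are mine, not part of the original code, and returning bool from Send is just one possible way to change its behaviour. The cancellation check moves into the loop condition, and Send refuses messages once Stop has been requested:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: a trimmed-down variant of Actor<TMessage> from the question.
// The _started guard and the console output are omitted for brevity.
public class StoppableActor<TMessage> where TMessage : class
{
    private readonly Action<TMessage> _messageHandler;
    private readonly ConcurrentQueue<TMessage> _messagesQueue = new ConcurrentQueue<TMessage>();
    private readonly CancellationTokenSource _source = new CancellationTokenSource();
    private readonly Task _processingTask;

    public StoppableActor(Action<TMessage> messageHandler)
    {
        if (messageHandler == null)
        {
            throw new ArgumentNullException("messageHandler");
        }
        _messageHandler = messageHandler;
        _processingTask = new Task(() => ProcessMessages(_source.Token), TaskCreationOptions.LongRunning);
    }

    public StoppableActor<TMessage> Start()
    {
        _processingTask.Start();
        return this;
    }

    public void Stop()
    {
        _source.Cancel();
    }

    // Hypothetical helper (not in the question): wait until the processing loop has exited.
    public bool WaitForStop(TimeSpan timeout)
    {
        return _processingTask.Wait(timeout);
    }

    // Changed Send behaviour: messages sent after Stop() are rejected instead of queued.
    public bool Send(TMessage message)
    {
        if (_source.IsCancellationRequested)
        {
            return false;
        }
        _messagesQueue.Enqueue(message);
        return true;
    }

    private void ProcessMessages(CancellationToken ct)
    {
        // The cancellation check runs before every dequeue attempt,
        // so a constantly refilled queue can no longer keep the loop alive.
        while (!ct.IsCancellationRequested)
        {
            TMessage message;
            if (_messagesQueue.TryDequeue(out message))
            {
                _messageHandler(message);
            }
        }
    }
}
```

Note the trade-off: with the check at the top, messages that are still queued when Stop is called are dropped. If you would rather drain the queue first, rejecting new messages in Send alone is enough to guarantee that the original loop eventually sees an empty queue and exits.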

answered Feb 19, 2016 at 15:36
\$\endgroup\$
  • \$\begingroup\$ Thanks, it's actually a good point if I want to enforce stopping the actor \$\endgroup\$ Commented Feb 21, 2016 at 16:18
