Please take a look at the following code. This is my attempt at understanding concurrent applications.
class Program
{
static void Main(string[] args)
{
var consumers = new List<Consumer>
{
new Consumer(1000),
new Consumer(900),
new Consumer(800),
new Consumer(700),
new Consumer(600),
};
var resourceManager = new ResourceManager(consumers);
resourceManager.Process();
}
}
public class Resource
{
private int Capacity { get; set; } = 1000;
private Guid? _currentConsumer;
public int GetCapacity(Guid? id)
{
while (id.HasValue && _currentConsumer.HasValue && id != _currentConsumer)
{
Thread.Sleep(5);
}
_currentConsumer = id;
return Capacity;
}
public void SetCapacity(int cap, Guid id)
{
if (_currentConsumer.HasValue && id != _currentConsumer)
return;
Capacity = cap;
_currentConsumer = null;
}
}
public class Consumer
{
private readonly int _sleep;
private Guid _id = Guid.NewGuid();
public Consumer(int sleep)
{
_sleep = sleep;
}
public void ConsumeResource(Resource resource)
{
var capture = resource.GetCapacity(_id);
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.GetCapacity(_id) != capture)
throw new SystemException("Something went wrong");
resource.SetCapacity(resource.GetCapacity(_id) - 1, _id);
}
}
public class ResourceManager
{
private readonly List<Consumer> _consumers;
public readonly Resource _resource;
public ResourceManager(List<Consumer> consumers)
{
_consumers = consumers;
_resource = new Resource();
}
public void Process()
{
Parallel.For(0, _consumers.Count, i =>
{
var consumer = _consumers[i];
consumer.ConsumeResource(_resource);
});
}
}
The lines of code from Consumer::ConsumeResource
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.GetCapacity(_id) != capture)
throw new SystemException("Something went wrong");
are meant to simulate a real application situation, when there are several consumers that might use the same resource, and which calculations can be broken if resource's state changes in process of calculations (probably by another consumer).
I created a solution for that, using good old Thread.Sleep
, therefore locking resource from being used while it is already being used by another consumer, but I feel like there are more correct ways to achieve the same.
Looking forward for your review notes!
EDITED:
After some research about lock
s and stuff, I wrote that little helper class:
public class ConcurrentAccessProvider<TObject>
{
private readonly Func<TObject> _getter;
private readonly Action<TObject> _setter;
private readonly object _lock = new object();
public ConcurrentAccessProvider(Func<TObject> getter, Action<TObject> setter)
{
_getter = getter;
_setter = setter;
}
public TObject Get()
{
lock (_lock)
{
return _getter();
}
}
public void Set(TObject value)
{
lock (_lock)
{
_setter(value);
}
}
public void Access(Action accessAction)
{
lock (_lock)
{
accessAction();
}
}
}
With that, I rewrote Resource
and Consumer
in order to make it thread-safe:
public class Resource
{
public ConcurrentAccessProvider<int> CapacityAccessProvider { get; }
private int _capacity;
public Resource()
{
CapacityAccessProvider = new ConcurrentAccessProvider<int>(() => _capacity, val => _capacity = val);
}
public int Capacity
{
get => CapacityAccessProvider.Get();
set => CapacityAccessProvider.Set(value);
}
}
public class Consumer
{
private readonly int _sleep;
public Consumer(int sleep)
{
_sleep = sleep;
}
public void ConsumeResource(Resource resource)
{
resource.CapacityAccessProvider.Access(() =>
{
var capture = resource.Capacity;
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.Capacity != capture)
throw new SystemException("Something went wrong");
resource.Capacity -= 1;
Console.WriteLine(resource.Capacity);
});
}
}
In the provided example those manipulations effectively kill all possible profits from concurrency, but it is because there is only one Resource
instance. In real world application when there are thousands of resources and only several conflicting cases, that will work just fine.
Nevertheless, I would still love to hear about how I could improve the concurrency related code!
1 Answer 1
Let us first examine the content of ConsumeResource
to understand why some form of locking
(now provided by resource.CapacityAccessProvider.Access
) is needed. The ConsumeResource
does this:
- Get available resource total to local variable
capture = resource.Capacity
- Do some work (
Thread.Sleep
) assuming nobody changed available resources. - Check that nobody used any resource, that it still is same as our
capture
, throw if not. - Update available resource total, reducing it by the amount the work consumed.
The problem, if no locking
was used, would be that two or more consumers (threads) can simultaneously reach resource.SetCapacity(resource.GetCapacity(_id) - 1, _id);
passing all checks. Let us imagine having 10 of the resource (e.g. steel) and 2 workers (producing nails), both (threads) could reach that line changing total resource to 10-1=9, but it should be 8! SetCapacity(resource.GetCapacity(_id) - 1)
consists of three operations: get, substract and set. All the threads will get
10, substract
1 and reach the set
having 9 in hand.
resource.CapacityAccessProvider.Access
avoids this race-condition, by locking the whole process and you have correctly identified, that it does not run concurrently (now, when the code got wrapped by the Access
, which was not originally there). The question is: Does the very work need to acces shared state? Or just:
- Aquire resource (check that it is available and reduce total if it is)
- Do some work with the acquired resource
- Maybe output product and maybe loop (goto 1).
The difference is, that only the first part needs to be locked:
public void ConsumeResource(Resource resource)
{
// acquire - only one thread can do this, other threads must wait
lock(resource) // or resource.CapacityAccessProvider.Access(() =>
{
var capture = resource.GetCapacity(_id);
if (capture < 1)
throw new InvalidOperationException("Not enough resource");
resource.SetCapacity(capture - 1, _id);
}
// consume (work) - any number of threads can do this
Thread.Sleep(_sleep);
// maybe output:
// lock(output) output.Add(nail);
}
Another alternative could be to use atomic operation and the acquisition should probably become part of Resource
(which is kinda resource manager and ResourceManager
is rather process manager).
Explore related questions
See similar questions with these tags.
Capacity
and what it is doing with that? \$\endgroup\$ConsumeResource
and possible race-conditions, lack of locking. See lock and Interlocked \$\endgroup\$capacity
and stuff like that. It is only to demonstrate that I somewhat rely onResource
s state. In real application it is something like a cell, which can contain only one object at a time, and while one thread is calculating what should be done with that cell, the other might be doing the same, and when one thread completes calculations and places new object in the cell, the other thread, which finishes calculations a few ticks later, cannot use already occupied cell. Hope this helps ;D \$\endgroup\$ConsumeResource
not wrapped by the lock/Access and it should be ok, the points are still valid. \$\endgroup\$