I have a single-threaded C# console application that uses FileSystemWatcher to watch a folder for new files.
My app sees a file being written, waits until it is released, then picks it up and processes it. At the same time it writes a rolling log to a text file:
2014-08-06 16:20:1.500 - Found file : C:\test1.pdf
2014-08-06 16:20:1.510 - Waiting for file to become available
2014-08-06 16:20:2.010 - Processing file
2014-08-06 16:20:8.256 - Finished processing file: C:\test1.pdf
2014-08-06 16:20:8.785 - File C:\test2.pdf found!
etc.
The current method ensures the following:
- Each file is processed in the order that FileSystemWatcher sees it.
- The log file is written in a linear fashion, one file at a time.
Files created in the folder seem to be cached by FileSystemWatcher until the current file has finished processing.
This works OK, but I don't like the fact that FileSystemWatcher is caching them; it must have a buffer limit somewhere, and files might drop off the end.
I think I need two threads:
- A FileSystemWatcher thread that sees the new files and passes them to a 'Files to Process' collection.
- A file processing thread that sees there are items in the 'Files to Process' collection and processes them, FIFO style.
I started work on this and decided to use an ObservableCollection for the 'Files to Process' list. I thought I could hang methods off the NotifyCollectionChangedAction events, but I am a bit stuck as to where to put the thread now.
My questions are:
- Do I need two threads?
- Is ObservableCollection the best object to use to manage the list of files to process?
- Where do I put a second thread? I am guessing that I need a new thread each time a new file is added to the ObservableCollection, but won't that trigger a thread for each file, so that the log gets jumbled up with each file that is added to the collection?
The files that FileSystemWatcher picks up become Document objects.
Here is the code I have so far:
    //Make the objects and set events
    watcher = new FileSystemWatcher();
    watcher.Created += new FileSystemEventHandler(AddDocument);
    docsToProcess = new ObservableCollection<Document>();
    docsToProcess.CollectionChanged += new NotifyCollectionChangedEventHandler(CollectionChanged);

    //Called by FileSystemWatcher when it sees a new file
    void AddDocument(object source, FileSystemEventArgs f)
    {
        Document doc = new Document(f.FullPath);
        docsToProcess.Add(doc);
    }

    //Called when the collection changes
    void CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
    {
        switch (e.Action)
        {
            case NotifyCollectionChangedAction.Add:
                foreach (Document doc in e.NewItems)
                {
                    log.Write("Found file : " + doc.FullPath);
                    ProcessDoc(doc);
                }
                break;
            case NotifyCollectionChangedAction.Remove:
                foreach (Document doc in e.OldItems)
                {
                    log.Write("Finished Processing file : " + doc.FullPath);
                }
                break;
            default:
                break;
        }
    }

    //Called when a new document is added to the collection
    void ProcessDoc(Document doc)
    {
        //Do processing stuff to the document, also write to the log
        //Then remove itself from the collection
        docsToProcess.Remove(doc);
    }
- How exactly are you waiting for the file to become available? – svick, Aug 7, 2014 at 19:42
- @svick I wait by attempting to open the file exclusively; they are PDFs dropped onto an FTP server. If I cannot open the file, I wait 30 seconds and try again. – chazjn, Aug 7, 2014 at 19:47
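The retry approach described in the comment above might look roughly like the sketch below. The names FileReadiness, TryOpenExclusive and WaitUntilAvailable are hypothetical and not taken from the asker's code; the retry delay and attempt limit are parameters, so the 30-second wait is the caller's choice.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper; none of these names come from the original application.
public static class FileReadiness
{
    // Returns true if the file can be opened with no sharing, i.e. nobody else holds it.
    // A missing file also yields false, because FileNotFoundException is an IOException.
    // Other failures (e.g. UnauthorizedAccessException) are deliberately not swallowed.
    public static bool TryOpenExclusive(string path)
    {
        try
        {
            using (new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None))
            {
                return true;
            }
        }
        catch (IOException)
        {
            return false;
        }
    }

    // Retries until the file opens or maxAttempts is exhausted.
    public static bool WaitUntilAvailable(string path, TimeSpan retryDelay, int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            if (TryOpenExclusive(path))
            {
                return true;
            }
            Thread.Sleep(retryDelay);
        }
        return false;
    }

    public static void Main()
    {
        string path = Path.GetTempFileName(); // stand-in for an uploaded PDF
        bool ready = WaitUntilAvailable(path, TimeSpan.FromSeconds(30), 10);
        Console.WriteLine(ready ? "File is available" : "Gave up waiting");
        File.Delete(path);
    }
}
```

Note that a loop like this blocks whichever thread runs it, which matters if it is called from the FileSystemWatcher event handler.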
2 Answers
Files created in the folder seem to be cached by FileSystemWatcher until the current file has finished processing.
Not really cached; FileSystemWatcher just raises its events one at a time. So, until your event handler returns, you won't get another notification. And since you execute all your code in that event handler, that can take a very long time.
To avoid missing events, follow these guidelines:
- [...]
- Keep your event handling code as short as possible.
So you shouldn't do it this way.
Do I need two threads?
If you want to make reasonably sure you won't miss anything, yes, you need (at least) two threads.
Is ObservableCollection the best object to use to manage the list of files to process?
In the current state, ObservableCollection doesn't make much sense: there is no list, because you will only ever have one item in the collection.
If you switched to two (or more) threads, ObservableCollection would still not be a great choice, since it's not thread-safe.
Where do I put a second thread? I am guessing that I need a new thread each time a new file is added to the ObservableCollection, but won't that trigger a thread for each file, so that the log gets jumbled up with each file that is added to the collection?
If you wanted to process each file in parallel, you would need more threads. To keep the log clean (and to perform any other operations that are not thread-safe), you should use a lock.
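As a minimal sketch of the lock mentioned above: the SafeLog class below is hypothetical (the question does not show its logger), and it keeps entries in memory instead of writing to a file so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical logger; the real application writes to a rolling text file instead.
public static class SafeLog
{
    private static readonly object Gate = new object();
    private static readonly List<string> Lines = new List<string>();

    // Only one thread at a time can be inside the lock, so entries are never interleaved.
    public static void Write(string message)
    {
        lock (Gate)
        {
            Lines.Add(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + " - " + message);
        }
    }

    public static int Count
    {
        get { lock (Gate) { return Lines.Count; } }
    }
}

public static class LockDemo
{
    public static void Main()
    {
        // Simulate many files being processed in parallel, all writing to the same log.
        Parallel.For(0, 100, i => SafeLog.Write("Processing file " + i));
        Console.WriteLine(SafeLog.Count + " entries written");
    }
}
```

The lock keeps each individual entry intact, but entries belonging to different files can still alternate; keeping all lines for one file together would require processing files one at a time.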
In any case, you shouldn't use Threads directly. Instead, you should use Tasks, or some higher-level constructs (see below for more), since they are more efficient and easier to work with.
But if you don't need parallelism, a single processing thread (i.e. two threads in total) is enough and it also means you don't need any locks (assuming only this thread writes to the log).
Some specific options on how you could implement this:
- Use ActionBlock from TPL Dataflow (requires .NET 4.5). This could be the simplest option, since it means you don't need to create any Threads or Tasks manually.
  In the event handler, you would call Post(), and the work would be handled by the delegate that you passed to ActionBlock's constructor.
  ActionBlock can also work in parallel, if you set its MaxDegreeOfParallelism.
- Use BlockingCollection and a Task with a loop.
  In the event handler, you would call Add(). You would then also create a Task (using Task.Run() or Task.Factory.StartNew()) with a foreach loop over GetConsumingEnumerable() that processes the files.
  This will work best if there is only a single Task, which means it won't be parallel.
- Create a separate Task for each file.
  In the event handler, you would create a Task with a delegate that processes the file.
  This way, files will be processed in parallel.
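The BlockingCollection option could be sketched roughly as follows. FileQueueDemo and StartConsumer are hypothetical names, the queue holds plain path strings instead of the asker's Document objects, and the files are added by hand here where the real application would add them from the FileSystemWatcher.Created handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class FileQueueDemo
{
    // Starts a single background task that handles queued paths one at a time.
    // BlockingCollection wraps a ConcurrentQueue by default, so the order is FIFO.
    // `process` is a stand-in for the real document processing.
    public static Task StartConsumer(BlockingCollection<string> queue, Action<string> process)
    {
        return Task.Run(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding() has been called and the queue is drained.
            foreach (string path in queue.GetConsumingEnumerable())
            {
                process(path);
            }
        });
    }

    public static void Main()
    {
        var queue = new BlockingCollection<string>();
        Task consumer = StartConsumer(queue, path => Console.WriteLine("Processing file " + path));

        // In the real application, this would be the whole event handler:
        //     watcher.Created += (s, e) => queue.Add(e.FullPath);
        queue.Add(@"C:\test1.pdf");
        queue.Add(@"C:\test2.pdf");

        queue.CompleteAdding(); // signal that no more files will arrive
        consumer.Wait();
    }
}
```

Because only the one consumer task does the processing and logging, the log stays in order without any locks, and the event handler returns almost immediately.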
- Just wanted to let you know that I had a very similar scenario I was working with and after reading your post I got myself an education on TPL Dataflow and successfully implemented the ActionBlock approach you recommended, greatly simplifying and streamlining my code. Thank you! – N1njaB0b, Apr 24, 2017 at 21:01
It looks like CollectionChanged is a synchronous event. In a limited test app I wrote, I added a task to handle the change in a thread through a TaskFactory.
My handle method looked something like this:
    void threaded_CollectionChanged(object sender, System.Collections.Specialized.NotifyCollectionChangedEventArgs e) {
        factory.StartNew(() => {
            object newSender = sender;
            NotifyCollectionChangedEventArgs newE = e;
            CollectionChanged(newSender, newE);
        });
    }
factory's declaration: TaskFactory factory = new TaskFactory();
Always create a new reference to the parameters, otherwise threads could receive the wrong ones.
- "Always create a new reference to the parameters otherwise threads could receive the wrong ones." Wrong. There are cases where copies like that are necessary (mostly in foreach before C# 5), but not here. – svick, Aug 7, 2014 at 19:37
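A small sketch of the point made in this comment: a lambda that captures a method parameter sees that particular call's value, so no extra local copy is needed. CaptureDemo and StartWork are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CaptureDemo
{
    // Every call gets its own `value` parameter, so the lambda captures
    // the right one even though it runs later on another thread.
    public static Task<int> StartWork(int value)
    {
        return Task.Run(() => value * 2);
    }

    public static void Main()
    {
        var tasks = new List<Task<int>>();
        for (int i = 0; i < 5; i++)
        {
            tasks.Add(StartWork(i)); // i is passed by value into a fresh parameter
        }
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine(string.Join(", ", tasks.Select(t => t.Result))); // prints "0, 2, 4, 6, 8"
    }
}
```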