I am processing files in directories. I want to use multi-threading in two ways:
- 10 threads to process files/folders concurrently
- 10 threads to process all lines in each file concurrently
In this code, FolderProcessor implements (1), and DocProcessor implements (2). Is there any flaw in this implementation? My testing seems to show that (2) works and reduces the time a lot, but (1) doesn't really reduce the processing time.
class FolderProcessor{
    ...
    void processFolder(String inputPath, String outputPath){
        File inputFolder = new File(inputPath);
        String[] filenames = inputFolder.list();
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (String filename : filenames) {
            String filePath = inputPath + filename;
            File inputfile = new File(filePath);
            if (inputfile.isDirectory()) {
                processFolder(filePath, outputPath + filename);
            } else {
                pool.execute(new Runnable() {
                    public void run() {
                        log.info("Start processing " + filePath);
                        Writer.write(filePath, outputPath);
                    }
                });
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class DocumentWriter{
    ...
    public static void write(String inputFile, String outputFile) {
        try {
            File test_output = new File(outputFile);
            test_output.getParentFile().mkdirs();
            test_output.createNewFile();
            FileWriter write_test_output = new FileWriter(outputFile);
            List<Document> docs = DocPrecessor.processDocs(inputFile);
            for (Document doc : docs) {
                ...
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
class DocPrecessor{
    ......
    public static List<Document> processDocs(String filePath) {
        BufferedReader br = null;
        ExecutorService pool = Executors.newFixedThreadPool(10);
        List<Document> processedDocs = new ArrayList<>();
        try {
            ...
            String line = null;
            int docNo = 0;
            List<Future<Document>> tasks = new ArrayList<>();
            while ((line = br.readLine()) != null) {
                Callable<Document> callable = new TextThread(line, ++docNo);
                tasks.add(pool.submit(callable));
            }
            for (Future<Document> task : tasks) {
                try {
                    processedDocs.add(task.get());
                } catch (InterruptedException e) {
                    log.error("InterruptedException Failure: " + line);
                } catch (ExecutionException e) {
                    log.error("Thread ExecutionException e: " + line);
                    e.printStackTrace();
                }
            }
            pool.shutdown();
            try {
                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (br != null)
                    br.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        return processedDocs;
    }
}
1 Answer
FolderProcessor
if (inputfile.isDirectory()) {
processFolder(filePath, outputPath + filename);
}
This is why you're not seeing any benefit from parallelism: the recursive call runs on the calling thread and blocks until the whole subfolder is finished, so subfolders are processed one after another. Even if you fixed that, you'd be creating a new ExecutorService on each invocation; pool should be a class-level field:
private final ExecutorService pool;
public FolderProcessor(ExecutorService pool){
    this.pool = pool;
}
After running some tests, I noticed that your path resolution is broken: the path separator isn't being added for subpaths! You can follow the operating system's rules by switching to Paths:
String filePath = inputFolder.toPath().resolve(filename).toString();
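To make the difference concrete, here is a minimal sketch comparing plain concatenation with Path resolution. The class name and the folder and file names are made up for the illustration.

```java
import java.nio.file.Paths;

class PathJoinDemo {
    // Plain concatenation, as in the original code: no separator is inserted.
    static String concat(String folder, String name) {
        return folder + name;
    }

    // Path.resolve inserts the platform's separator between the two parts.
    static String resolve(String folder, String name) {
        return Paths.get(folder).resolve(name).toString();
    }

    public static void main(String[] args) {
        System.out.println(concat("input", "sub"));  // prints "inputsub"
        System.out.println(resolve("input", "sub")); // "input", the separator, then "sub"
    }
}
```

The same applies to the output path, which the original code also builds by concatenation.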
Some quick cleanup and a change to lambda notation (Java 8 is so much cleaner):
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;

class FolderProcessor{
    private static final Logger log = Logger.getGlobal();
    // Shared pool, injected once instead of being created on every call
    private final ExecutorService pool;
    public FolderProcessor(ExecutorService pool){
        this.pool = pool;
    }
    void processFolder(String inputPath, String outputPath){
        File inputFolder = new File(inputPath);
        for (String filename : inputFolder.list()) {
            String filePath = inputFolder.toPath().resolve(filename).toString();
            String writeTo = new File(outputPath).toPath().resolve(filename).toString();
            if (new File(filePath).isDirectory()) {
                // Subfolders become tasks too, so they no longer block this thread
                pool.execute(()->processFolder(filePath, writeTo));
            } else {
                pool.execute(()->{
                    log.info("Start processing " + filePath);
                    DocumentWriter.write(filePath, writeTo);
                });
            }
        }
    }
}
Note: My IDE warns me that inputFolder.list() can return null if the File isn't a directory, in which case the for loop throws a NullPointerException. While your current usage ensures that it is a directory, you may want to put in a check down the line.
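One consequence of moving to a shared pool is that processFolder returns as soon as its tasks are queued, so the caller needs another way to tell when the whole tree is done. Calling shutdown() straight away would not work, because tasks that are still running would have their nested submissions rejected. The following is a minimal sketch of one possible approach, not the only one: count tasks as they are submitted and release a latch when the count drops back to zero. The class FolderWalkSketch and its members are hypothetical, the per-file work is replaced by a counter, and listFiles() is used with a null check.

```java
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FolderWalkSketch {
    private final ExecutorService pool;
    private final AtomicInteger pending = new AtomicInteger();  // tasks submitted but not yet finished
    private final CountDownLatch done = new CountDownLatch(1);  // released when pending drops to zero
    private final AtomicInteger filesSeen = new AtomicInteger();

    FolderWalkSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Count the task before handing it to the pool, and uncount it when it ends.
    private void submit(Runnable task) {
        pending.incrementAndGet();
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                if (pending.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }

    private void processFolder(File folder) {
        File[] children = folder.listFiles();
        if (children == null) {
            return; // not a directory, or an I/O error occurred
        }
        for (File child : children) {
            if (child.isDirectory()) {
                submit(() -> processFolder(child));
            } else {
                // Stand-in for DocumentWriter.write(...)
                submit(() -> filesSeen.incrementAndGet());
            }
        }
    }

    // Single-use: walks the tree and blocks until every task, nested ones included, has finished.
    int processAll(File root) throws InterruptedException {
        submit(() -> processFolder(root));
        done.await();
        return filesSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            File root = new File(args.length > 0 ? args[0] : ".");
            System.out.println(new FolderWalkSketch(pool).processAll(root) + " files");
        } finally {
            pool.shutdown(); // safe here: all tasks have already finished
        }
    }
}
```

Because a task always counts its children before it uncounts itself, the counter can only reach zero once the entire tree has been handled. No task ever waits on another task, so the fixed-size pool cannot starve itself.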
DocumentWriter
Since we're missing the meat of the method (the contents of the for loop), it's hard to make too many recommendations. I am concerned with the shift in naming convention (Java uses camelCase), and the lack of checking for errors in file creation. Also, you create a File and discard it as soon as you've made your checks, when you could save the FileWriter some work by passing it along.
After some cleanup:
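import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;

class DocumentWriter{
    private static final Logger log = Logger.getGlobal();
    public static void write(String inputPath, String outputPath) {
        File outputFile = new File(outputPath);
        File parent = outputFile.getAbsoluteFile().getParentFile();
        // mkdirs() returns false if the directory already exists, so also accept an existing directory
        if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
            // You may want to log this
            log.warning("File could not be created: " + outputPath);
            return;
        }
        // try-with-resources closes the writer; FileWriter creates the file if it is missing
        try (FileWriter outputWriter = new FileWriter(outputFile)) {
            List<Document> docs = DocProcessor.processDocs(inputPath);
            for (Document doc : docs) {
                // Missing code
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}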
DocProcessor
I assumed you wanted the typo fixed. There's not much else to say about this class, since all the logic is either tied up in a class we can't see (TextThread) or omitted. I will say that if you're running this method in parallel, you're going to be creating an ExecutorService for each invocation, possibly creating hundreds of threads. A shared ExecutorService (it could be static, but preferably a final instance variable) would be a better solution.
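As a rough illustration of the shared-pool idea, the sketch below creates one line-level pool and reuses it for every file. The class SharedLinePoolSketch and the transform method are hypothetical; transform merely stands in for whatever TextThread does, and results are plain strings rather than Document objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedLinePoolSketch {
    // One pool for all line-level work, created once instead of once per file.
    private final ExecutorService linePool;

    SharedLinePoolSketch(ExecutorService linePool) {
        this.linePool = linePool;
    }

    // Hypothetical per-line work, standing in for TextThread.
    static String transform(String line, int docNo) {
        return docNo + ":" + line.trim().toUpperCase(Locale.ROOT);
    }

    List<String> processLines(List<String> lines) throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        int docNo = 0;
        for (String line : lines) {
            final int no = ++docNo;
            tasks.add(() -> transform(line, no));
        }
        List<String> results = new ArrayList<>();
        // invokeAll blocks until every task is done and returns futures in submission order.
        for (Future<String> future : linePool.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService linePool = Executors.newFixedThreadPool(10);
        try {
            SharedLinePoolSketch sketch = new SharedLinePoolSketch(linePool);
            System.out.println(sketch.processLines(Arrays.asList(" a ", "b"))); // [1:A, 2:B]
            System.out.println(sketch.processLines(Arrays.asList("c")));        // [1:C]
        } finally {
            linePool.shutdown();
        }
    }
}
```

One design point worth keeping in mind: the line-level pool should be a different pool from the one running the file-level tasks. If file tasks block on futures that are queued behind them in the same fixed-size pool, all threads can end up waiting and nothing makes progress.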
Other Notes
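You may want to try Executors.newWorkStealingPool(10). It is backed by a ForkJoinPool in which each worker has its own queue and idle workers steal queued tasks from busy ones, which suits tasks that submit further tasks, as the rewritten processFolder does. newFixedThreadPool(10) remains a reasonable choice for CPU-intensive work. For I/O-intensive work, measure both, since a thread blocked on I/O sits idle in either pool.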
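Since both factory methods return an ExecutorService, swapping one for the other is a one-line change, which makes it easy to measure on your own data. Below is a minimal sketch under that assumption; PoolChoiceSketch and runBatch are hypothetical names, and the tasks just bump a counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolChoiceSketch {
    // Runs the same batch of trivial tasks on whichever pool is passed in
    // and returns how many of them completed.
    static int runBatch(ExecutorService pool, int taskCount) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        pool.shutdown();
        // Work-stealing pools use daemon threads, so wait explicitly;
        // otherwise the JVM may exit before the queued tasks have run.
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBatch(Executors.newFixedThreadPool(10), 100));   // 100
        System.out.println(runBatch(Executors.newWorkStealingPool(10), 100));  // 100
    }
}
```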
- "pool.execute(()->processFolder(filePath, writeTo));" this line doesn't compile. What's the arrow? Also, where is the Runnable? (user697911, Jul 11, 2017 at 21:45)
- It compiles in Java 8, and it's called Lambda Notation. docs.oracle.com/javase/tutorial/java/javaOO/… If your language level is less than 8, you can replace it with pool.execute(new Runnable(){public void run(){processFolder(filePath, writeTo);}}); although I recommend updating to 8, since 7 was phased out in 2015. java.com/en/download/faq/java_7.xml (ndm13, Jul 11, 2017 at 21:52)
- I am using Java 8. Maybe there is a little syntax error in the code? Please double check. It is complaining about processFolder()'s arguments. (user697911, Jul 11, 2017 at 21:54)
- Yes, it was a typo on my side. Sorry. (user697911, Jul 11, 2017 at 21:57)
- hastebin.com/osiwutiful.java is what I'm using. It compiles and runs fine. (ndm13, Jul 11, 2017 at 22:07)