I am trying to prototype a simple structure for a web crawler in Java. So far the prototype just does the following:
- Initialize a Queue with list of starting URLs
- Take out a URL from Queue and submit to a new Thread
- Do some work and then add that URL to a Set of already visited URLs
For the queue of starting URLs, I am using a ConcurrentLinkedQueue for synchronization.
For the set of already visited URLs, I am using Collections.synchronizedSet(new HashSet<URL>()).
To spawn new threads I am using an ExecutorService.
Please review whether the design is sound and whether the multi-threading is implemented correctly here.
In the following block of code in CrawlerTask:
synchronized (crawler) {
if (!crawler.getUrlVisited().contains(url)) {
new Scraper().scrape(url);
crawler.addURLToVisited(url);
}
}
I think the lock on the crawler object will let only one thread proceed at a time (since the same object is passed to every Callable), which defeats the multi-threaded design here.
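One way to keep the visited check thread-safe without a global lock would be a concurrent set whose add() atomically reports whether the element was new. A minimal sketch, assuming Crawler built its visited set with Collections.newSetFromMap instead of Collections.synchronizedSet:

// In Crawler: a concurrent set; add() is atomic and returns false
// when the URL was already present.
private final Set<URL> urlVisited =
        Collections.newSetFromMap(new ConcurrentHashMap<URL, Boolean>());

// In CrawlerTask: no synchronized block needed, because add() both
// checks and claims the URL in one atomic step.
private void crawlTask() {
    if (crawler.getUrlVisited().add(url)) {
        new Scraper().scrape(url);
    }
}

Note this marks a URL visited before scraping it, so a failed scrape would not be retried; the original add-after-scrape order trades that guarantee for the duplicate-visit window.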
Starting class for the application:
public class CrawlerApp {
private static Crawler crawler;
public static void main(String[] args) {
crawler = new Crawler();
initializeApp();
startCrawling();
}
private static void startCrawling() {
WorkerManager workers = WorkerManager.getInstance();
while (!crawler.getUrlHorizon().isEmpty()) {
URL url = crawler.getUrlHorizon().poll();
if(!crawler.getUrlVisited().contains(url)){
Future future = workers.getExecutor().submit(new CrawlerTask(url, crawler));
}
}
try {
workers.getExecutor().shutdown();
workers.getExecutor().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void initializeApp() {
Properties config = new Properties();
try {
config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
String[] horizon = config.getProperty("urls").split(",");
for (String link : horizon) {
URL url = new URL();
url.setURL(link);
crawler.getUrlHorizon().add(url);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Crawler.java, which maintains the queue of URLs and the set of already visited URLs:
public class Crawler {
private volatile ConcurrentLinkedQueue<URL> urlHorizon = new ConcurrentLinkedQueue<URL>();
public void setUrlHorizon(ConcurrentLinkedQueue<URL> urlHorizon) {
this.urlHorizon = urlHorizon;
}
public ConcurrentLinkedQueue<URL> getUrlHorizon() {
return urlHorizon;
}
private volatile Set<URL> urlVisited = Collections.synchronizedSet(new HashSet<URL>());
public void setUrlVisited(Set<URL> urlVisited) {
this.urlVisited = urlVisited;
}
public Set<URL> getUrlVisited() {
return urlVisited;
}
public void addURLToVisited(URL url) {
if (getUrlVisited().contains(url)) {
System.out.println("Duplicate found in already visited:" + url.getURL());
return;
} else {
System.out.println("Adding to visited set:" + url.getURL());
getUrlVisited().add(url);
}
}
}
URL.java is just a class with a private String url field and overridden hashCode() and equals().
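For reference, a minimal version of that class could look like the following (a sketch reconstructed from how the rest of the code uses it; only the getURL()/setURL() accessors appear in the post, the rest is assumption):

public class URL {
    private String url;

    public String getURL() {
        return url;
    }

    public void setURL(String url) {
        this.url = url;
    }

    // equals() and hashCode() delegate to the wrapped string so that
    // set membership is decided by URL value, not object identity.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof URL)) return false;
        return Objects.equals(url, ((URL) o).url);
    }

    @Override
    public int hashCode() {
        return Objects.hash(url);
    }
}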
Also, Scraper.scrape() just has a dummy implementation for now:
public void scrape(URL url){
System.out.println("Done scrapping:"+url.getURL());
}
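A real implementation could start as small as fetching the page body with java.net.HttpURLConnection. A sketch under the assumption that getURL() returns a complete http:// address (java.net.URL is fully qualified to avoid the clash with the custom URL class):

public String scrape(URL url) throws IOException {
    HttpURLConnection connection = (HttpURLConnection)
            new java.net.URL(url.getURL()).openConnection();
    connection.setConnectTimeout(5000);
    connection.setReadTimeout(5000);

    // Read the response body line by line.
    StringBuilder body = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            body.append(line).append('\n');
        }
    }
    System.out.println("Done scraping:" + url.getURL());
    return body.toString();
}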
WorkerManager, which creates the worker threads:
public class WorkerManager {
private static final Integer WORKER_LIMIT = 10;
private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);
public ExecutorService getExecutor() {
return executor;
}
private static volatile WorkerManager instance = null;
private WorkerManager() {
}
public static WorkerManager getInstance() {
if (instance == null) {
synchronized (WorkerManager.class) {
if (instance == null) {
instance = new WorkerManager();
}
}
}
return instance;
}
public Future createWorker(Callable call) {
return executor.submit(call);
}
}
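The double-checked locking is correct here only because instance is volatile; the initialization-on-demand holder idiom gets the same lazy, thread-safe singleton with no explicit locking at all. A sketch of that alternative (not code from the post):

public class WorkerManager {
    private static final int WORKER_LIMIT = 10;

    private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);

    private WorkerManager() {
    }

    // The class loader guarantees Holder is initialized exactly once,
    // on first access, so getInstance() needs no synchronization.
    private static class Holder {
        static final WorkerManager INSTANCE = new WorkerManager();
    }

    public static WorkerManager getInstance() {
        return Holder.INSTANCE;
    }

    public ExecutorService getExecutor() {
        return executor;
    }
}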
CrawlerTask, which runs on a worker thread for every URL:
public class CrawlerTask implements Callable {
public CrawlerTask(URL url, Crawler crawler) {
this.url = url;
this.crawler = crawler;
}
URL url;
Crawler crawler;
private void crawlTask() {
synchronized (crawler) {
if (!crawler.getUrlVisited().contains(url)) {
new Scraper().scrape(url);
crawler.addURLToVisited(url);
}
}
}
@Override
public Object call() throws Exception {
crawlTask();
return null;
}
}
- rdllopes (May 30, 2016): Take a look at ExecutorService. It contains a queue of tasks and a set of workers. So the crawlers should be the workers of that design, and they should add tasks (URLs) to that queue.
- Anurag (May 30, 2016): @rdllopes can you please elaborate?
- rdllopes (May 30, 2016): Your urlHorizon (ConcurrentLinkedQueue) is unneeded. There is already a task queue inside your ExecutorService. Just add new tasks to your ExecutorService and the workers will receive them. I am preparing a more detailed review, but you can start working with those ideas.
- rdllopes (May 30, 2016): Fork and Join would be a better approach.
1 Answer
I would simplify your solution a little bit. Some classes are unnecessary.
Suggestion for CrawlerApp:
private static final Integer WORKER_LIMIT = 10;
private static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// number of active threads...
private static final AtomicInteger NUMBER_ACTIVE_THREADS = new AtomicInteger(0);
private final static ExecutorService executor = new ThreadPoolExecutor(WORKER_LIMIT, WORKER_LIMIT, 0L,
TimeUnit.MILLISECONDS, queue);
private static Crawler crawler;
public static void main(String[] args) throws InterruptedException {
crawler = new Crawler();
initializeApp();
startCrawling();
}
private static void startCrawling() throws InterruptedException {
while (NUMBER_ACTIVE_THREADS.intValue() > 0 || !queue.isEmpty()) {
Thread.sleep(100);
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
private static void initializeApp() {
Properties config = new Properties();
try {
config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
String[] horizon = config.getProperty("urls").split(",");
for (String link : horizon) {
URL url = new URL(link);
executor.submit(new CrawlerTask(url, crawler, executor, NUMBER_ACTIVE_THREADS)); // the task must increment/decrement NUMBER_ACTIVE_THREADS
}
} catch (IOException e) {
e.printStackTrace();
}
}
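The counter in that comment is what lets the loop in startCrawling() detect that work is still in flight. A sketch of how CrawlerTask could cooperate with it under this design (the executor and counter are passed in so a task can submit newly discovered URLs; discoverLinks() is a hypothetical helper, not part of the post):

public class CrawlerTask implements Callable<Void> {
    private final URL url;
    private final Crawler crawler;
    private final ExecutorService executor;
    private final AtomicInteger activeThreads;

    public CrawlerTask(URL url, Crawler crawler,
                       ExecutorService executor, AtomicInteger activeThreads) {
        this.url = url;
        this.crawler = crawler;
        this.executor = executor;
        this.activeThreads = activeThreads;
    }

    @Override
    public Void call() {
        activeThreads.incrementAndGet();
        try {
            if (!crawler.getUrlVisited().contains(url)) {
                new Scraper().scrape(url);
                crawler.addURLToVisited(url);
                // Hypothetical: a real scrape would return outgoing links,
                // which become new tasks on the executor's own queue:
                // for (URL next : discoverLinks(url)) {
                //     executor.submit(new CrawlerTask(next, crawler, executor, activeThreads));
                // }
            }
            return null;
        } finally {
            // Decrement in finally so a failed scrape cannot leave the
            // wait loop in startCrawling() spinning forever.
            activeThreads.decrementAndGet();
        }
    }
}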
There is a huge contention problem in CrawlerTask. Reduce the synchronized area: it is better to visit a page twice than to block all execution.
CrawlerTask
private void crawlTask() {
    if (crawler.getUrlVisited().contains(url)) {
        return;
    }
    new Scraper().scrape(url);
    crawler.addURLToVisited(url);
}
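One refinement on top of this: Set.add() already returns false when the element is present, so the two steps could be collapsed into if (crawler.getUrlVisited().add(url)) { new Scraper().scrape(url); }. That makes the check-and-claim a single atomic operation on the synchronized set, removing even the occasional duplicate visit, at the cost of marking a URL visited before its scrape has finished.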
- Anurag (May 30, 2016): As per your design using LinkedBlockingQueue, what is the use of Thread.sleep(100)? The threads will wait if the queue is empty.
- rdllopes (May 30, 2016): I am assuming that each task consists of fetching and parsing a URL and adding the outgoing links to the queue. Note: that should be done using the submit method of the executor. Since we don't know when it will be completed, there should be a loop verifying when that happens; it just checks and sleeps for a small amount of time (100 milliseconds).