I am working on a multi-threaded web-crawling program in Java. Each WebCrawler
starts at a root page and repeatedly extracts new links and writes them to a database. I have included most of the code below:
public class WebCrawler implements Runnable {
private ConnectionPool connectionPool;
private ExecutorService executor;
private WebPage rootPage;
private int numThreads;
private List<WebPage> currentPageQueue = new ArrayList<WebPage>();
private Set<WebPage> nextPageQueue = Collections.synchronizedSet(new HashSet<>());
private Consumer<String> callback;
private String tableName;
public WebCrawler(int numThreads, URL rootURL, String tableName, String dbUrl, String userName, String password) {
this.numThreads = numThreads;
this.rootPage = new WebPage(rootURL, this);
this.tableName = tableName;
this.connectionPool = new ConnectionPool(dbUrl, userName, password, numThreads);
this.executor = Executors.newFixedThreadPool(numThreads);
}
public void run() {
currentPageQueue.add(rootPage);
try {
while (!Thread.interrupted()) {
for (WebPage currentLink : currentPageQueue) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
executor.execute(currentLink);
}
// Finish crawling all WebPages on this level
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
currentPageQueue.clear();
currentPageQueue.addAll(nextPageQueue);
if (executor.isTerminated()) {
reinitializeExecutor();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
quit();
}
}
public void addSublinks(WebPage page, Set<WebPage> sublinks) throws SQLException {
this.callback.accept(page.getURL());
// Should fix this to account for the possibility that the queue
// runs out of space. However, leave for now since this is unlikely.
this.nextPageQueue.addAll(sublinks);
Connection conn;
try {
conn = connectionPool.getConnection();
} catch (InterruptedException e2) {
return;
}
PreparedStatement stmt = null;
synchronized (conn) {
String Sql = "INSERT INTO " + tableName + " (HASH, URL, VISITED) VALUES " + "(?, ?, false);";
try {
conn.setAutoCommit(false);
stmt = conn.prepareStatement(Sql);
for (WebPage link : sublinks) {
stmt.setInt(1, link.hashCode());
stmt.setString(2, link.getURL());
stmt.execute();
conn.commit();
}
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException e1) {
e1.printStackTrace();
}
} finally {
if (stmt != null) {
stmt.close();
}
conn.setAutoCommit(true);
}
}
connectionPool.returnConnection(conn);
}
public void reinitializeExecutor() {
this.executor = Executors.newFixedThreadPool(this.numThreads);
}
private void quit() {
// TODO: Other cleanup tasks that I forgot?
List<Runnable> notExecuted = executor.shutdownNow();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
if (!executor.isTerminated()) {
// Do something here?
}
connectionPool.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
public void registerCallback(Consumer callback) {
this.callback = callback;
}
}
The GUI:
public class CrawlerGUI extends JFrame {
private WebCrawler[] crawlers;
private Thread[] threads;
private JTextArea textArea;
private final int buttonWidth = 70;
private final int buttonHeight = 30;
private Consumer callback;
public CrawlerGUI(Thread[] threads, WebCrawler[] crawlers) {
this.crawlers = crawlers;
this.threads = threads;
this.callback = new TextUpdater();
for (WebCrawler crawler: this.crawlers){
crawler.registerCallback(callback);
}
initialize();
}
private void initialize() {
JPanel panel = new JPanel();
panel.setLayout(new BorderLayout());
JButton button = new JButton("Quit");
button.setPreferredSize(new Dimension(buttonWidth, buttonHeight));
button.setBorder(new LineBorder(Color.BLACK));
button.setOpaque(true);
textArea = new JTextArea();
textArea.setColumns(20);
textArea.setLineWrap(true);
textArea.setRows(5);
textArea.setWrapStyleWord(true);
textArea.setEditable(false);
JScrollPane jScrollPane = new JScrollPane(textArea);
jScrollPane.setVerticalScrollBarPolicy(ScrollPaneConstants.VERTICAL_SCROLLBAR_ALWAYS);
panel.add(jScrollPane, BorderLayout.CENTER);
panel.add(button, BorderLayout.SOUTH);
addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent windowEvent) {
signalCrawler();
}
});
button.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent event) {
signalCrawler();
}
});
this.getContentPane().add(panel);
this.setBounds(100, 100, 400, 400);
this.setLocationByPlatform(true);
this.setLocationRelativeTo(null);
this.setVisible(true);
}
private void signalCrawler() {
// TODO: Perhaps there are other cleanup tasks I need to consider?
for (Thread crawler : threads) {
crawler.interrupt();
}
this.dispose();
}
private class TextUpdater implements Consumer<String> {
@Override
public void accept(String myLine) {
SwingUtilities.invokeLater(new Runnable() {
public void run() {
textArea.append(myLine+"\n");
}
});
}
}
}
Main:
public class Main {
public static final String userName = "root";
public static final String password = "";
public static final String database = "mariadb";
public static final String serverName = "localhost";
public static final String dbName = "webcrawler";
public static final int portNumber = 3306;
public static final int numThreads = 10;
public static String tableName;
public static void main(String[] args) throws InterruptedException {
EventQueue.invokeLater(new Runnable() {
public void run() {
Object[] options = { "Resume Crawl", "New Crawl" };
JFrame dilogParentFrame = new JFrame();
int optionNum = JOptionPane.showOptionDialog(dilogParentFrame, null, "Resume/Start Crawl",
JOptionPane.YES_NO_CANCEL_OPTION, JOptionPane.PLAIN_MESSAGE, null, options, options[1]);
if (optionNum == JOptionPane.CLOSED_OPTION) {
dilogParentFrame.dispose();
return;
}
boolean isNewCrawl = (optionNum == 1);
if (isNewCrawl) {
tableName = JOptionPane.showInputDialog("Enter the table name");
} else {
// TODO: will implement resuming in the future
}
String dbUrl = "jdbc:" + database + "://" + serverName + ":" + portNumber + "/" + dbName;
// TODO: will need to take root URLs as input from user.
// However, at this stage hardcoding the URLs makes testing easier.
String[] urls = new String[] { "http://www.bmw.ca/en/home.html", "http://www.apple.com/ca/",
"http://www.javaworld.com/" };
WebCrawler[] crawlers = new WebCrawler[urls.length];
Thread[] threads = new Thread[urls.length];
for (int i = 0; i < crawlers.length; i++){
try {
crawlers[i] = new WebCrawler(numThreads, new URL(urls[i]), tableName, dbUrl, userName, password);
threads[i] = new Thread(crawlers[i]);
} catch (MalformedURLException e) {
// TODO Better handling for malformed root urls;
}
}
CrawlerGUI gui = new CrawlerGUI(threads, crawlers);
for (Thread t: threads){
t.start();
}
}
});
}
}
Finally, ConnectionPool (I believe this class should be thread-safe, but I will post it just in case):
public class ConnectionPool {
private String databaseUrl;
private String userName;
private String password;
private int poolSize;
private final BlockingQueue<Connection> connectionPool;
public ConnectionPool(String databaseUrl, String userName, String password, int poolSize) {
this.databaseUrl = databaseUrl; // Should be of the form
// "jdbc:mysql://localhost:3306/myDatabase";
this.userName = userName;
this.password = password;
this.poolSize = poolSize;
connectionPool = new ArrayBlockingQueue<>(this.poolSize);
initialize();
}
private void initialize() {
while (connectionPool.size() < this.poolSize) {
connectionPool.add(createConnection());
}
}
private Connection createConnection() {
Connection conn = null;
Properties connectionProps = new Properties();
connectionProps.put("user", this.userName);
connectionProps.put("password", this.password);
try {
conn = DriverManager.getConnection(this.databaseUrl, connectionProps);
} catch (SQLException e) {
System.out.println(e);
}
return conn;
}
public Connection getConnection() throws InterruptedException {
return connectionPool.take();
}
public void returnConnection(Connection connection) {
// We should not expect to see IllegalStateException
connectionPool.add(connection);
}
public void close() throws SQLException {
synchronized (connectionPool) {
if (!(connectionPool.size() == this.poolSize)) {
throw new IllegalStateException("Attempting to close with " + (this.poolSize - connectionPool.size())
+ " connections unreturned.");
}
for (Connection c : connectionPool) {
c.close();
}
}
}
}
The exact code for WebPage is not as relevant. Basically, it is a class which implements Runnable, and its run() method opens a connection to the URL it is given, reads the HTML from the page, and extracts all sublinks. The sublinks are then passed back as a Set via the addSublinks method of WebCrawler.
There are a few main concerns that I have, and I would appreciate any feedback or advice:
Thread Safety/Synchronization: Is my program thread-safe? If not, please point out any areas where there could potentially be issues, and if possible give an example of how something could go wrong. Alternatively, if there are any places where I do not actually need to be synchronizing, please let me know as well.
Gracefully terminating: Currently, I am using interrupts to tell the WebCrawler threads to end what they are doing. However, this is causing some issues, most likely because I am not handling the interrupts ideally. For example, I get an InterruptedException in the quit() method of WebCrawler, which is unexpected since quit() is only called once the thread has already been interrupted, and I don't see how it could be interrupted twice. Also, the program seems very slow to stop: if I do not dispose the window after interrupting each thread, I can see more links being displayed in the window for at least a couple of seconds after pressing the "Quit" button. Please give a few pointers in the right direction if possible, or perhaps suggest an entirely different approach to terminating.
Database access: I am considering an alternate producer-consumer approach to writing sublinks to the database, which would involve a BlockingQueue that WebPages could use to enqueue their sublinks. A separate consumer thread would then be responsible for taking items off of this queue and writing them to the database. Essentially, this would route all interactions with the database through one gateway. Would there be any benefits to this approach, and would it be better than what I am doing now?
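Concretely, this is a rough sketch of the consumer I have in mind (the class name DatabaseWriter and the volatile shutdown flag are placeholders, not code I have written yet):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class DatabaseWriter implements Runnable {
        // WebPages would put their sublinks on this queue instead of calling addSublinks().
        private final BlockingQueue<WebPage> queue;
        private final Connection conn;          // the single connection owned by this writer
        private final String tableName;
        private volatile boolean running = true;

        public DatabaseWriter(BlockingQueue<WebPage> queue, Connection conn, String tableName) {
            this.queue = queue;
            this.conn = conn;
            this.tableName = tableName;
        }

        public void shutdown() {
            running = false;
        }

        @Override
        public void run() {
            String sql = "INSERT INTO " + tableName + " (HASH, URL, VISITED) VALUES (?, ?, false)";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                // Keep draining until asked to stop and the queue is empty.
                while (running || !queue.isEmpty()) {
                    WebPage link = queue.poll(1, TimeUnit.SECONDS); // wake up periodically to re-check the flag
                    if (link == null) {
                        continue;
                    }
                    stmt.setInt(1, link.hashCode());
                    stmt.setString(2, link.getURL());
                    stmt.execute();
                }
            } catch (SQLException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Main (or the WebCrawler) would start this on its own thread and call shutdown() once crawling is done.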
In addition, any tips on structure/decoupling, or pointing out any important errors in general would be very much appreciated.
Also, one small issue I encountered in the GUI code: in order to both interrupt the WebCrawler threads and register the display callback with each WebCrawler object, the CrawlerGUI constructor has to take an array of threads and an array of WebCrawlers, which seems a little redundant. Is there any way I could avoid this?
1 Answer
I'm going to propose a very different design. That's always exciting without any real requirements to work off of, but I think it should be both simpler to work with and more flexible moving forward.
The basic ideas are to (1) simplify your threading to a single pool, and (2) separate concerns to enhance flexibility.
Using a single WebCrawler which wraps a single thread pool really cleans up the threading. Your client can kick things off by creating the WebCrawler and passing in an initial set of PageCrawler (runnable) instances. Those will spawn new PageCrawlers as they execute.
I also propose a StatusListener interface to be passed into each PageCrawler. The listeners get notified whenever anything interesting happens. You can add one for your database, one for your GUI, and you can add/modify/remove them later without touching the main crawling code. They also get created by your client, so the crawlers don't have to know about database/GUI/whatever.
You will probably need a wrapper for URL that contains the hash code you're writing to the database. That wrapper should be passed into the listeners instead of the child URLs.
I also strongly suggest you use a connection pool library instead of hand-writing one. A good, robust connection pool is a hard thing to do. You have more than one problem in your implementation. For instance, you can grow the connection pool by passing in a connection you didn't get from the pool, and the pool doesn't make sure the returned connection is actually available - if a client closes the connection, future clients will get a closed connection they can't do anything with.
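If you do stay with the hand-rolled pool for now (see the comments below), returnConnection() should at least refuse connections the pool never handed out and replace dead ones. A sketch, assuming an extra managed set that createConnection() registers every new connection in:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Extra field in ConnectionPool; createConnection() would add every connection it makes to this set.
    private final Set<Connection> managed = Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());

    public void returnConnection(final Connection connection) {
        // Reject foreign connections so the pool cannot grow past poolSize.
        if (!managed.contains(connection)) {
            throw new IllegalArgumentException("Connection does not belong to this pool");
        }
        boolean usable;
        try {
            usable = !connection.isClosed() && connection.isValid(1);
        } catch (final SQLException e) {
            usable = false;
        }
        if (usable) {
            connectionPool.add(connection);
        } else {
            // Swap a dead connection for a fresh one instead of recycling it.
            // (createConnection() returning null on failure is its own problem, left alone here.)
            managed.remove(connection);
            final Connection fresh = createConnection();
            managed.add(fresh);
            connectionPool.add(fresh);
        }
    }

Even with those checks this is still far short of what a real pool gives you (timeouts, validation queries, leak detection), which is why a library is the better choice.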
As far as tying together the Client, GUI, and WebCrawler, look at the MVC and MVP patterns.
WebCrawler
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public final class WebCrawler {
private final ExecutorService executor;
public WebCrawler(final int threadCount) {
this.executor = Executors.newFixedThreadPool(threadCount);
}
public void addPageCrawlers(final PageCrawler... pageCrawlers) {
for (final PageCrawler pageCrawler : pageCrawlers) {
this.executor.execute(pageCrawler);
}
}
public void addPageCrawlers(final Collection<PageCrawler> pageCrawlers) {
for (final PageCrawler pageCrawler : pageCrawlers) {
this.executor.execute(pageCrawler);
}
}
public void halt() {
// Shut down first so no new tasks are accepted and awaitTermination has a chance to return early.
this.executor.shutdown();
try {
this.executor.awaitTermination(5, TimeUnit.SECONDS);
if (!this.executor.isTerminated()) {
System.err.println("Timed out waiting for executor to terminate cleanly. Shutting down.");
this.executor.shutdownNow();
}
} catch (final InterruptedException e) {
System.err.println("Interrupted while waiting for executor shutdown.");
Thread.currentThread().interrupt();
}
}
}
PageCrawler (replaces WebPage)
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
public class PageCrawler implements Runnable {
private final WebCrawler webCrawler;
private final URL url;
private final List<StatusListener> statusListeners;
public PageCrawler(final WebCrawler webCrawler, final URL url, final List<StatusListener> statusListeners) {
this.webCrawler = webCrawler;
this.url = url;
this.statusListeners = new ArrayList<>(statusListeners);
}
@Override
public void run() {
throw new UnsupportedOperationException("Implement me!");
// Look at the URL
// Find all child URLs on that page
// notify all status listeners
// Add a new PageCrawler to the WebCrawler for each child URL.
}
}
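One possible way to flesh out run(), using a crude regex in place of a real HTML parser and with no visited-URL tracking (see the last comment below for that):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Drop-in body for PageCrawler.run() plus a helper method.
    @Override
    public void run() {
        final Set<URL> children = extractLinks();
        for (final StatusListener listener : this.statusListeners) {
            listener.linksFound(this.url, children);
            listener.linkVisited(this.url);
        }
        // Queue a new crawl task for every child that was found.
        for (final URL child : children) {
            this.webCrawler.addPageCrawlers(new PageCrawler(this.webCrawler, child, this.statusListeners));
        }
    }

    private Set<URL> extractLinks() {
        final Set<URL> links = new LinkedHashSet<>();
        final Pattern href = Pattern.compile("href=[\"']([^\"'#]+)[\"']", Pattern.CASE_INSENSITIVE);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(this.url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                final Matcher m = href.matcher(line);
                while (m.find()) {
                    try {
                        links.add(new URL(this.url, m.group(1))); // resolve relative links against the page URL
                    } catch (final MalformedURLException ignored) {
                        // skip anything we cannot parse as a URL
                    }
                }
            }
        } catch (final IOException e) {
            System.err.println("Failed to read " + this.url + ": " + e.getMessage());
        }
        return links;
    }

A real implementation would want a proper HTML parser and some way to avoid revisiting pages.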
StatusListener
import java.net.URL;
import java.util.Collection;
public interface StatusListener {
public void linksFound(final URL parent, final Collection<URL> children);
public void linkVisited(final URL link);
}
DatabaseStatusListener
import java.net.URL;
import java.util.Collection;
import com.codereview.webcrawler.ConnectionPool;
public final class DatabaseStatusListener implements StatusListener {
private final ConnectionPool connectionPool;
private final String tableName;
public DatabaseStatusListener(final ConnectionPool connectionPool, final String tableName) {
this.connectionPool = connectionPool;
this.tableName = tableName;
}
@Override
public void linksFound(final URL parent, final Collection<URL> children) {
throw new UnsupportedOperationException("Implement me!");
}
@Override
public void linkVisited(final URL link) {
throw new UnsupportedOperationException("Implement me!");
}
}
GUIStatusListener
import java.net.URL;
import java.util.Collection;
import javax.swing.JTextArea;
public final class GUIStatusListener implements StatusListener {
private final JTextArea textArea;
public GUIStatusListener(final JTextArea textArea) {
this.textArea = textArea;
}
@Override
public void linksFound(final URL parent, final Collection<URL> children) {
throw new UnsupportedOperationException("Implement me!");
}
@Override
public void linkVisited(final URL link) {
throw new UnsupportedOperationException("Implement me!");
}
}
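To tie it all together, the client (the controller, in MVC/MVP terms) would build the pool, the listeners, and the WebCrawler, then seed it with the first PageCrawler. Roughly like this, where the Client class name, connection settings, table name, and seed URL are all placeholders:

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.Arrays;
    import java.util.List;
    import javax.swing.JTextArea;

    public final class Client {
        public static void main(final String[] args) throws MalformedURLException {
            final ConnectionPool pool = new ConnectionPool("jdbc:mysql://localhost:3306/webcrawler", "root", "", 10);
            final JTextArea textArea = new JTextArea();   // the GUI would create and display this

            final List<StatusListener> listeners = Arrays.<StatusListener>asList(
                    new DatabaseStatusListener(pool, "links"),
                    new GUIStatusListener(textArea));

            final WebCrawler crawler = new WebCrawler(10);
            crawler.addPageCrawlers(new PageCrawler(crawler, new URL("http://www.example.com/"), listeners));

            // Later, e.g. from the GUI's Quit button:
            // crawler.halt();
        }
    }

The GUI would own the JTextArea and the Quit button, and call crawler.halt() from its action listener.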
-
Thank you for the suggestions. If, for reasons I won't specify here, I am unable or would prefer not to use third-party libraries, is my ConnectionPool implementation sufficient, in that if used properly there should at least be no thread-safety issues? Also, how would I incorporate MVC into this design? It looks like updating the textArea is already taken care of in the GUIStatusListener. Thanks. – b_pcakes, Nov 19, 2015 at 22:52
-
@SimonZhu Your ConnectionPool is adequate if it is used exactly the way it is being used now. Really, though, use one of the many small, free libraries out there to handle it for you. MVC/MVP would inform the construction and use of the WebCrawler API by the UI. You need something to build a WebCrawler, build the listeners, and start/stop the WebCrawler. – Eric Stein, Nov 20, 2015 at 1:06
-
I have taken your advice and changed my design, but there is one issue I've come across. I would appreciate it if you could see the edit to my post and give any suggestions. – b_pcakes, Nov 20, 2015 at 1:27
-
@SimonZhu Use some thread-safe collection (either a ConcurrentHashMap or synchronized access to a Set) in WebCrawler to track which URLs have been added to the crawler through the addPageCrawler() method. That may mean it makes sense to rejigger the method so it takes a URL and makes the new PageCrawler itself. – Eric Stein, Nov 20, 2015 at 13:59
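For reference, a minimal sketch of that last suggestion inside WebCrawler (assuming WebCrawler also holds the List<StatusListener> so it can build PageCrawlers itself; ConcurrentHashMap.newKeySet() needs Java 8):

    import java.net.URL;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Extra fields in WebCrawler:
    private final Set<URL> seen = ConcurrentHashMap.newKeySet();
    private final List<StatusListener> statusListeners; // set in the constructor

    // Rejiggered method: takes a URL and builds the PageCrawler itself,
    // silently skipping URLs that have already been submitted.
    public void addPageCrawler(final URL url) {
        if (this.seen.add(url)) {   // add() returns false if the URL was already present
            this.executor.execute(new PageCrawler(this, url, this.statusListeners));
        }
    }

Note that URL's equals()/hashCode() can trigger DNS lookups, so keying the set by URI or by the URL's string form may be preferable.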