The problem
When you work with Python interactively (e.g. in an IPython shell or notebook) and run a computationally intensive operation, such as fitting a machine-learning model implemented in native code, you cannot interrupt it: the native code does not return control to the Python interpreter until the operation completes. The problem is not specific to machine learning, but it is typical there, since you usually cannot predict how long training will take. If it takes longer than expected, the only way to stop it is to restart the kernel, which loses the pre-processed features and other variables held in memory. In other words, you cannot interrupt just one operation in order to try a simpler model that would presumably fit faster.
The solution
We propose an Asynchronous Fitter design pattern that runs fitting in a separate process and communicates the results back when they are available. It lets you stop training gracefully by killing the spawned process and then fit a simpler model instead. It also lets you train several models simultaneously and keep working in the IPython notebook while training runs. Note that multithreading is probably not an option, since there is no way to stop a thread that is running uncontrolled native code.
Here is a draft implementation:
from multiprocessing import Process, Queue
import time


class AsyncFitter(object):
    def __init__(self, model):
        self.queue = Queue()
        self.model = model
        self.proc = None
        self.start_time = None

    def fit(self, x_train, y_train):
        # Kill any fit already in progress before starting a new one
        self.terminate()
        self.proc = Process(target=AsyncFitter.async_fit_,
                            args=(self.model, x_train, y_train, self.queue))
        self.start_time = time.time()
        self.proc.start()

    def try_get_result(self):
        # Non-blocking: returns the fitted model, or None if not ready yet
        if self.queue.empty():
            return None
        return self.queue.get()

    def is_alive(self):
        return self.proc is not None and self.proc.is_alive()

    def terminate(self):
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
        self.proc = None

    def time_elapsed(self):
        if not self.start_time:
            return 0
        return time.time() - self.start_time

    @staticmethod
    def async_fit_(model, x_train, y_train, queue):
        # Runs in the child process; the fitted model is pickled back
        model.fit(x_train, y_train)
        queue.put(model)
Usage
It is easy to modify code that uses scikit-learn to adopt the pattern. Here is an example:
import numpy as np
from sklearn.svm import SVC

model = SVC(C=1e3, kernel='linear')
fitter = AsyncFitter(model)
x_train = np.random.rand(500, 30)
y_train = np.random.randint(0, 2, size=(500,))
fitter.fit(x_train, y_train)
You can check whether training is still running by calling fitter.is_alive(), and check the time elapsed so far by calling fitter.time_elapsed(). Whenever you want, you can terminate() the process, or simply train another model, which terminates the previous one. Finally, you can obtain the fitted model with try_get_result(), which returns None while training is in progress.
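As an illustration, those calls can be combined into a small polling helper. This is a sketch; wait_for_model and the poll and deadline parameters are hypothetical names, not part of the class above:

```python
import time


def wait_for_model(fitter, poll=0.5, deadline=60.0):
    """Poll a running AsyncFitter until the model arrives or the deadline passes."""
    while fitter.is_alive() and fitter.time_elapsed() < deadline:
        time.sleep(poll)
    model = fitter.try_get_result()
    if model is None:
        fitter.terminate()  # still running past the deadline: give up
    return model
```

In a notebook you would more likely poll manually between cells; the helper just shows how the pieces fit together.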
The issues
As far as I understand, the training set is being pickled and copied, which may be a problem if it is large. Is there an easy way to avoid that? Note that training needs only read-only access to the training set.
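One direction worth exploring (a sketch of an idea, not part of the answers below; make_shared and as_view are hypothetical helpers) is to copy the training matrix into a multiprocessing.RawArray once and map it as a NumPy view in the child, so that with a fork-based start method the data is not pickled per fit:

```python
import numpy as np
from multiprocessing import RawArray


def make_shared(arr):
    """Copy a float64 array into shared memory once; children can map it as a view."""
    shared = RawArray('d', arr.size)  # 'd' = C double, matching float64
    np.frombuffer(shared).reshape(arr.shape)[:] = arr
    return shared, arr.shape


def as_view(shared, shape):
    """Reconstruct a read-only NumPy view over the shared buffer (no copy)."""
    view = np.frombuffer(shared).reshape(shape)
    view.flags.writeable = False  # training only needs read access
    return view
```

The child would call as_view(shared, shape) inside async_fit_ instead of receiving the array itself; whether this is worth the extra plumbing depends on how large the training set is.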
What happens if someone loses a reference to an AsyncFitter instance that wraps a running process? Is there a way to implement an asynchronous delayed resource cleanup?
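On the second question, one possibility (again a sketch; OwnedProcess and its members are hypothetical) is to register a weakref.finalize callback, which runs when the owner is garbage-collected or the interpreter exits, even if terminate() was never called:

```python
import weakref
from multiprocessing import Process


class OwnedProcess(object):
    """Sketch: tie child-process cleanup to the lifetime of the owning object."""

    def __init__(self, target, args=()):
        self.proc = Process(target=target, args=args)
        self.proc.start()
        # The callback fires on garbage collection or interpreter exit;
        # it must not hold a reference back to self, so it takes proc directly.
        self._finalizer = weakref.finalize(self, OwnedProcess._cleanup, self.proc)

    @staticmethod
    def _cleanup(proc):
        if proc.is_alive():
            proc.terminate()
        proc.join()
```

weakref.finalize guarantees the callback runs at most once, so an explicit terminate() path and the garbage-collection path would not conflict.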
2 Answers
Two minor notes. Your terminate method should just use the is_alive test you've already defined:
def is_alive(self):
    return self.proc is not None and self.proc.is_alive()

def terminate(self):
    if self.is_alive():
        self.proc.terminate()
    self.proc = None
Also, in time_elapsed you should test whether self.start_time is None. That matches the empty value it starts with and communicates the intent better:
def time_elapsed(self):
    if self.start_time is None:
        return 0
    return time.time() - self.start_time
- Generally in OOP it is not advised to re-use public or protected methods internally, since derived classes may override them in any way for the sake of their own APIs (I know I break this rule with terminate()). I am not sure, though, whether people in Python care much about such stuff. :) (Roman Shapovalov, Oct 9, 2015 at 7:26)
- @RomanShapovalov Python generally operates more on trust. It lets users do what they want, and it's up to them to take responsibility if they override a function and break things. But it's up to you whether it's worthwhile here, because I hadn't even thought of that concern. (SuperBiasedMan, Oct 9, 2015 at 9:34)
Usually you know in advance a deadline for the child process to complete. In that case, instead of repeatedly checking the elapsed time and testing is_alive(), you may want to use the timeout facility of Queue.get(), along the lines of:
from queue import Empty  # multiprocessing.Queue raises the standard queue.Empty

def try_get_result(self, tmo):
    try:
        return self.queue.get(True, tmo)
    except Empty:
        self.terminate()
        return None
- This operation is blocking, i.e. you cannot execute other notebook cells while waiting, right? But at least it solves the problem of unexpectedly long training. (Roman Shapovalov, Oct 9, 2015 at 7:21)