I have a Python Flask app that waits for requests from a user app and then spawns a process with a job based on the request it receives. It keeps the status and queue of the jobs in memory. Requests to this service always follow this pattern:
- Submit job
- (optional) upload additional data
- check every 5 seconds if the job is finished
- download results
- delete job data
I have simply created lists and dicts for the queued, running and finished jobs. The submit request comes first and says whether the job needs to wait for additional data. If no data is needed, the job is started in a new process if a CPU is free; otherwise it is put in the queue.
If additional data is needed, the job is put into a separate waiting list, and once the request with the additional data has completed, the job is spawned (or put into the queue). Since every user app that has submitted a job checks every few seconds whether its job is done, I use these status requests to detect finished jobs, move them to finished and spawn the next job from the queue.
Once the check request reports that the job is done, the user app downloads the results and then requests deletion of the job data.
What surprised me was that the Flask app would be started again with each spawned process if it is not protected by if __name__ == '__main__':, even though the spawned job is in a different file and does not reference anything from the Flask app file.
Since I'm relatively new to Flask and multiprocessing, is there anything else I should be worrying about?
This is the code:
import os
import uuid
from os.path import isfile, isdir
from flask import Flask, request, Response, send_from_directory
from multiprocessing import Process
from werkzeug.utils import secure_filename
from helpers import str2bool
from temporary_test_task import run_task
if __name__ == '__main__':
    app = Flask(__name__)

    running = {}
    finished = []
    queue = []
    waiting_for_data = {}
    queue_list = set()

    @app.route('/status', methods=['GET', 'POST'])
    def status():
        return "OK", 200

    def finish_job(job_id):
        finished.append(job_id)
        last = running.pop(job_id)
        last.close()
        if len(queue) > 0:
            next_job = queue.pop()
            queue_list.remove(next_job[0])
            start_job(next_job)

    def start_job(job=None):
        if job is None:
            job = queue.pop()
            queue_list.remove(job[0])
        task_cb = Process(target=run_task, args=(job[0], job[1]))
        task_cb.start()
        print('started thread')
        running[job[0]] = task_cb

    def remove_finished():
        for j in list(running.keys()):
            if not running[j].is_alive():
                finish_job(j)

    @app.route("/Simulation", methods=['POST'])
    def submit_job():
        # create id
        job_id = str(uuid.uuid4())
        job_data = request.data.decode('utf-8')
        # check if waiting for data
        if str2bool(request.headers.get('UseMlData', False)):
            waiting_for_data[str(job_id)] = job_data
            status = 'WAITING_FOR_DATA'
        else:
            status = submit_job_local(job_id, job_data)
        return status, 200

    def submit_job_local(job_id, job_data):
        # check not too many processing jobs
        if len(running) >= config.threads:
            queue.append((job_id, job_data))
            queue_list.add(job_id)
            status = 'QUEUED'
        else:
            start_job((job_id, job_data))
            status = 'RUNNING'
        return status

    @app.route("/Simulation/<uuid:job_id>", methods=['GET'])
    def check_status(job_id: uuid):
        job_id = str(job_id)
        remove_finished()
        if job_id in running:
            r = 'RUNNING'
        elif job_id in queue_list:
            r = 'QUEUED'
        elif job_id in finished:
            r = 'COMPLETED'
        else:
            r = 'FAILED'
        return r, 200

    @app.route('/Simulation/<uuid:job_id>/UploadData', methods=['POST'])
    def upload_file(job_id):
        job_id = str(job_id)
        if job_id not in waiting_for_data:
            return 'uuid not in waiting for data mode', 400
        number_of_files = 0
        base_path = os.path.join('uploadedData', job_id)
        if not os.path.exists(base_path):
            os.makedirs(base_path)
        for file in request.files.values():
            if file.filename == '':
                return 'no file name', 400
            if file:
                filename = secure_filename(file.filename)
                path = os.path.join(base_path, filename)
                file.save(path)
                number_of_files += 1
        submit_job_local(job_id, waiting_for_data.pop(job_id))
        return str(number_of_files) + ' files uploaded', 200

    @app.route('/Simulation/<uuid:job_id>/Results', methods=['GET'])
    def download_results(job_id):
        if str(job_id) not in finished:
            return 'job id not found', 404
        base_path = os.path.join('results', str(job_id))
        file_path = os.path.join(base_path, 'result.xml')
        if not isfile(file_path):
            return 'file not found', 404
        return send_from_directory(base_path, 'result.xml')

    app.run()
- Any reason for not using something like celery to do the processing? – hjpotter92, Nov 4, 2020 at 9:36
- I have never worked with that and from the documentation I have read it looked way too complicated for such a simple task. I also wanted to keep it as simple as possible as it must be maintainable by people not so proficient in python. – Ondrej, Nov 5, 2020 at 11:52
1 Answer
Main guards
You have a kind of anti-guard for __main__. The purpose of such a guard is to define - but not run - symbols like constants, classes and functions if someone wants to import them. Due to your additional indentation, you've actually prevented this altogether.
To fix this, move your if __name__ == '__main__': block to the bottom of your file and de-indent everything else.
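A minimal sketch of that layout, using only the standard library so that it runs on its own. Here `run_task` is a stand-in for the function of the same name in the question, and `main` is a hypothetical entry point; in the real app, the `app.run()` call would sit where `main()` is called. With the `spawn` start method (the default on Windows and macOS), each child process re-imports the main module, which is why unguarded top-level code runs again for every job.

```python
from multiprocessing import Process


def run_task(job_id, job_data):
    """Stand-in for the real job; defined at import time but not run."""
    return f'{job_id}: {len(job_data)} bytes processed'


def main():
    """Hypothetical entry point: runs only when the file is executed directly."""
    worker = Process(target=run_task, args=('demo-job', 'payload'))
    worker.start()
    worker.join()
    return worker.exitcode


if __name__ == '__main__':
    # A spawned child that re-imports this module skips this block.
    print('worker exit code:', main())
```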
Globals
Due to the way that Flask declarations work, app should stay as a global, as it is now. running, etc. should not be.
If you make finish_job (for instance) a method on a class that has members for finished, running, etc., that's one relatively easy way to get to something that's more easily testable, modular and re-entrant.
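One way such a class might look is sketched below. The name `JobManager`, its method names and the `process_factory` parameter are all assumptions made for illustration, not part of the posted code. The injectable factory lets a test pass a fake in place of `multiprocessing.Process`. Note one deliberate difference: the sketch dequeues the oldest job first, whereas `queue.pop()` on a list, as in the posted code, takes the newest entry.

```python
from collections import deque
from multiprocessing import Process


class JobManager:
    """Owns the job state that the posted code keeps in module globals."""

    def __init__(self, max_running, process_factory=Process):
        self.max_running = max_running
        # Injectable so tests can pass a fake instead of a real Process.
        self.process_factory = process_factory
        self.running = {}      # job_id -> process object
        self.queue = deque()   # (job_id, job_data, target), oldest first
        self.finished = set()

    def submit(self, job_id, job_data, target):
        """Start the job if a slot is free, otherwise queue it."""
        if len(self.running) >= self.max_running:
            self.queue.append((job_id, job_data, target))
            return 'QUEUED'
        self._start(job_id, job_data, target)
        return 'RUNNING'

    def _start(self, job_id, job_data, target):
        process = self.process_factory(target=target, args=(job_id, job_data))
        process.start()
        self.running[job_id] = process

    def reap_finished(self):
        """Move dead processes to finished, then refill free slots."""
        for job_id in list(self.running):
            if not self.running[job_id].is_alive():
                self.running.pop(job_id).close()
                self.finished.add(job_id)
        while self.queue and len(self.running) < self.max_running:
            self._start(*self.queue.popleft())

    def status(self, job_id):
        if job_id in self.running:
            return 'RUNNING'
        if any(queued_id == job_id for queued_id, _, _ in self.queue):
            return 'QUEUED'
        if job_id in self.finished:
            return 'COMPLETED'
        return 'FAILED'
```

In the Flask app, a single instance could be created next to `app`, with the route handlers calling `submit`, `reap_finished` and `status` on it instead of touching globals.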
RESTful responses
About
@app.route('/status', methods=['GET', 'POST'])
def status():
    return "OK", 200
it's unusual to return 200 for a POST. Why does status accept POST at all? POST usually implies the creation of a new resource.
For submit_job, it's good that it accepts POST, but it still shouldn't return 200 OK - it should probably return 201 CREATED.
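As a rough illustration, a Flask view may return a `(body, status, headers)` tuple, so the handler could build its return value with a small helper like the one below. The helper name `submit_response` is hypothetical, and the `Location` header pointing at the existing `/Simulation/<job_id>` status route is an assumption about what the client would find useful, not something the posted code does.

```python
from http import HTTPStatus


def submit_response(job_id, status):
    """Build a (body, status code, headers) tuple for a newly created job."""
    # Assumed convention: point the client at the status route for this job.
    headers = {'Location': f'/Simulation/{job_id}'}
    return status, HTTPStatus.CREATED.value, headers
```

Inside `submit_job`, the final line would then become something like `return submit_response(job_id, status)`.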
String interpolation
str(number_of_files) + ' files uploaded'
can be
f'{number_of_files} files uploaded'
Is it your fault or mine?
This:
if str(job_id) not in finished:
    return 'job id not found', 404
makes sense. The job is not in the list of finished jobs, so kick out the request with a 404. This:
if not isfile(file_path):
    return 'file not found', 404
is a little more dubious. If the job is in the list of finished jobs, but is not on the filesystem, is the error really the client's fault? I would sooner guess that this is an erroneous server state, i.e. a 500.
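The distinction could be sketched as a small, Flask-independent function. The name `results_status` and its parameters are assumptions for illustration; the `results/<job_id>/result.xml` layout is taken from the posted code.

```python
import os
from http import HTTPStatus


def results_status(job_id, finished, results_root='results'):
    """Pick a status code for a results download request."""
    if job_id not in finished:
        # The client asked for a job that is unknown or not done yet.
        return HTTPStatus.NOT_FOUND.value
    file_path = os.path.join(results_root, job_id, 'result.xml')
    if not os.path.isfile(file_path):
        # The server recorded the job as finished but its output is missing.
        return HTTPStatus.INTERNAL_SERVER_ERROR.value
    return HTTPStatus.OK.value
```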