I'm trying to write a super simple task runner with asyncio.
Basically, my goal is to run 100 requests to google as tasks, while also being able to handle tasks being added at a later time.
    from abc import ABC, abstractmethod
    import asyncio
    import socket
    import time

    import aiohttp
    import requests


    class MyScheduler:
        def __init__(self, wait=2):
            self.work = []
            self.wait = wait

        def set_initial_callback(self, callback, **kwargs):
            self.initial_callback = callback
            self.initial_callback_args = kwargs

        async def add_task(self, callback, **kwargs):
            task = asyncio.create_task(callback(**kwargs))
            self.work.append(task)

        async def _run(self):
            while self.work:
                task = self.work.pop()
                await task
                if len(self.work) == 0:
                    await asyncio.sleep(self.wait)

        async def set_things_up(self, callback, **kwargs):
            task = asyncio.create_task(callback(**kwargs))
            self.work.append(task)
            await self._run()

        def go(self):
            asyncio.run(self.set_things_up(self.initial_callback, **self.initial_callback_args))


    async def google(n):
        if n == 100:
            return None
        await s.add_task(google, n=n + 1)
        async with aiohttp.ClientSession() as session:
            async with session.get('http://h...content-available-to-author-only...n.org/get') as resp:
                print(resp.status)


    t = time.time()

    s = MyScheduler(wait=1)
    s.set_initial_callback(google, n=1)
    s.go()

    print(time.time() - t)
I benchmarked this against sequentially running the requests, and I did see a massive speed-up. It's still super rough, but I'd love some pointers on how I could improve my code in terms of readability and making better use of async.
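For context, the kind of sequential baseline I compared against looks roughly like this (a sketch, not the exact script I timed):

    import time

    import requests

    t = time.time()
    for _ in range(100):
        resp = requests.get('http://h...content-available-to-author-only...n.org/get')
        print(resp.status_code)
    print(time.time() - t)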
1 Answer
I actually just started learning asyncio a couple days ago, so I won't be able to comment too deeply. I do see a few things though:
Disregarding asyncio for a sec, I think google could be set up better. You have the base case of the recursion as n == 100, and are incrementing n in each recursive call. To easily allow the caller to decide how many times to run, I'd reverse how n is being handled: decrement it on each call, and set the base case as n <= 0. With how you have it now, if the caller wanted it to run 1000 times, they would need to call it as google(-900), which is a little wonky. I'd change the first bit to:
    async def google(n):
        if n <= 0:
            return None

        await s.add_task(google, n=n - 1)
        . . .
I'm not sure recursion is the cleanest tool for the job here. I'm also not entirely sure why you're using a job queue or an elaborate class here, unless the goal is to be able to handle jobs being added at a later time.
If your goal is just to initiate many requests and wait on them at the same time, you could just gather them:
    import aiohttp
    import asyncio as a


    # google no longer cares about how many times it runs.
    # That is arguably beyond the responsibilities of a function intended to make requests.
    async def google():
        async with aiohttp.ClientSession() as session:
            async with session.get('http://h...content-available-to-author-only...n.org/get') as resp:
                print(resp.status)


    async def start_requests(n_requests: int):
        routines = [google() for _ in range(n_requests)]  # Create a list of request-making coroutines
        await a.gather(*routines)  # Unpack the routines into gather (since gather is var-arg)
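One further tweak worth considering, purely a sketch on my part rather than something from the original code: aiohttp sessions are meant to be reused, so instead of opening a new ClientSession inside every google() call, you could open a single session in start_requests and pass it in:

    async def google(session: aiohttp.ClientSession):
        async with session.get('http://h...content-available-to-author-only...n.org/get') as resp:
            print(resp.status)


    async def start_requests(n_requests: int):
        # One shared session for the whole batch instead of one per request
        async with aiohttp.ClientSession() as session:
            await a.gather(*(google(session) for _ in range(n_requests)))

Either way, kicking off the 100 requests from the question is just a.run(start_requests(100)).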
Also, instead of doing timing using a single attempt and plain subtraction, it would be more accurate to use timeit:

    from timeit import timeit

    print("t:", timeit(lambda: a.run(start_requests(10)), number=20))  # number is the amount of tests to do

I'm assuming there's no issue using timeit for async code.
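Finally, since the question does say that handling tasks added at a later time is part of the goal: the standard building block for that is asyncio.Queue plus a small pool of worker tasks, rather than a hand-rolled work list. A rough sketch of that pattern (worker, main, and n_workers are names I'm introducing here, not anything from the original code):

    import asyncio as a


    async def worker(queue: a.Queue):
        # Each worker pulls (coroutine function, kwargs) jobs off the queue until cancelled
        while True:
            callback, kwargs = await queue.get()
            try:
                await callback(**kwargs)
            finally:
                queue.task_done()


    async def main(n_workers: int = 10):
        queue = a.Queue()
        workers = [a.create_task(worker(queue)) for _ in range(n_workers)]
        for _ in range(100):
            queue.put_nowait((google, {}))  # e.g. the no-argument google(); jobs can be queued later from anywhere
        await queue.join()  # wait until every queued job has been processed
        for w in workers:   # then shut the now-idle workers down
            w.cancel()
        await a.gather(*workers, return_exceptions=True)


    a.run(main())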
Comment: thanks for the feedback! I should have added, yes, the goal is to be able to handle jobs being added at a later time, as it offers more flexibility. :) Do you have any recommendations for how I could implement a graceful shutdown? – nz_21, Aug 29, 2019 at 8:15