Async request in Python : Do async I/O on a blocking object

After the introduction of the send method of generators in Python 2.5 (PEP 342), single-threaded, coroutine-based cooperative multitasking became possible. Before 3.5, one could create a coroutine with the asyncio.coroutine decorator (types.coroutine, added in 3.5 itself, plays a similar role) and pause it while doing I/O by using yield from (added in Python 3.3), which is basically syntactic sugar for yield-ing values from another iterator.
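To make send and yield from a bit more concrete, here is a minimal sketch of a generator-based coroutine. The names averager and delegating are made up for this illustration; nothing here touches asyncio, it only shows the pause-and-resume mechanics that coroutines build on.

```python
def averager():
    """Generator-based coroutine: receives numbers via send(), yields the running mean."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here until the caller send()s a value
        total += value
        count += 1
        average = total / count


def delegating():
    # `yield from` forwards send() calls to the inner generator
    # and passes the values it yields back out to the caller
    yield from averager()


gen = delegating()
next(gen)              # prime the generator: run it up to the first `yield`
first = gen.send(10)   # running mean of [10]
second = gen.send(20)  # running mean of [10, 20]
print(first, second)
```

Every send resumes the generator exactly where it paused, which is the same trick an event loop uses to resume a coroutine once its I/O is ready.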

In Python 3.5, the async/await syntax was introduced (PEP 492) to support async operations natively. async def replaces the need for an explicit decorator to create a coroutine, and await works much like yield from.
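As a rough illustration of the native syntax, the sketch below defines two coroutines with async def and pauses them with await. The function names are invented for the example, asyncio.sleep stands in for real I/O, and asyncio.run needs Python 3.7 or later.

```python
import asyncio


async def fetch_number(delay, value):
    # `await` suspends this coroutine until the sleep finishes,
    # letting the event loop run something else in the meantime
    await asyncio.sleep(delay)
    return value


async def add_numbers():
    a = await fetch_number(0.01, 1)
    b = await fetch_number(0.01, 2)
    return a + b


total = asyncio.run(add_numbers())
print(total)
```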

Concurrent I/O is a huge performance boost for I/O-bound programs. The GIL (Global Interpreter Lock) keeps multiple threads from running Python code in parallel, although it is released while a thread is blocked on I/O, and spawning multiple processes for an I/O-bound program is very costly.

To do concurrent I/O, the program needs to be written for it. Not all programs are written with that in mind, but we can make them async at the cost of spawning threads to run them, which I'll show below.

To do this, we're gonna use the requests library to get HTTP resources. requests is synchronous, i.e. it doesn't support async I/O by default.

Let's see the synchronous operation first with the timings.

I'm gonna use Python 3.7 here, but the examples should work on 3.6 as well (and 3.5 if we replace f-strings with regular string formatting).
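If you want to follow along without f-strings, str.format is a drop-in replacement for the print calls used in this post. A small sketch (the variable names are only for illustration) that checks both spellings produce the same text:

```python
import time

url = 'https://example.net'
now = time.time()

# f-string, available from Python 3.6
with_fstring = f'Start: {url}: {now}'

# Equivalent str.format call, which also works on Python 3.5
with_format = 'Start: {}: {}'.format(url, now)

print(with_fstring == with_format)
```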

We start with a list of URLs we're gonna send GET requests to:

urls = [
    'https://heemayl.net',
    'https://dealiable.com',
    'https://example.net',
    'https://www.w3.org',
    'https://httpbin.org'
]

Now, let's define a simple function to send GET requests to a list of URLs, and track the timings:

import time

import requests

def requests_sync(urls):
    for url in urls:
        print(f'Start: {url}: {time.time()}')
        response = requests.get(url)  # blocks until the response arrives
        print(f'End: {url}: {response.status_code}: {time.time()}')
    return None

Let's run it now:

>>> requests_sync(urls)
Start: https://heemayl.net: 1552823647.6991084
End: https://heemayl.net: 200: 1552823649.6029341
Start: https://dealiable.com: 1552823649.6030045
End: https://dealiable.com: 200: 1552823650.8961256
Start: https://example.net: 1552823650.896201
End: https://example.net: 200: 1552823652.9742491
Start: https://www.w3.org: 1552823652.9743247
End: https://www.w3.org: 200: 1552823655.355746
Start: https://httpbin.org: 1552823655.355821
End: https://httpbin.org: 200: 1552823657.973544

Findings:

  • Each iterated URL is requests.get-ed sequentially
  • Once we get the response back from one, we move onto the next
  • As requests.get is blocking on network I/O, the program seems to be stalled during the blocked time
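To put numbers on these findings, the sketch below just does arithmetic on the timestamps printed in the run above. The timestamps are copied from that output; the variable names are my own.

```python
# (start, end) timestamps copied from the synchronous run above
timings = {
    'https://heemayl.net': (1552823647.6991084, 1552823649.6029341),
    'https://dealiable.com': (1552823649.6030045, 1552823650.8961256),
    'https://example.net': (1552823650.896201, 1552823652.9742491),
    'https://www.w3.org': (1552823652.9743247, 1552823655.355746),
    'https://httpbin.org': (1552823655.355821, 1552823657.973544),
}

# Time spent inside each requests.get call
durations = {url: end - start for url, (start, end) in timings.items()}
blocked = sum(durations.values())

# Wall-clock time from the first Start to the last End
wall_clock = timings['https://httpbin.org'][1] - timings['https://heemayl.net'][0]

for url, seconds in durations.items():
    print(f'{url}: {seconds:.2f}s')
print(f'blocked in requests.get: {blocked:.2f}s of {wall_clock:.2f}s total')
```

Both totals come out at roughly 10.27 seconds, so practically the whole run is spent waiting on the network, one request at a time.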

The above is not a performant solution, as one can imagine: it does not use the network efficiently and spends most of its time blocked on network I/O. A far better solution would be to send the requests concurrently (firing them off one after another, without waiting for the previous one to get its response). Doing it this way has the following advantages:

  • The network is properly used
  • The scheduler can do other tasks while waiting on I/O
  • Very little extra overhead (only the context switching of coroutines/tasks)

To make the operations async, we need to run the blocking requests.get calls in separate threads (that is the penalty we pay).

Let's define a thread pool executor first where we'll send the requests.get functions to run:

import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
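If thread pools are new to you, here is a small standalone sketch of what the executor does on its own, without asyncio. slow_square is a made-up function, and time.sleep stands in for a blocking network call.

```python
import concurrent.futures
import time


def slow_square(n):
    time.sleep(0.05)  # stand-in for blocking network I/O
    return n * n


# The `with` block shuts the pool down once all submitted work is done
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    # submit() returns immediately with a Future for each call
    futures = [pool.submit(slow_square, n) for n in range(5)]
    # result() blocks until that particular call has finished
    squares = [future.result() for future in futures]

print(squares)
```

With five workers, the five sleeps overlap instead of running back to back, which is the same effect we are after with requests.get.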

We also need the event loop that will orchestrate the coroutines/tasks/futures:

import asyncio
loop = asyncio.get_event_loop()
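As a side note, here is a rough sketch of two ways to drive a coroutine to completion: managing a loop object by hand, or letting asyncio.run (Python 3.7+) create and close one for you. The coroutine answer and the variable names are hypothetical and only serve the illustration.

```python
import asyncio


async def answer():
    await asyncio.sleep(0)  # give control back to the event loop once
    return 42


# Option 1: create and manage an event loop explicitly
demo_loop = asyncio.new_event_loop()
try:
    via_loop = demo_loop.run_until_complete(answer())
finally:
    demo_loop.close()

# Option 2 (Python 3.7+): asyncio.run creates a fresh loop,
# runs the coroutine, and closes the loop afterwards
via_run = asyncio.run(answer())

print(via_loop, via_run)
```

This post keeps an explicit loop object around because it gets passed to the functions that follow.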

Now we get to the core of the idea: we'll define an async function that sends requests.get, along with a URL, to the thread pool executor through the event loop, and returns the await-ed response. Phew! Let's see the function:

async def individual_request(loop, executor, url):
    print(f'Start: {url}: {time.time()}')
    response = await loop.run_in_executor(executor, requests.get, url)
    print(f'End: {url}: {response.status_code}: {time.time()}')
    return response

So all of the above is basically done here:

await loop.run_in_executor(executor, requests.get, url)
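One thing to keep in mind: run_in_executor only forwards positional arguments to the function. If you need keyword arguments (a timeout, say), functools.partial can wrap the call first. The sketch below uses a made-up blocking_fetch function in place of requests.get so it runs without network access; the shape of the call is the point.

```python
import asyncio
import concurrent.futures
import functools
import time


def blocking_fetch(url, timeout=1.0):
    # Hypothetical stand-in for requests.get(url, timeout=timeout)
    time.sleep(0.05)
    return f'{url} (timeout={timeout})'


async def fetch(executor, url):
    loop = asyncio.get_running_loop()  # Python 3.7+
    # Bind the keyword argument up front, since run_in_executor
    # passes positional arguments only
    call = functools.partial(blocking_fetch, url, timeout=5.0)
    return await loop.run_in_executor(executor, call)


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    result = asyncio.run(fetch(executor, 'https://example.net'))

print(result)
```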

Okay! Now we define an async function that will gather the individual_request(loop, executor, url) coroutines for the input URLs, and run them concurrently:

async def main(urls):
    # Pass in the loop, and executor to `individual_request`
    coroutines = [individual_request(loop, executor, url) for url in urls]
    # `asyncio.gather` takes the coroutines/futures as arguments,
    # schedules them on the event loop, and aggregates the return
    # values into a future that eventually resolves to a list
    results = await asyncio.gather(*coroutines)
    return results
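A detail of asyncio.gather worth knowing: the result list follows the order of the arguments, not the order in which they finish. Here is a minimal sketch with invented coroutines, using asyncio.sleep in place of real requests:

```python
import asyncio


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def demo():
    # 'fast' finishes first, but 'slow' was passed first
    return await asyncio.gather(
        delayed('slow', 0.05),
        delayed('fast', 0.01),
    )


ordered = asyncio.run(demo())
print(ordered)
```

So the responses in results line up with the entries in urls, even when the servers answer in a different order.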

Now, let's run this in the event loop, and see the timings:

>>> loop.run_until_complete(main(urls))
Start: https://heemayl.net: 1552833809.0163138
Start: https://dealiable.com: 1552833809.0164335
Start: https://example.net: 1552833809.016731
Start: https://www.w3.org: 1552833809.017046
Start: https://httpbin.org: 1552833809.0171287
End: https://dealiable.com: 200: 1552833809.9785948
End: https://example.net: 200: 1552833810.5301158
End: https://httpbin.org: 200: 1552833810.7631378
End: https://www.w3.org: 200: 1552833810.8200846
End: https://heemayl.net: 200: 1552833811.72682

As we can see, all the requests.get calls were fired off concurrently, and the responses came back in an overlapping manner. We can also schedule some computational tasks while the coroutines are waiting on I/O, as one can imagine.
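Here is a rough sketch of that last point. blocking_io is a made-up stand-in for requests.get, and compute is a hypothetical coroutine that does some arithmetic in small chunks. The computation runs on the event loop's own thread, so it hands control back between chunks with await asyncio.sleep(0).

```python
import asyncio
import concurrent.futures
import time


def blocking_io():
    time.sleep(0.2)  # stand-in for requests.get(url)
    return 'response'


async def compute():
    total = 0
    for chunk in range(10):
        total += sum(range(chunk * 1000, (chunk + 1) * 1000))
        await asyncio.sleep(0)  # yield to the event loop between chunks
    return total


async def demo():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # The blocking call runs in a worker thread...
        io_future = loop.run_in_executor(executor, blocking_io)
        # ...while the computation proceeds on the event loop thread
        return await asyncio.gather(io_future, compute())


io_result, computed = asyncio.run(demo())
print(io_result, computed)
```

A long computation that never awaits would stall every other coroutine on the loop, so heavy CPU work is usually better off in an executor as well.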

Just to note, here's the content of the results list:

[
    <Response [200]>,
    <Response [200]>,
    <Response [200]>,
    <Response [200]>,
    <Response [200]>
]

Finally, I would suggest going through the documentation of the modules and functions used here to get a better idea:

  1. https://docs.python.org/3/library/asyncio.html
  2. https://docs.python.org/3/library/concurrent.futures.html
