Python: How to create multiple Threads for async tasks


Multi-threading is crucial for an application. If it has only a single thread and one task takes a long time, that task blocks all the other tasks. The UI becomes unresponsive in this case.

Let’s learn how to create threads so that the main task can go on.


Running a task without additional threads

Let’s see the code that we want to make faster. This code simulates downloading files. I/O-related work takes time, so the code calls the sleep function to simulate such an I/O job.

from typing import Callable, List
import time
from threading import Thread
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Queue


def download_file(index: int):
    time.sleep(0.1)
    print(f"download_file ends [{index}]")


NUMBER_OF_FILES = 5


def print_elapsed_timer(action: Callable[[], None]):
    print(f"--- {action.__name__} ---")
    start = time.time()
    action()
    end = time.time()
    elapsed_time = end - start
    print(format(elapsed_time, ".3f"))


def without_thread():
    for index in range(NUMBER_OF_FILES):
        download_file(index)

print_elapsed_timer(without_thread)
# --- without_thread ---
# download_file ends [0]
# download_file ends [1]
# download_file ends [2]
# download_file ends [3]
# download_file ends [4]
# 0.501

The code above downloads the files one by one and takes about 500 ms in total. If a single download took 30 seconds, waiting for all of them to complete sequentially would take far too long.


Creating a new Thread

It’s easy to create a thread and run a task on it. Create a new instance of Thread and pass the desired function to target. args holds the arguments that will be passed to the target function. daemon=True means the thread will not keep the program alive when the main thread exits.

def with_thread1():
    for index in range(NUMBER_OF_FILES):
        worker = Thread(target=download_file, daemon=True, args=[index])
        worker.start()
        # join() here waits for this thread before the next loop iteration
        worker.join()

print_elapsed_timer(with_thread1)
# --- with_thread1 ---
# download_file ends [0]
# download_file ends [1]
# download_file ends [2]
# download_file ends [3]
# download_file ends [4]
# 0.502

A thread starts when its start() method is called, and join() waits for its completion. Since join() is called inside the loop here, each download finishes before the next thread is created, so the timing is the same as the synchronous version.

To run the tasks concurrently, the join method must be called outside of the loop.

def with_thread2():
    workers: List[Thread] = []
    # start all the threads first
    for index in range(NUMBER_OF_FILES):
        worker = Thread(target=download_file, daemon=True, args=[index])
        workers.append(worker)
        worker.start()

    # then wait for all of them to complete
    for worker in workers:
        worker.join()

print_elapsed_timer(with_thread2)
# --- with_thread2 ---
# download_file ends [0]
# download_file ends [1]
# download_file ends [2]
# download_file ends [3]
# download_file ends [4]
# 0.102

The total time is reduced to one fifth because each thread is created and its task started without waiting for the previous task to complete.

Using Queue to manage Threads

Let’s try to use Queue instead to manage the Threads. Queue is useful when we want to process items one after another in the same order that we push them.

Not blocking the main thread

The worker retrieves an item with the get() method, which blocks until an item is available. After processing, it notifies the queue of the completion with task_done(). Since the queue is created with maxsize=3, put() blocks while the queue already holds three items.

def with_thread3():
    q = Queue(maxsize=3)

    def download_file_with_queue():
        while True:
            item = q.get()
            download_file(item)
            q.task_done()

    worker = Thread(target=download_file_with_queue, daemon=True)
    worker.start()

    for index in range(NUMBER_OF_FILES):
        q.put(index, block=True, timeout=None)
    q.join()

print_elapsed_timer(with_thread3)
# --- with_thread3 ---
# download_file ends [0]
# download_file ends [1]
# download_file ends [2]
# download_file ends [3]
# download_file ends [4]
# 0.502

The items are pushed to the queue without waiting for the task completion, but the total time is the same as the synchronous version.

Why? Because there are only two threads here: the main thread and the single worker thread that consumes the queue. download_file_with_queue() runs on that worker thread, but the work inside the function is synchronous. It downloads one file at a time as the requests arrive.

If it’s enough not to block the main thread, this approach is one of the solutions.
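If we also want the queue version to finish faster, one option is to start several worker threads that all consume the same queue. The following is a minimal sketch under that assumption (the function name with_thread3_pool and the worker count of 3 are chosen here for illustration); with three workers, the five tasks should finish in roughly 0.2 s instead of 0.5 s.

def with_thread3_pool():
    q = Queue(maxsize=3)

    def download_file_with_queue():
        while True:
            item = q.get()
            download_file(item)
            q.task_done()

    # several daemon workers consume the same queue concurrently
    for _ in range(3):
        Thread(target=download_file_with_queue, daemon=True).start()

    for index in range(NUMBER_OF_FILES):
        q.put(index, block=True, timeout=None)
    q.join()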

Using a fixed number of Threads

The previous approach keeps the main thread unblocked. But how can we implement it if there are two different kinds of tasks that we want to process concurrently? Use two Queues.

def with_thread4():
    q1 = Queue(maxsize=3)
    q2 = Queue(maxsize=3)

    def download_file_with_queue(q: Queue, num: int):
        while True:
            item = q.get()
            download_file(num + item)
            q.task_done()

    Thread(target=download_file_with_queue, daemon=True, args=[q1, 10]).start()
    Thread(target=download_file_with_queue, daemon=True, args=[q2, 20]).start()

    for index in range(NUMBER_OF_FILES):
        q1.put(index, block=True, timeout=None)
        q2.put(index, block=True, timeout=None)
    q1.join()
    q2.join()

print_elapsed_timer(with_thread4)
# --- with_thread4 ---
# download_file ends [20]
# download_file ends [10]
# download_file ends [21]
# download_file ends [11]
# download_file ends [22]
# download_file ends [12]
# download_file ends [23]
# download_file ends [13]
# download_file ends [24]
# download_file ends [14]
# 0.503

The total time is the same even though the number of tasks has doubled. Two additional threads are created, and both of them run concurrently.

This is fine if two threads are enough for the tasks, but if we need more threads, this approach is not maintainable.

Using ThreadPoolExecutor to restrict the number of Threads

Creating as many threads as there are tasks could cause a problem because it could eat up resources. We should limit the number of Threads wherever possible.

Luckily, Python offers a good class for this: ThreadPoolExecutor in the concurrent.futures module. We can specify the maximum number of threads in the constructor. When the submit method is called with the desired function, the function runs on one of the pool’s threads.

from concurrent.futures import ThreadPoolExecutor

def with_thread5():
    executor = ThreadPoolExecutor(max_workers=3)
    for index in range(NUMBER_OF_FILES):
        executor.submit(download_file, index)

    executor.shutdown()

print_elapsed_timer(with_thread5)
# max_workers=1 ->  0.502
# max_workers=2 ->  0.301
# max_workers=3 ->  0.201
# max_workers=4 ->  0.201
# max_workers=5 ->  0.102

The total time changes depending on the number of max workers. We don’t have to manage the threads ourselves. Call shutdown(), which by default waits until all submitted tasks finish, at the point where we want to wait for the results.
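ThreadPoolExecutor can also be used as a context manager, which calls shutdown(wait=True) automatically when the block exits. A minimal sketch (the name with_thread5_ctx is chosen here for illustration):

def with_thread5_ctx():
    # the with-block calls shutdown(wait=True) automatically on exit
    with ThreadPoolExecutor(max_workers=3) as executor:
        for index in range(NUMBER_OF_FILES):
            executor.submit(download_file, index)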

Getting the result of a function from another thread

If the target function has a return value, we need a way to get it for post-processing. In this case, use the Future returned by submit().

from concurrent.futures import Future, ThreadPoolExecutor

def with_thread6():
    executor = ThreadPoolExecutor(max_workers=30)

    def download_file_with_returned_value(index):
        download_file(index)
        return f"returned value: {index}"

    futures: List[Future] = []
    for index in range(NUMBER_OF_FILES):
        future = executor.submit(download_file_with_returned_value, index)
        futures.append(future)

    executor.shutdown()
    print("finish the process")
    for future in futures:
        print(future.result())

print_elapsed_timer(with_thread6)
# --- with_thread6 ---
# download_file ends [0]
# download_file ends [1]
# download_file ends [2]
# download_file ends [3]
# download_file ends [4]
# finish the process
# returned value: 0
# returned value: 1
# returned value: 2
# returned value: 3
# returned value: 4
# 0.101

The returned value can then be used on the main thread. Note that future.result() blocks until that particular task has finished.
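If we want to handle each result as soon as its task completes, regardless of submission order, concurrent.futures.as_completed can be used instead of iterating over the futures in order. A minimal sketch under that assumption (the name with_thread7 is chosen here for illustration):

from concurrent.futures import ThreadPoolExecutor, as_completed

def with_thread7():
    def download_file_with_returned_value(index):
        download_file(index)
        return f"returned value: {index}"

    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [
            executor.submit(download_file_with_returned_value, index)
            for index in range(NUMBER_OF_FILES)
        ]
        # as_completed yields each Future as soon as its task finishes,
        # so the results are printed in completion order, not submission order
        for future in as_completed(futures):
            print(future.result())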
