python多线程编程记录

2020/11/07 技术

python多线程编程记录

1. 线程锁额外耗时简单量化

多线程之间, 可借助线程锁安全地共享变量. 那么线程锁的使用, 会给程序带来多大的性能损耗?

测试示例如下:


from enum import Enum
from threading import Thread, Lock
import time


class BusyMode(Enum):
    """Selects which kind of simulated workload a benchmark worker runs."""

    # Workload that spends its time blocked rather than computing.
    IO_BUSY = 0

    # Workload that spends its time executing Python bytecode.
    CPU_BUSY = 1


class AtomicInteger(object):
    """Integer counter whose every read and write happens under a ``Lock``.

    Each mutating method returns the counter's value as it stands right after
    the update, computed while the lock is still held.
    """

    __slots__ = ("_value", "_lock")

    def __init__(self, value: int = 0):
        """Create the counter with ``value`` as its starting point."""
        self._lock = Lock()
        self._value: int = value

    def _add(self, delta: int) -> int:
        """Add ``delta`` to the counter under the lock and return the result."""
        with self._lock:
            updated = self._value + delta
            self._value = updated
            return updated

    def inc(self) -> int:
        """Increase the counter by one and return the new value."""
        return self._add(1)

    def dec(self) -> int:
        """Decrease the counter by one and return the new value."""
        return self._add(-1)

    def set(self, value: int) -> int:
        """Overwrite the counter with ``value`` and return it."""
        with self._lock:
            self._value = value
            return value

    @property
    def value(self) -> int:
        """Current value, read while holding the lock."""
        with self._lock:
            current = self._value
        return current


class Worker(Thread):
    """Thread that runs one simulated workload a fixed number of times.

    Before each round the worker increments the shared counter, unless the
    counter is ``None``, in which case the increment is skipped.
    """

    def __init__(self, count: AtomicInteger, busy_mode: BusyMode, times: int, *args, **kwargs):
        """Store the benchmark settings; extra arguments go to ``Thread``.

        Args:
            count: Shared counter to increment each round, or ``None``.
            busy_mode: Workload selector; ``BusyMode.IO_BUSY`` picks the
                sleeping workload, anything else the computing one.
            times: Number of rounds to execute in ``run``.
        """
        super().__init__(*args, **kwargs)
        self.times = times
        self._busy_mode = busy_mode
        self.count = count

    def run(self) -> None:
        """Execute ``times`` rounds of the selected workload."""

        def sleep_round():
            # One round of waiting: block for a full second.
            time.sleep(1)

        def compute_round():
            # One round of computing: a million multiplications whose
            # results are thrown away.
            for n in range(1000000):
                _ = n * n

        if self._busy_mode == BusyMode.IO_BUSY:
            workload = sleep_round
        else:
            workload = compute_round

        for _ in range(self.times):
            if self.count is not None:
                # The returned value is not needed; only the locked
                # increment itself matters for the measurement.
                self.count.inc()
            workload()


# Benchmark driver: for every (workload, lock usage) combination start 20
# workers that each run 20 rounds, wait for all of them, and record the
# elapsed time.
result_list = []
for _busy_mode in [BusyMode.IO_BUSY, BusyMode.CPU_BUSY]:
    for _use_lock in [False, True]:
        # Passing None instead of a counter makes the workers skip the
        # locked increment, which is the "no lock" baseline.
        _atomic_count = AtomicInteger(0) if _use_lock else None
        print("busy mode {}, use lock {} >>>".format(_busy_mode, _use_lock))

        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        time_start = time.perf_counter()
        threads = [
            Worker(count=_atomic_count, busy_mode=_busy_mode, times=20)
            for _ in range(20)
        ]

        for worker in threads:
            worker.start()

        for worker in threads:
            worker.join()

        time_stop = time.perf_counter()
        log_line = "busy mode {}, use lock {}, time cost {:.3f}s".format(_busy_mode, _use_lock, time_stop - time_start)
        print("{}\n".format(log_line))
        result_list.append(log_line)

print("\n\nresult:\n{}".format("\n".join(result_list)))

执行结果:

busy mode BusyMode.IO_BUSY, use lock False, time cost 20.066s
busy mode BusyMode.IO_BUSY, use lock True, time cost 20.056s
busy mode BusyMode.CPU_BUSY, use lock False, time cost 32.659s
busy mode BusyMode.CPU_BUSY, use lock True, time cost 30.786s

结论: 两种负载下, 加锁与不加锁的耗时差异都在测量波动范围内 (每个线程仅加锁 20 次), 在本测试中线程锁带来的额外开销可以忽略.

2. 基于多线程, 实现 master-worker 模式

示例:

import os
import subprocess
import sys
import time
import typing
import unittest
import weakref  # noqa
from threading import Thread


def busy_working_by_subprocess(sleep_time: int):
    """Block the calling thread while a child ``sleep`` process runs.

    Args:
        sleep_time: Number of seconds to sleep; converted with ``int()``
            before being handed to the ``sleep`` command.

    Raises:
        subprocess.CalledProcessError: If ``sleep`` exits with a non-zero
            status.
        FileNotFoundError: If no ``sleep`` executable can be found.
    """
    # An argument list is executed directly, so no intermediate shell
    # process is spawned and no command string has to be assembled.
    subprocess_cmd = ["sleep", str(int(sleep_time))]

    # Environment variables: start from a copy of the parent's environment,
    # then set the I/O encoding, the locale and a PYTHONPATH pointing at the
    # parent of this file's directory.
    parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
    base_env = dict(os.environ)
    base_env.update(PYTHONIOENCODING="utf-8", LANG="en_US.UTF-8", PYTHONPATH=parent_dir)

    subprocess.check_output(subprocess_cmd, env=base_env)


class TestThread(unittest.TestCase):
    """Master-worker experiments: the test method is the master, threads are the workers."""

    def setUp(self) -> None:
        pass

    def _start_batch(self, batch_index: int, count: int, sleep_time: int) -> typing.Dict[str, Thread]:
        """Start ``count`` daemon worker threads and return them keyed by name.

        Args:
            batch_index: Index of the current batch; becomes part of each
                thread name (``Thread-<batch_index>-<j>``).
            count: Number of worker threads to start.
            sleep_time: Seconds each worker spends in its ``sleep`` subprocess.
        """
        workers: typing.Dict[str, Thread] = {}
        for j in range(count):
            thread_name = "Thread-{}-{}".format(batch_index, j)
            worker = Thread(target=busy_working_by_subprocess, args=(sleep_time,), name=thread_name)
            worker.daemon = True
            worker.start()
            # Keep a strong reference. Storing weakref.proxy(worker) instead
            # was observed to fail later in is_alive() with
            # "ReferenceError: weakly-referenced object no longer exists".
            workers[thread_name] = worker
        print("{} thread start".format(count))
        return workers

    def _poll_batch(self, workers: typing.Dict[str, Thread]) -> bool:
        """Print ``<finished count>, <all finished>`` and return whether all workers are done."""
        done_count = sum(1 for thread in workers.values() if not thread.is_alive())
        all_done = done_count == len(workers)
        print(f"{done_count}, {all_done}")
        return all_done

    def testMasterWorker(self):
        """Run 10 batches of 10 workers (3 s each), polling once per second until each batch ends."""
        for i in range(10):
            workers = self._start_batch(i, count=10, sleep_time=3)

            while not self._poll_batch(workers):
                time.sleep(1)

            print("all thread done")
            workers.clear()

    def testMasterWorkerMasterExit(self):
        """Start a batch of 60 s workers, then make the master exit after the first poll."""
        for i in range(10):
            workers = self._start_batch(i, count=10, sleep_time=60)

            while not self._poll_batch(workers):
                time.sleep(1)

                # Forced exit: simulate the master dying.
                print("force exit: sys.exit(1)")
                sys.exit(1)
                # After the forced exit the master is gone while the workers
                # keep running.
                # NOTE(review): sys.exit() raises SystemExit; whether the
                # process really terminates here depends on how the test
                # runner treats that exception -- confirm.

            print("all thread done")
            workers.clear()
                # 强制退出后, master 退出, worker 继续执行

Search

    Table of Contents