Python/Threading
Python 中的線程用於同時運行多個線程(任務、函數調用)。請注意,這並不意味着它們在不同的 CPU 上執行。如果程序已經使用了 100% 的 CPU 時間,Python 線程將不會讓您的程序運行得更快。在這種情況下,您可能需要研究並行編程。如果您對使用 Python 進行並行編程感興趣,請參閱 此處。
Python 線程用於執行任務需要等待的情況。一個例子是與另一台計算機上託管的服務(如 Web 服務器)交互。線程允許 Python 在等待時執行其他代碼;這可以通過 sleep 函數輕鬆模擬。
Python 3已經停用了thread模塊,並改名為 _thread 模塊。Python 3在_thread 模塊的基礎上開發了更高級的 threading 模塊。threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法,常用的方法如下:
- threading.current_thread() 返回當前線程的信息
- threading.enumerate() 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
- threading.active_count() 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
直接創建線程
編輯threading.Thread(target=None, name=None, args=(), kwargs={})
target 指要創建的線程的方法名,name 指給此線程命名,命名後可以調用 threading.current_thread().name 方法輸出該線程的名字, args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。
創建一個線程,打印 1-10 的數字,並在每次打印之間等待一秒鐘:
import threading
import time
def loop1_10():
for i in range(1, 11):
time.sleep(1)
print(i)
threading.Thread(target=loop1_10).start()
類的繼承創建線程
編輯通過直接從 threading.Thread 繼承創建一個新的子類後,必須要重寫其中的 run 方法。實例化後調用 start() 方法啟動新線程,即相當於它調用了線程的 run() 方法。
#!/usr/bin/env python
import threading
import time
class MyThread(threading.Thread):
def run(self): # Default called function with mythread.start()
print("{} started!".format(self.getName())) # "Thread-x started!"
time.sleep(1) # Pretend to work for a second
print("{} finished!".format(self.getName())) # "Thread-x finished!"
def main():
for x in range(4): # Four times...
mythread = MyThread(name = "Thread-{}".format(x)) # ...Instantiate a thread and pass a unique ID to it
mythread.start() # ...Start the thread, run method will be invoked
time.sleep(.9) # ...Wait 0.9 seconds before starting another
if __name__ == '__main__':
main()
輸出為:
Thread-0 started! Thread-1 started! Thread-0 finished! Thread-2 started! Thread-1 finished! Thread-3 started! Thread-2 finished! Thread-3 finished!
守護線程
編輯如果當前python線程是守護線程,那麼意味着這個線程是「不重要」的,「不重要」意味着如果他的主線程結束了但該守護線程沒有運行完,守護線程就會被強制結束。如果線程是非守護線程,那麼父進程只有等到非守護線程運行完畢後才能結束。
只要當前主線程中尚存任何一個非守護線程沒有結束,守護線程就全部工作;只有當最後一個非守護線程結束是,守護線程隨着主線程一同結束工作。
import threading
import time
# 每1秒加1
def job1(num):
while True:
num += 1
print('{} is running >> {}'.format(threading.current_thread().name, num))
time.sleep(1)
# 每2秒加2
def job2(num):
while True:
num += 2
print('{} is running >> {}'.format(threading.current_thread().name, num))
time.sleep(2)
# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()
# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()
# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))
隨着輸出 MainThread Ending 後,程序就運行結束了,這表明子線程全為守護線程時,會隨着主線程的結束而強制結束。
線程的阻塞會合
編輯join([timeout])方法會使線程進入等待狀態(阻塞),直到調用join()方法的子線程運行結束。同時也可以通過設置 timeout 參數來設定等待的時間。
線程並發控制
編輯互斥鎖(Lock)
編輯互斥鎖只能加鎖一次然後釋放一次。
import threading
import time
num = 0
lock = threading.Lock()
def job1():
global num
for i in range(1000000):
lock.acquire() # 加锁
num += 1
lock.release() # 释放锁
# 上述代码也可以直接写为
# with lock:
# num += 1
new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()
for i in range(1000000):
lock.acquire() # 加锁
num += 2
lock.release() # 释放锁
# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))
遞歸鎖
編輯遞歸鎖(Rlock)允許多層加鎖、釋放鎖:
import threading, time
def run1():
lock.acquire()
print("grab the first part data")
global num
num += 1
lock.release()
return num
def run2():
lock.acquire()
print("grab the second part data")
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(3):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
信號量
編輯import threading
import time
# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)
def job1():
lock.acquire()
print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
time.sleep(3)
lock.release()
for i in range(10):
new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
new_job1.start()
事件
編輯threading.Event()類會在全局定義一個Flag,當 Flag=False 時,調用 wait()方法會阻塞所有線程;而當 Flag=True 時,調用 wait() 方法不再阻塞。形象的比喻就是「紅綠燈」:在紅燈時阻塞所有線程,而在綠燈時又會一次性放行所有排隊中的線程。Event類有四個方法:
- set() 將Flag設置為True
- wait() 阻塞所有線程
- clear() 將Flag設置為False
- is_set() 返回bool值,判斷Flag是否為True
線程局部存儲
編輯import threading
# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
# 取出ThreadLocal中的数据
n = local_data.num
local_data.num_add = n + n
def divid():
n = local_data.num
local_data.num_divid = n / 2
def times():
local_data.result = local_data.num_add * local_data.num_divid
def job1(num):
# 将数据存入ThreadLocal中
local_data.num = num
add()
divid()
times()
print('{} result >> {}'.format(threading.current_thread().name, local_data.result))
for i in range(5):
t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
t.start()
線程池
編輯concurrent.futures.ThreadPoolExecutor
編輯python3新引入的庫concurrent.futures.ThreadPoolExecutor可以並行執行多個線程,適用於 I/O 密集型任務。
concurrent.futures.ThreadPoolExecutor類的常用方法:
- map(func, *iterables, timeout=None, chunksize=1):並行執行一個函數對多個輸入迭代器進行映射。使用 map 方法,有兩個特點:無需提前使用submit方法;返回結果的順序和元素的順序相同,即使子線程先返回也不會獲取結果.
- shutdown(wait=True):停止池的操作,並等待所有提交的任務完成(wait=True)或立即返回(wait=False)。
- submit(fn, *args, **kwargs) 在線程池中提交任務。返回一個 concurrent.futures.Future 對象,用於表示異步操作的結果。
示例:
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
future = executor.submit(task, 2)
print(future.result()) # 输出: 4
concurrent.futures.Future
編輯- concurrent.futures.Future 是一個表示異步操作結果的對象,它提供了一種機制來檢查異步操作是否完成、獲取結果以及處理可能出現的異常。Future 對象是 concurrent.futures 模塊中的核心組件,主要用於線程池和進程池中的異步任務管理。
- cancel():嘗試取消這個任務。如果任務已經完成或已經被取消,返回 False。如果任務尚未執行,則任務會被取消,返回 True。
- cancelled():檢查這個任務是否已經被取消。
- done():檢查任務是否已經完成(無論是成功還是失敗)。
- result(timeout=None): 輸出對應的線程運行後方法的返回結果,如果線程還在運行,那麼其會一直阻塞在那裡,直到該線程運行完,當然,也可以設置
- result(timeout),即如果調用還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。
- exception(timeout=None):獲取任務拋出的異常。如果任務成功完成,則返回 None。如果任務拋出異常,exception() 方法會返回該異常。如果指定了 timeout 參數並且任務還未完成,則會阻塞至超時或任務完成。
- add_done_callback(fn):在任務完成時調用 fn 回調函數。fn 必須是一個可調用對象,接收一個 Future 對象作為參數示例:
- remove_done_callback(fn):從回調函數列表中移除 fn。只有在回調函數還沒有執行時,這個方法才有效
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 2)
print(future.result()) # 输出: 4
as_completed
編輯concurrent.futures.as_completed(fs, timeout=None)
返回一個包含fs所指定的Future實例的迭代器。在沒有任務完成的時候,會一直阻塞;如果設置了 timeout 參數,timeout 秒之後結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError。如果timeout未指定或為 None,則不限制等待時間。當有某個任務完成的時候,該方法會 yield 這個任務,就能執行 for循環體的語句,然後繼續阻塞住,循環到所有的任務結束。先完成的任務會先返回給主線程。
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def add(a, b):
time.sleep(3)
return a + b
task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
for i in range(len(list_a)):
task.append(pool.submit(add, list_a[i], list_b[i]))
# 使用as_completed遍历
for i in as_completed(task):
print('result = {}'.format(i.result()))
該方法與第一種的直接遍歷所具有的優勢是,不需要等待所有線程全部返回,而是每返回一個子線程就能夠處理,上面的result方法會阻塞後面的線程。
wait方法
編輯wait(fs, timeout=None, return_when=ALL_COMPLETED)
fs 為指定的 Future 實例,timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。return_when 指定此函數應在何時返回必須為以下常數之一:
- FIRST_COMPLETED 等待第一個線程結束時返回,即結束等待
- FIRST_EXCEPTION 函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當於 ALL_COMPLETED
- ALL_COMPLETED 函數將在所有可等待對象結束或取消時返回