目录
- Python 中多线程主要有以下几种类型的锁:
- threading.Lock():
- threading.RLock()
- threading.Semaphore():
- threading.BoundedSemaphore:
- threading.Condition:
- threading.Barrier:
Python 中多线程主要有以下几种类型的锁:
注意,在Python中,由于全局解释器锁(GIL)的存在,同一时刻只允许一个线程执行Python字节码,因此Python的多线程并不能实现真正的并行计算。如果需要进行并行计算,可以使用multiprocessing模块,或者使用其他的并行计算框架,如concurrent.futures。
threading.Lock():
线程锁,可用于同步多个线程对共享资源的访问。
错误示范:
存在一个问题,就是在每次循环的时候都创建了一个新的锁。这样的话,每个锁都是独立的,它们之间不能保证对共享资源 counter 的互斥访问,因此在多线程环境下,counter += 1 操作可能会出现竞态条件,导致结果不是预期的 10000000。
import threading
counter = 0 # 共享资源
def increment():
global counter
for _ in range(1000000):
with threading.Lock(): # 获取线程锁
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 输出 10000000
正确示例
import threading
counter = 0 # 共享资源
lock = threading.Lock() # 创建一个共享的锁
def increment():
global counter
for _ in range(1000000):
with lock: # 获取线程锁
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 输出应该是10000000
threading.RLock()
可重入锁,是一种特殊的线程锁,允许同一线程多次获得锁。
import threading
# 创建一个RLock对象
lock = threading.RLock()
def recursive_function(level):
with lock:
if level > 0:
print(\’Entering level\’, level)
recursive_function(level – 1)
print(\’Exiting level\’, level)
else:
print(\’Base case\’)
# 在主线程中运行递归函数
recursive_function(5)
在这个例子中,recursive_function 是一个递归函数,它会在每个递归级别上获取一次锁。由于我们使用的是 threading.RLock,同一线程可以多次获取锁,所以这个代码能够正常运行。
但是如果我们使用 threading.Lock 替代 threading.RLock,那么在第二次尝试获取锁时,线程将被阻塞,因为 threading.Lock 不允许同一线程多次获取锁。这会导致死锁,程序将无法继续运行。
threading.RLock(Reentrant Lock)在Python中是一种可重入锁,也就是说,它允许同一线程在没有释放其所有权的情况下多次获取同一个锁。这在某些情况下是非常有用的,例如在递归函数或者嵌套调用中。
以下是一些具体的应用场景:
需要注意的是,虽然 threading.RLock 在某些情况下非常有用,但是在大多数情况下,你仍然应该使用更简单的 threading.Lock。因为过度使用 threading.RLock 可能会使你的代码更复杂,更难以理解和维护。而且,不正确的使用 threading.RLock 可能会导致死锁。
threading.Semaphore():
信号量,用于限制同时访问某一资源的线程数量。
模拟一个有限资源池(例如数据库连接池),限制同时访问资源的线程数量。
import threading
import time
# 定义一个有限资源池
RESOURCE_POOL_SIZE = 3
semaphore = threading.Semaphore(RESOURCE_POOL_SIZE)
def access_resource(thread_id):
print(f\”Thread {thread_id} is requesting access to the resource pool.\”)
with semaphore:
print(f\”Thread {thread_id} has acquired access to the resource pool.\”)
time.sleep(1) # 模拟资源使用
print(f\”Thread {thread_id} has released access to the resource pool.\”)
# 创建10个线程
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
在这个例子中,我们有一个有限的资源池,其大小由 RESOURCE_POOL_SIZE 定义。我们使用一个 threading.Semaphore 对象,将其初始值设为资源池的大小,以限制同时访问资源池的线程数量。
我们创建了10个线程,每个线程都尝试访问资源池。由于我们使用了信号量,一次只能有 RESOURCE_POOL_SIZE 个线程同时访问资源池。其他线程将等待,直到有线程释放资源。这样,我们可以限制同时访问资源的线程数量,防止资源竞争或过载。
threading.BoundedSemaphore:
有界信号量
import threading
# 创建一个有界信号量,初始值为2
semaphore = threading.BoundedSemaphore(2)
def access_resource(thread_id):
print(f\”Thread {thread_id} is requesting access to the resource.\”)
with semaphore:
print(f\”Thread {thread_id} has acquired access to the resource.\”)
# 模拟资源使用
# 创建3个线程
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
# 尝试释放未获取的信号量,将引发 ValueError
try:
semaphore.release()
except ValueError as e:
print(\”Caught exception:\”, e)
在这个代码段中,semaphore.release() 尝试释放(即增加)信号量的计数。在Python中,信号量是一个用来限制线程并发数量的同步原语,它内部有一个计数器。当一个线程调用 acquire() 方法时,信号量的计数器减一;当一个线程调用 release() 方法时,信号量的计数器加一。
threading.BoundedSemaphore 与 threading.Semaphore 的行为基本相同,但有一点不同:如果在调用 release() 后,信号量的计数器的值大于创建信号量时设定的初始值,threading.BoundedSemaphore 将抛出 ValueError 异常。这种行为有助于检测程序中的一些错误,例如错误地多次释放了信号量。
所以在这个示例中,try/except 块是为了捕获并处理可能由 semaphore.release() 抛出的 ValueError 异常。如果在调用 release() 后,信号量的计数器的值大于创建信号量时设定的初始值,那么将会捕获到 ValueError,并打印出 “Caught exception:” 及其错误信息。
threading.Condition:
条件变量,允许一个或多个线程等待某个条件满足,然后唤醒。
import threading
import time
# 创建一个条件变量
condition = threading.Condition()
# 创建一个共享资源
resource = []
def producer():
for i in range(5):
time.sleep(1) # 模拟生产过程
with condition:
resource.append(i) # 向资源中添加数据
condition.notify() # 唤醒一个等待的线程
def consumer():
while True:
with condition:
while not resource: # 如果资源为空,则等待
condition.wait()
item = resource.pop(0) # 从资源中获取数据
print(f\”Consumer consumed: {item}\”)
if item == 4: # 如果消费了所有的资源,就退出循环
break
# 创建一个生产者线程和一个消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
threading.Condition 对象常常与一个锁一起使用,允许一个或多个线程等待直到满足某个特定的条件,然后唤醒。这在某些情况下很有用,比如当你需要一个或多个线程等待直到其他线程完成特定任务或者改变了某个状态。
threading.Barrier:
栅栏对象,允许一定数量的线程同步,直到所有线程都到达栅栏位置,才会全部释放
import threading
import time
# 设定栅栏,允许3个线程进行同步
barrier = threading.Barrier(3)
def worker(thread_id):
print(f\”Thread {thread_id} is starting.\”)
time.sleep(thread_id) # 模拟线程执行过程中的延时
print(f\”Thread {thread_id} is waiting at the barrier.\”)
barrier.wait() # 等待所有线程到达栅栏
print(f\”Thread {thread_id} is released from the barrier.\”)
# 创建3个线程
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
在这个例子中,我们创建了一个栅栏对象,允许3个线程进行同步。然后我们创建了3个线程,每个线程都会执行 worker 函数。在 worker 函数中,线程首先打印一个开始信息,然后等待一段时间(这里是线程ID,可以模拟线程执行过程中的延时),接着打印一个等待信息,然后调用 barrier.wait() 等待栅栏。
当所有3个线程都调用了 barrier.wait(),栅栏将释放所有等待的线程。这时,线程将继续执行并打印它们已经从栅栏中释放的信息。这个示例演示了如何使用 threading.Barrier 对象来同步一组线程,确保它们在某个点上相互等待。
到此这篇关于浅析python多线程中的锁的文章就介绍到这了,更多相关python多线程锁内容请搜索悠久资源网以前的文章或继续浏览下面的相关文章希望大家以后多多支持悠久资源网!
您可能感兴趣的文章:
- Python使用requests xpath 并开启多线程爬取西刺代理ip实例
- python实现多线程及线程间通信的简单方法
- Python从入门到精通之多线程使用详解
- python爬虫通过增加多线程获取数据
- Python实现多线程并发请求测试的脚本
- python如何开启多线程