本文共 6598 字,大约阅读时间需要 21 分钟。
目录
和Lock很像,信号量对象内部维护一个倒数计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其他的线程对信号量release后,计数大于0,恢复阻塞的线程
名称 含义 Semaphore(value=1) 构造方法。value小于0,抛ValueError异常 acquire(blocking=True,timeout=None) 获取信号量,计数器减1,获取成功返回True release() 释放信号量,计数器加1 计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞
import threadingimport loggingimport timeFORMAT = '%(asctime)-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(s:threading.Semaphore): logging.info("in sub thread") logging.info(s.acquire()) #阻塞 logging.info("sub thread over")#信号量s = threading.Semaphore(3)logging.info(s.acquire())print(s._value)logging.info(s.acquire())print(s._value)logging.info(s.acquire())print(s._value)threading.Thread(target=worker,args=(s,)).start()time.sleep(2)logging.info(s.acquire(False))logging.info(s.acquire(timeout=3))#释放logging.info("released")s.release()结果:2021-06-24 23:33:30,508 [MainThread, 17360] True2021-06-24 23:33:30,508 [MainThread, 17360] True2021-06-24 23:33:30,508 [MainThread, 17360] True2021-06-24 23:33:30,509 [Thread-1, 12680] in sub thread2102021-06-24 23:33:32,511 [MainThread, 17360] False2021-06-24 23:33:35,512 [MainThread, 17360] False2021-06-24 23:33:35,512 [MainThread, 17360] released2021-06-24 23:33:35,512 [Thread-1, 12680] True2021-06-24 23:33:35,512 [Thread-1, 12680] sub thread over
连接池:因为资源有限,且开启一个连接成本高,所以,使用连接池
一个简单的连接池:连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用
class Conn: def __init__(self,name): self.name = nameclass Pool: def __init__(self,count): self.count = count #池中是连接对象的列表 self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)] def _connect(self,conn_name): #创建连接的方法,返回一个名称 return Conn(conn_name) def get_conn(self): #从池中拿走一个连接 if len(self.pool) > 0 : return self.pool.pop() def return_pool(self,conn:Conn): #向池中添加一个连接 self.pool.append(conn)真正的连接池的实现比上面的例子要复杂的多,这里只是一个简单的功能的事项
本例中,get_conn()方法,在多线程的时候,有线程安全问题:
假设池中有一个连接,有可能多个线程判断池的长度是大于0的,当一个线程拿走了连接对象,其他线程再来pop就会抛出异常,如何解决?
1、加锁,在读写的地方加锁
2、使用信号量Semapore
1、使用信号量对上例进行修改
import threadingimport randomimport loggingFORMAT = '%(asctime)-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)class Conn: def __init__(self,name): self.name = nameclass Pool: def __init__(self,count): self.count = count #池中是连接对象的列表 self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)] self.semaphore = threading.Semaphore(count) def _connect(self,conn_name): #创建连接的方法,返回一个名称 return Conn(conn_name) def get_conn(self): #从池中拿走一个连接 print("~"*20) self.semaphore.acquire() print("="*20) conn = self.pool.pop() return conn def return_pool(self,conn:Conn): #向池中添加一个连接 self.pool.append(conn) self.semaphore.release()pool = Pool(3)def worker(pool:Pool): conn = pool.get_conn() logging.info(conn) #模拟使用了一段事件 threading.Event().wait(random.randint(1,4)) pool.return_pool(conn)for i in range(6): threading.Thread(target=worker,name="worek-{}".format(i),args=(pool,)).start()结果:2021-06-25 00:20:59,477 [worek-0, 23612] <__main__.Conn object at 0x02167030>2021-06-25 00:20:59,477 [worek-1, 23128] <__main__.Conn object at 0x02164F50>2021-06-25 00:20:59,477 [worek-2, 21476] <__main__.Conn object at 0x02164CF0>~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~2021-06-25 00:21:00,478 [worek-3, 23684] <__main__.Conn object at 0x02167030>====================2021-06-25 00:21:01,478 [worek-4, 12960] <__main__.Conn object at 0x02164F50>====================2021-06-25 00:21:02,478 [worek-5, 4208] <__main__.Conn object at 0x02164CF0>====================上列中,使用信号量解决资源有限的问题。
如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超时资源数,请求者只能等待。当使用者 用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源
注意:这个例子不能用于生成环境,只时为了说明信号量使用的例子,还有很多未完成功能self.pool.append(conn)这一句要不要加锁
1、从逻辑上分析
1.1、假设如果还没有使用信号量,就release,会怎样?
import threadingsema = threading.Semaphore(3)sema.release()sema.release()sema.release()print(sema.__dict__)结果:{'_cond':, 0)>, '_value': 6} 对上例输出结果看出,竟然内置计数器达到了4,这样实际上超出我们最大值,需要解决这个问题
BoundedSemaphore类
有界的信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常
这样用有界信号量修改源代码,保证如果多return_pool就会抛出异常
保证了多归还连接抛出异常如果归还了同一个连接多次怎么办,去重很容易判断出来
1.2如果使用了信号量,但是还没有用完
self.pool.append(conn)self.semphore.release()假设一种极端情况,计数器还差1就满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常
因此信号量,可以保证,一定不能多归还
1.3很多线程用完了信号量
没有获得信号量的线程都阻塞,没有线程和归还线程争抢,当append后才release,这时候才能唤醒等待的线程,才能pop,也就是没有获取信号量就不能pop,这是安全的。
经过上面的分析,信号量列表长度好,线程安全
信号量和锁
锁,只允许同一个事件一个线程独占资源,它是特殊的信号量,即信号量计数器初值为1.信号量,可以多个线程访问共享资源,但这个共享资源数量有限。
锁,可以看做特殊的信号量
Queue
标准库queue模块,提供FIFO的Queue、LIFO的队列,优先队列。
Queue类时线程安全的,适用于多线程间安全的交换数据。内部适用了Lock和Condition。
为什么将魔术方法时,说实话容器的大小,不准确?
如果不加锁,是不可能获得准确的大小的,因为你刚读取了一个大小,还没有取走,就有可能被其他线程改了。
Queue类的size虽然加了锁,但是,依然不能立即保证立即get、put就能成功,英文读取大小和get、put方法是分开的
import queueq = queue.Queue(8)if q.qsize() ==7: q.put() #上下两句可能被打断if q.qsize() ==1: q.get() #未必会成功
CPython在解释器进程级别有一把锁,叫做GIL全局解释器锁。
GIL保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也是如此
CPython中
IO密集型,由于线程阻塞,就会调度其他线程;
CPU密集型,当前线程可能连续的获得GIL,导致其他线程几乎无法适用CPU
在CPython中由于有GIL存在,IO密集型,适用多线程;CPU密集型,适用多线程,绕开GIL。
新版CPython正在努力优化GIL的问题,但不是移除。
如果非要适用多线程的效率问题,请绕行其他语言erlang,go等。Python中绝大多数内置结构的读写都是原子操作。 由于GIL的存在,Python的内置数据类型在多线程编程的时候,就变成安全的了,但是实际上他们本身不是线程安全类型的。
看下面两个程序
1、单线程import loggingimport datetimelogging.basicConfig(level=logging.INFO,format='%(thread)s %(message)s')start = datetime.datetime.now()#计算def calc(): sum = 0 for _ in range(1000000000): sum +=1calc()calc()calc()calc()calc()delta = (datetime.datetime.now() -start).total_seconds()logging.info(delta)结果14760 267.0979232、多线程
import loggingimport datetimeimport threadinglogging.basicConfig(level=logging.INFO,format='%(thread)s %(message)s')start = datetime.datetime.now()#计算def calc(): sum = 0 for _ in range(1000000000): sum +=1t1 = threading.Thread(target=calc)t2 = threading.Thread(target=calc)t3 = threading.Thread(target=calc)t4 = threading.Thread(target=calc)t1.start()t2.start()t3.start()t4.start()t1.join()t2.join()t3.join()t4.join()delta = (datetime.datetime.now() -start).total_seconds()logging.info(delta)结果:24616 215.609224从两端程序测试的结果来看,CPython中多线程根本没有任何优势,和一个线程执行时间相当。因为GIL的存在,尤其是像上面的计算密集型程序
转载地址:http://zoesi.baihongyu.com/