本节内容
- 进程与线程的概念
- Python threading 模块
- GIL——global interpreter lock
- Mutex互斥锁(线程锁)
- Semaphore信号量
- Events事件
- Queue队列
1、进程与线程的概念
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。正是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
有了进程为什么还要线程?
进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。但进程还是有很多缺陷的,主要体现为:
-
进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
-
进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
什么是进程(process)?
对各种资源管理的集合,称之为进程。各类资源包括:内存的调用、网卡的调用等等。
每一个应用程序就是一个进程。
一个进程至少包含一个线程。
正在运行的进程都有唯一的一个ID号,即PID。如下图:
操作系统调用进程的时候,不会用进程名称进行调用,而是用唯一的PID进行调用。
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并发的执行不同的任务。
所以线程可以简单的理解为: 线程 = 一堆指令
【注意】:进程要操作CPU,必须先创建一个线程。
总结:进程就是各种资源的集合,线程是一堆指令。进程包含一个或多个线程,进程的操作是要靠线程进行的。
进程、线程与内存的关系:
进程的内存空间相对独立,进程之间不能互相访问对方的内存空间。
所有在同一个进程里的线程是共享同一块内存空间的。
进程与线程的区别?
Q1:进程快还是线程快?
A:这没有可比性,进程是资源的集合,线程是一堆指令,进程想要执行也是依靠线程进行的。
Q2:启动一个进程快还是启动一个线程快?
A:肯定是启动一个线程快了。启动进程需要向OS申请各种资源,但是启动线程就是生成一堆指令,一下子就出来了。
区别:
1、线程共享内存空间,进程的内存是相互独立的。
2、同一个父进程创建的子进程之间,内存空间相互独立。但同一个父进程创建的子线程之间,内存空间共享。
3、同一个进程的线程之间可以直接交流;两个进程之间想通信,必须通过一个中间代理。
4、创建新线程很简单,创建新进程需要对其父进程进行克隆
5、一个线程可以控制和操作同一个进程里的其他线程,但进程只能操作子进程。
6、对主线程的修改,可能会影响同一个进程里的其他线程的行为。但是对父进程的修改不会影响其他子进程。
什么时候使用多线程:
I/O操作不占CPU
计算占用CPU
python多线程不适合CPU密集操作型的任务。适合I/O操作密集型的任务。
因为python多线程是伪多线程,其实是在不同的线程之间进行切换,切换就要保持当前线程状态,读取下一线程状态,这些操作会增加CPU负载。所以对于本身就是CPU密集型的任务而言并不适合多线程。
2、Python threading模块
线程有2种调用方式,如下:
直接调用(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import threading import time def sayhi(num): #定义每个线程要运行的函数 print ( "running on number:%s" % num) time.sleep( 3 ) if __name__ = = '__main__' : t1 = threading.Thread(target = sayhi,args = ( 1 ,)) #生成一个线程实例 t2 = threading.Thread(target = sayhi,args = ( 2 ,)) #生成另一个线程实例 t1.start() #启动线程 t2.start() #启动另一个线程 print (t1.getName()) #获取线程名 print (t2.getName()) |
继承式调用(不推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import threading import time class MyThread(threading.Thread): def __init__( self ,num): threading.Thread.__init__( self ) self .num = num def run( self ): #定义每个线程要运行的函数 print ( "running on number:%s" % self .num) time.sleep( 3 ) if __name__ = = '__main__' : t1 = MyThread( 1 ) t2 = MyThread( 2 ) t1.start() t2.start() |
这里会出现一个问题:
在用time模块统计一个主线程运行时间时,由于多线程独立运行,所以这里的计时器只能统计主线程运行时间,这个时间不包括子线程运行时间,即不管子线程是否执行完毕,只要主线程执行完毕,计时器就得出结果。
在使用time模块计时时需要注意这个坑。。。。。。
如果非要在主线程中计算所有线程执行的时间的话,可以先让主线程等待子线程的执行结果,然后再计算时间。这里用到的方法是.join()
例如,我想等待t1这个子线程的执行结果,就在主线程中使用指令: t1.join()
如果用循环的方式启动子线程,会出现一个问题,没办法对子线程进行命名,或者说没办法给子线程门牌号。这样不利于后段程序的调用。
这时只需要在实例化子线程时使用一个临时变量,然后通过.append()方法,把每个子线程对应的对象加入之前设置好的列表中即可。
然后用for循环遍历存放子线程的列表,即可按顺序取出之前新建的子线程对象。
主线程就是程序本身,自己是看不到的。但是可以通过一个命令来证明,.py文件就是主线程。
print(threading.current_thread()) #打印当前线程
如果结果中出现 MainThread就是表示这个线程是主线程。没有则表示是子线程。
守护线程
守护线程相当于主线程的仆人。当主线程执行完毕后,不管守护线程是否执行完毕,程序都会退出。
主程序退出前,会等待所有的非守护线程执行完毕再退出。
t.setDaemon(True) #把t这个子线程设置为守护线程。
上面这条指令一定要在子线程开始之前,即t.star()指令之前。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | #_*_coding:utf-8_*_ __author__ = 'Alex Li' import time import threading def run(n): print ( '[%s]------running----\n' % n) time.sleep( 2 ) print ( '--done--' ) def main(): for i in range ( 5 ): t = threading.Thread(target = run,args = [i,]) t.start() t.join( 1 ) print ( 'starting thread' , t.getName()) m = threading.Thread(target = main,args = []) m.setDaemon( True ) #将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务 m.start() m.join(timeout = 2 ) print ( "---main thread done----" ) |
3、Python GIL(Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。
所以我们之前说的多线程其实是伪多线程,本质也是CPU在代码部分的上下文切换。只是这种切换太快了。
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
这篇文章透彻的剖析了GIL对python多线程的影响:http://www.dabeaz.com/python/UnderstandingGIL.pdf
4、Mutex互斥锁(线程锁)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print ( '--get num:' ,num ) time.sleep( 1 ) num - = 1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range ( 100 ): t = threading.Thread(target = addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print ( 'final num:' , num ) |
正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0。这个原因很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处理完的结果是99,但此时B线程运算完的结果也是99,两个线程同时将CPU运算的结果再赋值给num变量后,结果就都是99。所以每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
*注:python3.x上的结果总是正确的,可能是自动加了锁。
所以多个线程同时修改同一份数据时,必须加互斥锁即Mutex。
加锁版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print ( '--get num:' ,num ) time.sleep( 1 ) lock.acquire() #修改数据前加锁 num - = 1 #对此公共变量进行-1操作 lock.release() #修改后释放 num = 100 #设定一个共享变量 thread_list = [] lock = threading.Lock() #生成全局锁 for i in range ( 100 ): t = threading.Thread(target = addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print ( 'final num:' , num ) |
GIL VS Lock
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 这里的lock是用户级的lock,跟那个GIL没关系 。原因如下图:
加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在写的python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时自己写的程序里的线程和 py解释器自己的线程是并发运行的,假设自己的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
RLock(递归锁)
就是在一个大锁中还要再包含子锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import threading,time def run1(): print ( "grab the first part data" ) lock.acquire() global num num + = 1 lock.release() return num def run2(): print ( "grab the second part data" ) lock.acquire() global num2 num2 + = 1 lock.release() return num2 def run3(): lock.acquire() res = run1() print ( '--------between run1 and run2-----' ) res2 = run2() lock.release() print (res,res2) if __name__ = = '__main__' : num,num2 = 0 , 0 lock = threading.RLock() for i in range ( 10 ): t = threading.Thread(target = run3) t.start() while threading.active_count() ! = 1 : print (threading.active_count()) else : print ( '----all threads done---' ) print (num,num2) |
上述代码中,如果用lock
=
threading.Lock()代替l
ock
=
threading.RLock(),会出现程序把解锁的钥匙弄混,导致死循环,永远出不去的情况。
所以当连续锁很多次时,就需要使用递归锁。
5、Semaphore(信号量)
互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,当之前启动的线程完成后,新的线程才能启动、执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import threading,time def run(n): semaphore.acquire() #获得一个信号量 time.sleep( 1 ) print ( "run the thread: %s\n" % n) semaphore.release() #释放一个信号量 if __name__ = = '__main__' : semaphore = threading.BoundedSemaphore( 5 ) #最多允许5个线程同时运行 for i in range ( 20 ): t = threading.Thread(target = run,args = (i,)) t.start() while threading.active_count() ! = 1 : pass #print threading.active_count() else : print ( '----all threads done---' ) |
6、Events事件
事件是一个简单的同步对象,事件代表一个内置的标志位,并且线程可以等待标志位被设置为真或清除标志位。
调用事件前,需要先实例化一个事件对象。
event = threading.Event()
事件的4个方法:
1、event.wait() # 一个客户端线程可以等待事件标志位被设置为真。此时wait()使得线程处于阻塞状态 2、event.set() #一个服务线程可以设置标志位或清空标志位。 set()方法是设置标志位。 3、event.clear() #clear()方法是清空标志位 4、event.isSet() #isSet方法是判断标志位是否被设置。
如果事件标志位被设置,等待方法什么也不会做。
如果事件标志位被清空,等待方法将会处于阻塞状态,直到事件标志位被再一次设置。
多个线程可以同时等待一个相同的事件标志位的设置。
通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import threading,time import random def light(): if not event.isSet(): event. set () #wait就不阻塞 #绿灯状态 count = 0 while True : if count < 10 : print ( '\033[42;1m--green light on---\033[0m' ) elif count < 13 : print ( '\033[43;1m--yellow light on---\033[0m' ) elif count < 20 : if event.isSet(): event.clear() print ( '\033[41;1m--red light on---\033[0m' ) else : count = 0 event. set () #打开绿灯 time.sleep( 1 ) count + = 1 def car(n): while 1 : time.sleep(random.randrange( 10 )) if event.isSet(): #绿灯 print ( "car [%s] is running.." % n) else : print ( "car [%s] is waiting for the red light.." % n) if __name__ = = '__main__' : event = threading.Event() Light = threading.Thread(target = light) Light.start() for i in range ( 3 ): t = threading.Thread(target = car,args = (i,)) t.start() |
7、Queue队列
queue队列的作用:
1、提高运行效率
2、程序的解耦
队列的本质:一个有顺序的容器
其他的容器还有 列表、字典等。
队列与列表的区别:
数据从列表中取出后,数据依然存在于列表之中。但是数据从队列中取出后,原始数据会被从队列中删除。即对队列而言,数据只有一份,取走就没。
队列在线程编程中是十分有用的,尤其是在信息必须被安全的交换在多个线程之间的时候。
队列的种类:
- class
queue.
Queue
(maxsize=0) #先入先出 - class
queue.
LifoQueue
(maxsize=0) #last in fisrt out class queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列 - 队列的方法:
Queue.
qsize
() Queue.
empty
() #return True if empty Queue.
full
() # return True if full Queue.
put
(item, block=True, timeout=None) Queue.
put_nowait
(item) Queue.
get
(block=True, timeout=None) Queue.
get_nowait
() Queue.
task_done
() Queue.
join
() block直到queue被消费完毕
用队列的生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者消费者模型目的就是解耦,而队列只是用来解耦的一个工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | import threading import queue def producer(): for i in range ( 10 ): q.put( "骨头 %s" % i ) print ( "开始等待所有的骨头被取走..." ) q.join() print ( "所有的骨头被取完了..." ) def consumer(n): while q.qsize() > 0 : print ( "%s 取到" % n , q.get()) q.task_done() #告知这个任务执行完了 q = queue.Queue() p = threading.Thread(target = producer,) p.start() c1 = consumer( "Gavin" ) |