进程与线程二

进程、线程、协程

本节内容

  1. 操作系统发展史介绍
  2. 进程、与线程区别
  3. python GIL全局解释器锁
  4. 线程
    1. 语法
    2. join
    3. 线程锁之Lock\Rlock\信号量
    4. 将线程变为守护进程
    5. Event事件 
    6. queue队列
    7. 生产者消费者模型
    8. Queue队列
    9. 开发一个线程池
  5. 进程
    1. 语法
    2. 进程间通讯
    3. 进程池    

操作系统发展史

手工操作(无操作系统)

1946年第一台计算机诞生–20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式。

手工操作
程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把程序和数据输入计算机内存,接着通过控制台开关启动程序针对数据运行;计算完毕,打印机输出计算结果;用户取走结果并卸下纸带(或卡片)后,才让下一个用户上机。

手工操作方式两个特点:
(1)用户独占全机。不会出现因资源已被其他用户占用而等待的现象,但资源的利用率低。
(2)CPU 等待手工操作。CPU的利用不充分。

 20世纪50年代后期,出现人机矛盾:手工操作的慢速度和计算机的高速度之间形成了尖锐矛盾,手工操作方式已严重损害了系统资源的利用率(使资源利用率降为百分之几,甚至更低),不能容忍。唯一的解决办法:只有摆脱人的手工操作,实现作业的自动过渡。这样就出现了成批处理。

批处理系统

批处理系统:加载在计算机上的一个系统软件,在它的控制下,计算机能够自动地、成批地处理一个或多个用户的作业(这作业包括程序、数据和命令)。

联机批处理系统
首先出现的是联机批处理系统,即作业的输入/输出由CPU来处理。
主机与输入机之间增加一个存储设备——磁带,在运行于主机上的监督程序的自动控制下,计算机可自动完成:成批地把输入机上的用户作业读入磁带,依次把磁带上的用户作业读入主机内存并执行并把计算结果向输出机输出。完成了上一批作业后,监督程序又从输入机上输入另一批作业,保存在磁带上,并按上述步骤重复处理。

监督程序不停地处理各个作业,从而实现了作业到作业的自动转接,减少了作业建立时间和手工操作时间,有效克服了人机矛盾,提高了计算机的利用率。

但是,在作业输入和结果输出时,主机的高速CPU仍处于空闲状态,等待慢速的输入/输出设备完成工作: 主机处于“忙等”状态。

脱机批处理系统
为克服与缓解高速主机与慢速外设的矛盾,提高CPU的利用率,又引入了脱机批处理系统,即输入/输出脱离主机控制。
这种方式的显著特征是:增加一台不与主机直接相连而专门用于与输入/输出设备打交道的卫星机。
其功能是:
(1)从输入机上读取用户作业并放到输入磁带上。
(2)从输出磁带上读取执行结果并传给输出机。

这样,主机不是直接与慢速的输入/输出设备打交道,而是与速度相对较快的磁带机发生关系,有效缓解了主机与设备的矛盾。主机与卫星机可并行工作,二者分工明确,可以充分发挥主机的高速计算能力。

脱机批处理系统:20世纪60年代应用十分广泛,它极大缓解了人机矛盾及主机与外设的矛盾。IBM-7090/7094:配备的监督程序就是脱机批处理系统,是现代操作系统的原型。

不足:每次主机内存中仅存放一道作业,每当它运行期间发出输入/输出(I/O)请求后,高速的CPU便处于等待低速的I/O完成状态,致使CPU空闲。

为改善CPU的利用率,又引入了多道程序系统。

多道程序系统

多道程序设计技术

所谓多道程序设计技术,就是指允许多个程序同时进入内存并运行。即同时把多个程序放入内存,并允许它们交替在CPU中运行,它们共享系统中的各种硬、软件资源。当一道程序因I/O请求而暂停运行时,CPU便立即转去运行另一道程序。

单道程序的运行过程:
在A程序计算时,I/O空闲, A程序I/O操作时,CPU空闲(B程序也是同样);必须A工作完成后,B才能进入内存中开始工作,两者是串行的,全部完成共需时间=T1+T2。

多道程序的运行过程:
将A、B两道程序同时存放在内存中,它们在系统的控制下,可相互穿插、交替地在CPU上运行:当A程序因请求I/O操作而放弃CPU时,B程序就可占用CPU运行,这样 CPU不再空闲,而正进行A I/O操作的I/O设备也不空闲,显然,CPU和I/O设备都处于“忙”状态,大大提高了资源的利用率,从而也提高了系统的效率,A、B全部完成所需时间<<T1+T2。

多道程序设计技术不仅使CPU得到充分利用,同时改善I/O设备和内存的利用率,从而提高了整个系统的资源利用率和系统吞吐量(单位时间内处理作业(程序)的个数),最终提高了整个系统的效率。

单处理机系统中多道程序运行时的特点:
(1)多道:计算机内存中同时存放几道相互独立的程序;
(2)宏观上并行:同时进入系统的几道程序都处于运行过程中,即它们先后开始了各自的运行,但都未运行完毕;
(3)微观上串行:实际上,各道程序轮流地用CPU,并交替运行。

多道程序系统的出现,标志着操作系统渐趋成熟的阶段,先后出现了作业调度管理、处理机管理、存储器管理、外部设备管理、文件系统管理等功能。

多道批处理系统
20世纪60年代中期,在前述的批处理系统中,引入多道程序设计技术后形成多道批处理系统(简称:批处理系统)。
它有两个特点:
(1)多道:系统内可同时容纳多个作业。这些作业放在外存中,组成一个后备队列,系统按一定的调度原则每次从后备作业队列中选取一个或多个作业进入内存运行,运行作业结束、退出运行和后备作业进入运行均由系统自动实现,从而在系统中形成一个自动转接的、连续的作业流。
(2)成批:在系统运行过程中,不允许用户与其作业发生交互作用,即:作业一旦进入系统,用户就不能直接干预其作业的运行。

批处理系统的追求目标:提高系统资源利用率和系统吞吐量,以及作业流程的自动化。

批处理系统的一个重要缺点:不提供人机交互能力,给用户使用计算机带来不便。虽然用户独占全机资源,并且直接控制程序的运行,可以随时了解程序运行情况。但这种工作方式因独占全机造成资源效率极低。

一种新的追求目标:既能保证计算机效率,又能方便用户使用计算机。 20世纪60年代中期,计算机技术和软件技术的发展使这种追求成为可能。

分时系统

由于CPU速度不断提高和采用分时技术,一台计算机可同时连接多个用户终端,而每个用户可在自己的终端上联机使用计算机,好象自己独占机器一样。

分时技术:把处理机的运行时间分成很短的时间片,按时间片轮流把处理机分配给各联机作业使用。

若某个作业在分配给它的时间片内不能完成其计算,则该作业暂时中断,把处理机让给另一作业使用,等待下一轮时再继续其运行。由于计算机速度很快,作业运行轮转得很快,给每个用户的印象是,好象他独占了一台计算机。而每个用户可以通过自己的终端向系统发出各种操作控制命令,在充分的人机交互情况下,完成作业的运行。

具有上述特征的计算机系统称为分时系统,它允许多个用户同时联机使用计算机。

特点:
(1)多路性。若干个用户同时使用一台计算机。微观上看是各用户轮流使用计算机;宏观上看是各用户并行工作。
(2)交互性。用户可根据系统对请求的响应结果,进一步向系统提出新的请求。这种能使用户与系统进行人机对话的工作方式,明显地有别于批处理系统,因而,分时系统又被称为交互式系统。
(3)独立性。用户之间可以相互独立操作,互不干扰。系统保证各用户程序运行的完整性,不会发生相互混淆或破坏现象。
(4)及时性。系统可对用户的输入及时作出响应。分时系统性能的主要指标之一是响应时间,它是指:从终端发出命令到系统予以应答所需的时间。

分时系统的主要目标:对用户响应的及时性,即不至于用户等待每一个命令的处理时间过长。

分时系统可以同时接纳数十个甚至上百个用户,由于内存空间有限,往往采用对换(又称交换)方式的存储方法。即将未“轮到”的作业放入磁盘,一旦“轮到”,再将其调入内存;而时间片用完后,又将作业存回磁盘(俗称“滚进”、“滚出“法),使同一存储区域轮流为多个用户服务。

多用户分时系统是当今计算机操作系统中最普遍使用的一类操作系统。

实时系统

虽然多道批处理系统和分时系统能获得较令人满意的资源利用率和系统响应时间,但却不能满足实时控制与实时信息处理两个应用领域的需求。于是就产生了实时系统,即系统能够及时响应随机发生的外部事件,并在严格的时间范围内完成对该事件的处理。
实时系统在一个特定的应用中常作为一种控制设备来使用。

实时系统可分成两类:
(1)实时控制系统。当用于飞机飞行、导弹发射等的自动控制时,要求计算机能尽快处理测量系统测得的数据,及时地对飞机或导弹进行控制,或将有关信息通过显示终端提供给决策人员。当用于轧钢、石化等工业生产过程控制时,也要求计算机能及时处理由各类传感器送来的数据,然后控制相应的执行机构。
(2)实时信息处理系统。当用于预定飞机票、查询有关航班、航线、票价等事宜时,或当用于银行系统、情报检索系统时,都要求计算机能对终端设备发来的服务请求及时予以正确的回答。此类对响应及时性的要求稍弱于第一类。

实时操作系统的主要特点:
(1)及时响应。每一个信息接收、分析处理和发送的过程必须在严格的时间限制内完成。
(2)高可靠性。需采取冗余措施,双机系统前后台工作,也包括必要的保密措施等。

进程与线程

什么是进程(process)?

程序的执行实例称为进程。

每个进程都提供执行程序所需的资源。进程具有虚拟地址空间,可执行代码,系统对象的打开句柄,安全上下文,唯一进程标识符,环境变量,优先级类,最小和最大工作集大小以及至少一个执行线程。每个进程都使用单个线程启动,通常称为主线程,但可以从其任何线程创建其他线程。

程序不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调试下,可以实现并发地执行。正是这样的设计,大大提高 了cpu的利用率。进程的出现让每个用户感觉到自己独享cpu,因此,进程就是为了在cpu上实现多道编程而提出的。

有了进程为什么还要线程?

进程有很多优点,提供了多道编程,让我们感觉我们每个人都拥有自己的cpu和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

例如,我们在使用qq聊天, qq做为一个独立进程如果同一时间只能干一件事,那他如何实现在同一时刻 即能监听键盘输入、又能监听其它人给你发的消息、同时还能把别人发的消息显示在屏幕上呢?你会说,操作系统不是有分时么?但我的亲,分时是指在不同进程间的分时呀, 即操作系统处理一会你的qq任务,又切换到word文档任务上了,每个cpu时间片分给你的qq程序时,你的qq还是只能同时干一件事呀。

再直白一点, 一个操作系统就像是一个工厂,工厂里面有很多个生产车间,不同的车间生产不同的产品,每个车间就相当于一个进程,且你的工厂又穷,供电不足,同一时间只能给一个车间供电,为了能让所有车间都能同时生产,你的工厂的电工只能给不同的车间分时供电,但是轮到你的qq车间时,发现只有一个干活的工人,结果生产效率极低,为了解决这个问题,应该怎么办呢?。。。。没错,你肯定想到了,就是多加几个工人,让几个人工人并行工作,这每个工人,就是线程!

什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

线程是执行上下文,它是CPU执行指令流所需的所有信息。
假设你正在读一本书,而你现在想休息一下,但是你希望能够从你停下来的确切位置回来并继续阅读。实现这一目标的一种方法是记下页码,行号和字号。因此,阅读书籍的执行环境就是这三个数字。
如果你有一个室友,并且她使用相同的技术,她可以在你不使用时拿走这本书,并从她停下的地方继续阅读。然后你可以把它拿回来,并从你所在的地方恢复。
线程以相同的方式工作。 CPU正在给你一种错觉,即它同时进行多次计算。它通过在每次计算上花费一点时间来做到这一点。它可以做到这一点,因为它有每个计算的执行上下文。就像您可以与朋友共享一本书一样,许多任务可以共享CPU。
在技术层面上,执行上下文(因此是一个线程)由CPU寄存器的值组成。
最后:线程与进程不同。线程是执行的上下文,而进程是与计算相关联的一堆资源。一个进程可以有一个或多个线程。
澄清:与进程关联的资源包括内存页面(进程中的所有线程具有相同的内存视图),文件描述符(例如,打开套接字)和安全凭证(例如,启动该进程的用户的ID)处理)。

进程与线程的区别?

1
2
3
4
5
6
线程共享创建它的进程的地址空间;进程有自己的地址空间。
线程可以直接访问其进程的数据段;进程拥有自己父进程数据段的副本。
线程可以直接与其进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
新线程很容易创建;新流程需要重复父流程。
线程可以对同一进程的线程进行相当大的控制;进程只能控制子进程。
对主线程的更改(取消,优先级更改等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。

Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,擦。。。,那这还叫什么多线程呀?莫如此早的下结结论,听我现场讲。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

Python threading模块

线程有2种调用方式,如下:

直接调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time

def sayhi(num): #定义每个线程要运行的函数

print("running on number:%s" %num)

time.sleep(3)

if __name__ == '__main__':

t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例

t1.start() #启动线程
t2.start() #启动另一个线程

print(t1.getName()) #获取线程名
print(t2.getName())

继承式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time


class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num

def run(self):#定义每个线程要运行的函数

print("running on number:%s" %self.num)

time.sleep(3)

if __name__ == '__main__':

t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

加入&守护进程

有些线程执行后台任务,例如发送keepalive数据包,或执行定期垃圾收集等等。这些仅在主程序运行时才有用,并且一旦其他非守护程序线程退出就可以将它们终止。

如果没有守护程序线程,您必须跟踪它们,并在程序完全退出之前告诉它们退出。通过将它们设置为守护程序线程,您可以让它们运行并忘记它们,当程序退出时,任何守护程序线程都会自动终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import time
import threading


def run(n):

print('[%s]------running----\n' % n)
time.sleep(2)
print('--done--')

def main():
for i in range(5):
t = threading.Thread(target=run,args=[i,])
t.start()
t.join(1)
print('starting thread', t.getName())


m = threading.Thread(target=main,args=[])
m.setDaemon(True) #将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout=2)
print("---main thread done----")

注意:守护程序线程在关闭时突然停止。他们的资源(例如打开文件,数据库事务等)可能无法正确发布。如果您希望线程正常停止,请将它们设置为非守护进程并使用合适的信号机制(如Event)。

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading

def addNum():
global num #在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
num -=1 #对此公共变量进行-1操作

num = 100 #设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()


print('final num:', num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import threading

def addNum():
global num #在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #修改数据前加锁
num -=1 #对此公共变量进行-1操作
lock.release() #修改后释放

num = 100 #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()

print('final num:', num )

GIL VS Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图来看一下+配合我现场讲给大家,就明白了。

那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading,time

def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2)


if __name__ == '__main__':

num,num2 = 0,0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start()

while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading,time

def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" %n)
semaphore.release()

if __name__ == '__main__':

num= 0
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run,args=(i,))
t.start()

while threading.active_count() != 1:
pass #print threading.active_count()
else:
print('----all threads done---')
print(num)

计时器
此类表示仅在经过一定时间后才应运行的操作与线程一样,通过调用start()方法启动计时器。可以通过调用thecancel()方法停止计时器(在其动作开始之前)。计时器在执行其操作之前等待的时间间隔可能与用户指定的时间间隔不完全相同。

1
2
3
4
5
def hello():
print("hello, world")

t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed

事件
事件是一个简单的同步对象;
该事件代表一个内部标志和线程
可以等待设置标志,或者自己设置或清除标志。
event = threading.Event()
#客户端线程可以等待设置标志
event.wait()

#a服务器线程可以设置或重置它
event.set()
event.clear()
如果设置了标志,则wait方法不会执行任何操作。
如果该标志被清除,则等待将被阻塞,直到它再次被设置为止。
任意数量的线程都可以等待同一事件。

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #绿灯状态
count = 0
while True:
if count < 10:
print('\033[42;1m--green light on---\033[0m')
elif count <13:
print('\033[43;1m--yellow light on---\033[0m')
elif count <20:
if event.isSet():
event.clear()
print('\033[41;1m--red light on---\033[0m')
else:
count = 0
event.set() #打开绿灯
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #绿灯
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()

这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 1 #_*_coding:utf-8_*_
2 __author__ = 'Alex Li'
3 import threading
4 import time
5 import random
6
7 def door():
8 door_open_time_counter = 0
9 while True:
10 if door_swiping_event.is_set():
11 print("\033[32;1mdoor opening....\033[0m")
12 door_open_time_counter +=1
13
14 else:
15 print("\033[31;1mdoor closed...., swipe to open.\033[0m")
16 door_open_time_counter = 0 #清空计时器
17 door_swiping_event.wait()
18
19
20 if door_open_time_counter > 3:#门开了已经3s了,该关了
21 door_swiping_event.clear()
22
23 time.sleep(0.5)
24
25
26 def staff(n):
27
28 print("staff [%s] is comming..." % n )
29 while True:
30 if door_swiping_event.is_set():
31 print("\033[34;1mdoor is opened, passing.....\033[0m")
32 break
33 else:
34 print("staff [%s] sees door got closed, swipping the card....." % n)
35 print(door_swiping_event.set())
36 door_swiping_event.set()
37 print("after set ",door_swiping_event.set())
38 time.sleep(0.5)
39 door_swiping_event = threading.Event() #设置事件
40
41
42 door_thread = threading.Thread(target=door)
43 door_thread.start()
44
45
46
47 for i in range(5):
48 p = threading.Thread(target=staff,args=(i,))
49 time.sleep(random.randrange(3))
50 p.start()

queue队列

当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

  • class queue.Queue(maxsize=0) #先入先出

  • class queue.LifoQueue(maxsize=0) #last in fisrt out 先入后出

  • class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

优先级队列的构造函数。 maxsize是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,插入将阻止,直到消耗队列项。如果maxsize小于或等于零,则队列大小为无限大。
首先检索最低值的条目(最低值条目是由sorted(list(entries))[0]返回的条目。条目的典型模式是以下形式的元组:(priority_number,data)。

exception queue.Empty

在对空的Queue对象调用非阻塞get()(或get_nowait())时引发异常。

exception queue.Full

在已满的Queue对象上调用非阻塞put()(或put_nowait())时引发异常。

  • Queue.``qsize()

  • Queue.``empty() #return True if empty

  • Queue.``full() # return True if full

  • Queue.``put(item, block=True, timeout=None)

将项目放入队列。如果可选的args块为true且timeout为None(默认值),则在必要时阻塞,直到有空闲插槽可用。如果timeout是一个正数,它会阻止最多超时秒,如果在该时间内没有可用的空闲槽,则会引发Full异常。否则(块为假),如果空闲插槽立即可用,则将项目放入队列,否则引发完全异常(在这种情况下忽略超时)。

  • Queue.``put_nowait(item)

    Equivalent to put(item, False).

  • Queue.``get(block=True, timeout=None)

从队列中删除并返回一个项目。如果可选的args块为true且timeout为None(默认值),则在必要时阻止,直到某个项可用为止。如果timeout是一个正数,它会阻止最多超时秒,如果在该时间内没有可用的项,则会引发Empty异常。否则(块为假),如果一个项立即可用则返回一个项,否则引发Empty异常(在这种情况下忽略超时)。

Queue.``get_nowait()

Equivalent to get(False).

提供了两种方法来支持跟踪守护进程消费者线程是否已完全处理入队任务。

Queue.task_done()
表示以前排队的任务已完成。由队列使用者线程使用。对于用于获取任务的每个get(),对task_done()的后续调用会告知队列该任务的处理已完成。

如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用)。

如果调用的次数超过队列中放置的项目,则引发ValueError。

Queue.``join() block直到queue被消费完毕

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

下面来学习一个最基本的生产者消费者模型的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import queue

def producer():
for i in range(10):
q.put("骨头 %s" % i )

print("开始等待所有的骨头被取走...")
q.join()
print("所有的骨头被取完了...")


def consumer(n):

while q.qsize() >0:

print("%s 取到" %n , q.get())
q.task_done() #告知这个任务执行完了


q = queue.Queue()



p = threading.Thread(target=producer,)
p.start()

c1 = consumer("李闯")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <20:
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
def Consumer(name):
count = 0
while count <20:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

多进程multiprocessing

multiprocessing是一个使用类似于线程模块的API支持产生进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效地侧向执行全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

1
2
3
4
5
6
7
8
9
10
from multiprocessing import Process
import time
def f(name):
time.sleep(2)
print('hello', name)

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

要显示所涉及的各个进程ID,以下是一个扩展示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import os

def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("\n\n")

def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name)

if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',))
p.start()
p.join()

进程间通讯  

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

使用方法跟threading里的queue差不多

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()

Pipes

Pipe()函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()

Pipe()返回的两个连接对象代表管道的两端。每个连接对象都有send()和recv()方法(以及其他方法)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。

Manager
Manager()返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager()返回的管理器将支持类型列表,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。例如,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(1)
print(l)

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()

l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()

print(d)
print(l)

进程同步

如果不使用不同进程的锁定输出,可能会混淆不清。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

进程池  

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from  multiprocessing import Process,Pool
import time

def Foo(i):
time.sleep(2)
return i+100

def Bar(arg):
print('-->exec done:',arg)

pool = Pool(5)

for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
#pool.apply(func=Foo, args=(i,))

print('end')
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。