Fork me on GitHub

进程和线程

定义:

进程:是程序运行的实例,是系统进行资源分配和调度的一个独立单位,它包括独立的地址空间,资源以及1个或多个线程.
线程:可以看成是轻量级的进程,是CPU调度和分派的基本单位.

区别:

1.调度 :从上面的定义可以看出一个是调度和分派的基本单位,一个是拥有资源的基本单位
2.共享地址空间,资源:进程拥有各自独立的地址空间,资源,所以共享复杂,需要用IPC,同步简单; 线程共享所属进程的资源,共享简单,但同步复杂,要通过加锁等措施.
3.占用内存,cpu: 进程占用内存多,切换复杂,CPU利用率低; 线程占用内存少,切换简单,CPU利用率高.
4.相互影响: 进程间不会相互影响; 一个线程挂掉会导致整个进程挂掉.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time


def show():
print(time.time(), threading.current_thread().name)


if __name__ == '__main__':
threading.Thread(target=show).run()
threading.Thread(target=show).start()
threading.Thread(target=show).start()
print(threading.main_thread().name)


1547886144.267069 MainThread
1547886144.2680657 Thread-2
1547886144.2690632 Thread-3
MainThread
1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import time


class MyThreading(threading.Thread):
def run(self):
print(time.time(), threading.current_thread().name)


if __name__ == '__main__':
MyThreading().start()

1547889092.2940536 Thread-1

start() 方法是启动一个子线程,线程名就是我们定义的name
run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已.

因此,如果你想启动多线程,就必须使用start()方法.

threading.join()

子线程执行该方法,阻塞主线程,直到子线程超时或者执行完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time

def show1():
time.sleep(10)
print(time.ctime(), threading.current_thread().name)

def show2():
print(time.ctime(), threading.current_thread().name)

if __name__ == '__main__':
print(threading.current_thread().name)

t1 = threading.Thread(target=show1)
t2 = threading.Thread(target=show2)
t1.start()
t1.join(timeout=4)
t2.start()

print(threading.main_thread().name)

MainThread
Wed Jan 23 13:30:24 2019 Thread-2
MainThread
Wed Jan 23 13:30:30 2019 Thread-1

threading.setDaemon()

将子线程标记为守护线程,主线程执行完毕时,不论子线程是否执行完毕,都要和主线程一起退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time


def show1():
time.sleep(10)
print(time.ctime(), threading.current_thread().name)


def show2():
print(time.ctime(), threading.current_thread().name)

if __name__ == '__main__':
print(threading.current_thread().name)

t1 = threading.Thread(target=show1)
t2 = threading.Thread(target=show2)

t1.setDaemon(True)

t1.start()
t1.join(timeout=4)

t2.start()

print(threading.main_thread().name)

MainThread
Wed Jan 23 13:45:28 2019 Thread-2
MainThread

t1为守护线程,主线程执行完毕时,t1会和主线程一起退出

queue

queue.Queue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

queue.LifoQueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

queue.PriorityQueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

multiprocessing.Process()

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self.name = '' # 进程的名称
self.daemon = False # 守护进程
self.authkey = None # 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串.这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功
self.exitcode = None
self.ident = 0
self.pid = 0
self.sentinel = None

def run(self):
pass

def start(self):
pass

# 强制终止进程
def terminate(self):
pass

def join(self, timeout=None):
pass

def is_alive(self):
return False

使用方法和线程类似

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing
import time


def show():
time.sleep(10)
print(time.ctime())

if __name__ == '__main__':
multiprocessing.Process(target=show).start()
multiprocessing.Process(target=show).start()
multiprocessing.Process(target=show).start()

进程结束后,id依然存在,不过已经没有意义了

主进程创建守护进程

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常

注意:进程之间是互相独立的(数据不共享),主进程代码运行结束,守护进程随即终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")

def bar():
print(456)
time.sleep(2)
print("end456")

if __name__ == '__main__':
print("main-------")
p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
# time.sleep(3)
print("main-------end")
print(p1.is_alive())
print(p2.is_alive())


p1是守护进程,随着主进程的结束而结束,可能会打印123,

进程和线程的应用场景

https://blog.csdn.net/wujiafei_njgcxy/article/details/77098977

锁机制

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import threading
import random
import time

gMoney = 1000
gLock = threading.Lock()
# 记录生产者生产的次数,达到10次就不再生产
gTimes = 0

class Producer(threading.Thread):
def run(self):
global gMoney
global gLock
global gTimes
while True:
money = random.randint(100, 1000)
gLock.acquire()
# 如果已经达到10次了,就不再生产了
if gTimes >= 10:
gLock.release()
break
gMoney += money
print('%s当前存入%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
gTimes += 1
time.sleep(0.5)
gLock.release()

class Consumer(threading.Thread):
def run(self):
global gMoney
global gLock
global gTimes
while True:
money = random.randint(100, 500)
gLock.acquire()
if gMoney > money:
gMoney -= money
print('%s当前取出%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
time.sleep(0.5)
else:
# 如果钱不够了,有可能是已经超过了次数,这时候就判断一下
if gTimes >= 10:
gLock.release()
break
print("%s当前想取%s元钱,剩余%s元钱,不足!" % (threading.current_thread(),money,gMoney))
gLock.release()

def main():
for x in range(5):
Consumer(name='消费者线程%d'%x).start()

for x in range(5):
Producer(name='生产者线程%d'%x).start()

if __name__ == '__main__':
main()

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全.
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题.这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道.
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性.

死锁现象与递归锁

互斥锁特点:只能acquire一次
递归锁特点:可以acquire多次,每acquire一次,计数加1,只要计数不为0,就不能被其他线程抢到

1、死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程.
解决方法:递归锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread,Lock,RLock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()

def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A锁\033[0m' %self.name)
mutexB.acquire()
print('\033[42m%s 拿到B锁\033[0m' %self.name)
mutexB.release()
mutexA.release()

def func2(self):
mutexB.acquire()
print('\033[43m%s 拿到B锁\033[0m' %self.name)
time.sleep(2)
mutexA.acquire()
print('\033[44m%s 拿到A锁\033[0m' %self.name)
mutexA.release()
mutexB.release()

if __name__ == '__main__':
for i in range(10):
t=MyThread()
t.start()

#分析:进程1执行到func2时,拿到B锁,请求A锁,进程1可能正好拿到A锁,请求B锁,互相等待,卡死

2、递归锁RLock
在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock.
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require.直到一个线程所有的acquire都被release,其他的线程才能获得资源.
上面例子改为:
mutexA=mutexB=RLock()
(此时A,B其实是同一把锁)
一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

#multiprocessing.Queue

主要方法:

  1. 初始化Queue(maxsize):创建一个先进先出的队列,如果maxsize <= 0,则队列大小为无限大.
  2. qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
  3. empty():调用此方法时queue为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
  4. full():调用此方法时queue已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走.
  5. get():从队列读取并且删除一个元素.get方法有两个可选参数:blocked和timeout.如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常.如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
  6. put():用以插入数据到队列中,put方法有两个可选参数:blocked和timeout.如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间.如果超时,会抛出Queue.Full异常.如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了

生产者消费者模型总结

程序中有两类角色
    一类负责生产数据(生产者)
    一类负责处理数据(消费者)

引入生产者消费者模型为了解决的问题是:
    平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

如何实现:
    生产者<-->队列<——>消费者
生产者消费者模型实现类程序的解耦和

multiprocessing.Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是:

1.很明显需要并发执行的任务通常要远大于核数
2.一个操作系统不可能无限开启进程,通常有几个核就开几个进程
3.进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个…手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效.

1
2
3
4
5
6
7
8
9
class Pool(object):
...

def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

一些方法

1
2
3
4
5
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果.需要强调的是:此操作并不会在所有池工作进程中并执行func函数.如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果.此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数.当func的结果变为可用时,将理解传递给callback.callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果.

p.close():关闭进程池,防止进一步操作.如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion():等待所有工作进程退出.此方法只能在close()或teminate()之后调用

同步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2

if __name__ == '__main__':
p=Pool() #进程池中从无到有创建n个进程,以后一直是这n个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限
res_l.append(res)
print(res_l)

异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2

if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
res_l.append(res)

#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close()
p.join()
for res in res_l:
print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

同步执行实际上就是调用异步的get方法

1
2
3
4
5
6
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
Pool must be running.
'''
return self.apply_async(func, args, kwds).get()

map

1
2
3
4
5
6
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def func(n):
# print(n**2)
return n

pool = ThreadPoolExecutor(max_workers=5)
a = pool.map(func, range(20))

for i in range(20):
print(pool.submit(func, i).result())

pool.shutdown() # close+join

print([x for x in a])
-------------本文结束感谢您的阅读-------------
坚持原创技术分享,您的支持将鼓励我继续创作!
0%