当前位置:网站首页>进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁
进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁
2022-08-09 16:50:00 【Jayxieming】
一:创建进程的两种方式:
方式1
from multiprocessing import Process
import time
def task(name):
print(f'{
name}开始执行')
time.sleep(1)
print(f'{
name}执行结束')
if __name__ == '__main__':
p = Process(target=task, args=('ming',))
p.start() # 告诉操作系统帮你创建一个进程,异步,相当于创建了一个副本来执行程序
print('主进程')
方式2
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f'{
self.name}开始执行')
time.sleep(1)
print(f'{
self.name}执行结束')
if __name__ == '__main__':
p = MyProcess('ming')
p.start()
print('主')
二:join方法
功能:主进程等子进程结束之后才能执行
1代码实现join方法
from multiprocessing import Process
import time
def task(name):
print(f'{
name}开始执行')
time.sleep(1)
print(f'{
name}执行结束')
if __name__ == '__main__':
p = Process(target=task, args=('ming',))
p.start()
p.join()
print('主进程')
""" ming开始执行 ming执行结束 主进程 """
from multiprocessing import Process
import time
def dask(n):
time.sleep(n)
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=dask, args=(1,))
p2 = Process(target=dask, args=(2,))
p3 = Process(target=dask, args=(3,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
end_time = time.time() - start_time
print(end_time) # 3.2495625019073486
print('主')
from multiprocessing import Process
import time
def dask(n):
time.sleep(n)
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=dask, args=(1,))
p2 = Process(target=dask, args=(2,))
p3 = Process(target=dask, args=(3,))
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
end_time = time.time() - start_time
print(end_time) # 6.780239820480347
print('主')
三:进程间数据是隔离的
1.代码验证
from multiprocessing import Process
import time
hobby = 'sing'
def change():
global hobby
hobby = 'read'
if __name__ == '__main__':
p = Process(target=change)
p.start()
print(hobby) # sing
四:队列
1.特点:先进先出
2.队列方法
- 1.put() 向队列中传入数据
- 2.get() 从队列中取出数据
- 3.full() 判断队列是否已满
- 4.empty() 判断队列是否为空
- 5.get_nowait() 从队列中获取数据,不等待,没有就直接报错
3.注意
- 上述full(),empty(),get_nowait()方法在多进程中会失效
from multiprocessing import Queue
q = Queue(3)
print(q.empty()) # True 判断队列是否为空
q.put(1)
q.put(2)
q.put(3)
print(q.full()) # True 判断队列是否为满
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
q.get_nowait() # 获取队列中的数据,不等待,没有直接报错
五:进程间相互通信(IPC机制)
1.什么是IPC机制?
主进程与子进程通信 子进程与子进程通信
from multiprocessing import Process,Queue
def run1(q):
q.put('子进程run1的数据')
def run2(q):
data = q.get()
print(f'子进程run2从队列中得到数据是:{
data}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=run1, args=(q,))
p2 = Process(target=run2, args=(q,))
p1.start()
p2.start()
# 执行结果: 子进程run2从队列中得到数据是:子进程run1的数据
总结:通过队列可以实现不同队列间的通信
六:生产者消费者模型
- 生产者:产生数据
- 消费者:处理数据
1.代码实现生产者消费者功能
""" 生产者:生产/制造东西的 消费者:消费/处理东西的 该模型除了上面两个之外还需要一个媒介 生产者和消费者之间不是直接做交互的 而是借助于媒介做交互的 生产者(做包子的) + 消息队列(蒸笼)+ 消费者(吃包子的) """
from multiprocessing import Process, Queue, JoinableQueue
import time
import random
def producer(name, food, q):
for i in range(1, 6):
data = f'{
name}生产的{
food}{
i}'
time.sleep(random.randint(1, 2))
print(data)
q.put(data)
def customer(name, q):
while True:
food = q.get() # 没有数据会卡住
# 判断当前是否结束标识
# if food is None: break
time.sleep(random.randint(1, 2))
print(f'{
name}吃了{
food}')
q.task_done() # 告诉队列你已经从里面取了一个数据并且处理完毕了
if __name__ == '__main__':
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer, args=('机器猫', '包子', q))
p2 = Process(target=producer, args=('大雄', '饺子', q))
c1 = Process(target=customer, args=('达夫', q))
c2 = Process(target=customer, args=('胖虎', q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# q.put(None)
# q.put(None)
q.join() # 等待队列中所有的数据被取完再再往下执行代码
""" JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1 每当你调用task_done的时候 计数器-1 q.join() 当计数器为0的时候 才往后运行 """
# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
七:进程相关方法
- 1.current_process().pid 查看当前进程的进程号
- 2.os.getpid() 查看当前进程的进程号
- 3.os.getppid() 查看当前进程父进程的进程号
- 4.terminate() 销毁进程
- 5.is_alive() 判断当前进程是否存活
代码实现
from multiprocessing import Process, current_process
import os
def dask():
print(f'子进程:{
current_process().pid}') # 19664
print(f'子进程:{
os.getpid()}') # 19664
print(f'子进程的父进程:{
os.getppid()}') # 4816
if __name__ == '__main__':
p = Process(target=dask)
p.start()
p.terminate() # 销毁进程
p.is_alive() # 判断进程是否存活
print(f'主进程{
os.getpid()}') # 4816
八:守护进程
1.什么是守护进程?
守护对象执行结束自己立即也结束的进程
代码实现
from multiprocessing import Process
import time
def task():
print('子进程task正在执行')
time.sleep(3)
print('子进程task执行结束')
if __name__ == '__main__':
p = Process(target=task)
p.daemon = True
p.start()
time.sleep(1)
print('父进程执行结束')
九:僵尸进程和孤儿进程
1.僵尸进程
进程已经运行结束 但是相关的资源并没有完全清空 需要父进程参与回收
2.孤儿进程
父进程意外死亡 子进程正常运行 该子进程就称之为孤儿进程 孤儿进程也不是没有人管 操作系统会自动分配福利院接收
十:互斥锁
import random
from multiprocessing import Process, Lock
import json
import time
# 1.查票
def search(i):
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
print('用户%s 要查的余票是%s' % (i, dic.get('ticket_num')))
# 字典取值尽量用get()
# 2.买票
def buy(i):
# 先查票
with open('data', 'r', encoding='utf-8') as f:
dic = json.load(f)
# 模拟延迟
time.sleep(random.randint(1, 3))
# 判断道歉是否有余票
if dic.get('ticket_num') > 0:
# 修改数据库买票
dic["ticket_num"] -= 1
# 写入数据库
with open('data', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print('用户%s 买票成功' % i)
else:
print('用户%s 买票失败' % i)
# 整合上面的两个函数
def run(i, mutex):
search(i)
# 给买票环节加锁
# 抢锁
mutex.acquire()
buy(i)
# 释放锁
mutex.release()
if __name__ == '__main__':
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
mutex = Lock()
for i in range(10):
p = Process(target=run, args=(i, mutex))
p.start()
结论:
1.多个进程操作同一份数据的时候,会出现数据错乱的问题 针对上述问题,解决方法就是加锁处理
2.互斥锁:将并发变成串行,牺牲效率但是保证了数据的安全性
3.注意:
- 1.锁不要轻易使用,会出现死锁现象(我们写代码一般不会用到,都是内部封装好的)
- 2.锁只在处理数据的部分加 来保证数据安全(只在争抢数据的环节加锁处理)
作业:
1.尝试将TCP服务端制作成并发效果
客户端服务端全部设置成自动发消息自动回消息
eg: 客户端发hello 服务端直接转大写回HELLO
服务端:
import socket
from multiprocessing import Process
server = socket.socket()
server.bind(('127.0.0.1', 8090))
server.listen(5)
sock, addr = server.accept()
# 将服务的代码单独封装成一个函数
def task(sock):
while True:
try:
data = sock.recv(1024)
if len(data) == 0: break
print(data.decode('utf8'))
sock.send(data.upper())
except ConnectionError as e:
print(e)
break
sock.close()
if __name__ == '__main__':
p = Process(target=task, args=(sock,))
p.start()
客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8090))
while True:
client.send('hello '.encode('utf8'))
data = client.recv(1024)
print(data.decode('utf8'))
边栏推荐
猜你喜欢
随机推荐
What is test development and why is it so popular now?
BSN季度版本2022年8月31日迭代更新预告
[SUCTF 2019]CheckIn
Ark Standalone/Administrator Special Item Command Codes
[极客大挑战 2019]HardSQL
Apache Doris 社区 PMC 杨政国:开源项目如何在自身和社区的需求中取得平衡?
How to adjust futures account opening process and handling fee
The senior told me that the MySQL of the big factory is connected through SSH
方舟开服务器Vmware虚拟机安装不上?
Functions and Features of Smart Home Control System
The strongest distributed lock tool: Redisson
好的架构是进化来的,不是设计来的
Axure实现表格带滚动条
低代码平台和专业开发人员——完美搭档?
openEuler Xiong Wei: How do you view the SIG organization model in the open source community?
LINE Verda Programming Contest (AtCoder Beginner Contest 263) A~E 题解
字符设备的注册
试试使用 Vitest 进行组件测试,确实很香。
在 .NET MAUI 中如何更好地自定义控件
如何在 PC 机上测试移动端的网页?









