当前位置:网站首页>进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁
进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁
2022-08-09 16:50:00 【Jayxieming】
一:创建进程的两种方式:
方式1
from multiprocessing import Process
import time
def task(name):
print(f'{
name}开始执行')
time.sleep(1)
print(f'{
name}执行结束')
if __name__ == '__main__':
p = Process(target=task, args=('ming',))
p.start() # 告诉操作系统帮你创建一个进程,异步,相当于创建了一个副本来执行程序
print('主进程')
方式2
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f'{
self.name}开始执行')
time.sleep(1)
print(f'{
self.name}执行结束')
if __name__ == '__main__':
p = MyProcess('ming')
p.start()
print('主')
二:join方法
功能:主进程等子进程结束之后才能执行
1代码实现join方法
from multiprocessing import Process
import time
def task(name):
print(f'{
name}开始执行')
time.sleep(1)
print(f'{
name}执行结束')
if __name__ == '__main__':
p = Process(target=task, args=('ming',))
p.start()
p.join()
print('主进程')
""" ming开始执行 ming执行结束 主进程 """
from multiprocessing import Process
import time
def dask(n):
time.sleep(n)
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=dask, args=(1,))
p2 = Process(target=dask, args=(2,))
p3 = Process(target=dask, args=(3,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
end_time = time.time() - start_time
print(end_time) # 3.2495625019073486
print('主')
from multiprocessing import Process
import time
def dask(n):
time.sleep(n)
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=dask, args=(1,))
p2 = Process(target=dask, args=(2,))
p3 = Process(target=dask, args=(3,))
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
end_time = time.time() - start_time
print(end_time) # 6.780239820480347
print('主')
三:进程间数据是隔离的
1.代码验证
from multiprocessing import Process
import time
hobby = 'sing'
def change():
global hobby
hobby = 'read'
if __name__ == '__main__':
p = Process(target=change)
p.start()
print(hobby) # sing
四:队列
1.特点:先进先出
2.队列方法
- 1.put() 向队列中传入数据
- 2.get() 从队列中取出数据
- 3.full() 判断队列是否已满
- 4.empty() 判断队列是否为空
- 5.get_nowait() 从队列中获取数据,不等待,没有就直接报错
3.注意
- 上述full(),empty(),get_nowait()方法在多进程中会失效
from multiprocessing import Queue
q = Queue(3)
print(q.empty()) # True 判断队列是否为空
q.put(1)
q.put(2)
q.put(3)
print(q.full()) # True 判断队列是否为满
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
q.get_nowait() # 获取队列中的数据,不等待,没有直接报错
五:进程间相互通信(IPC机制)
1.什么是IPC机制?
主进程与子进程通信 子进程与子进程通信
from multiprocessing import Process,Queue
def run1(q):
q.put('子进程run1的数据')
def run2(q):
data = q.get()
print(f'子进程run2从队列中得到数据是:{
data}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=run1, args=(q,))
p2 = Process(target=run2, args=(q,))
p1.start()
p2.start()
# 执行结果: 子进程run2从队列中得到数据是:子进程run1的数据
总结:通过队列可以实现不同队列间的通信
六:生产者消费者模型
- 生产者:产生数据
- 消费者:处理数据
1.代码实现生产者消费者功能
""" 生产者:生产/制造东西的 消费者:消费/处理东西的 该模型除了上面两个之外还需要一个媒介 生产者和消费者之间不是直接做交互的 而是借助于媒介做交互的 生产者(做包子的) + 消息队列(蒸笼)+ 消费者(吃包子的) """
from multiprocessing import Process, Queue, JoinableQueue
import time
import random
def producer(name, food, q):
for i in range(1, 6):
data = f'{
name}生产的{
food}{
i}'
time.sleep(random.randint(1, 2))
print(data)
q.put(data)
def customer(name, q):
while True:
food = q.get() # 没有数据会卡住
# 判断当前是否结束标识
# if food is None: break
time.sleep(random.randint(1, 2))
print(f'{
name}吃了{
food}')
q.task_done() # 告诉队列你已经从里面取了一个数据并且处理完毕了
if __name__ == '__main__':
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer, args=('机器猫', '包子', q))
p2 = Process(target=producer, args=('大雄', '饺子', q))
c1 = Process(target=customer, args=('达夫', q))
c2 = Process(target=customer, args=('胖虎', q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# q.put(None)
# q.put(None)
q.join() # 等待队列中所有的数据被取完再再往下执行代码
""" JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1 每当你调用task_done的时候 计数器-1 q.join() 当计数器为0的时候 才往后运行 """
# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
七:进程相关方法
- 1.current_process().pid 查看当前进程的进程号
- 2.os.getpid() 查看当前进程的进程号
- 3.os.getppid() 查看当前进程父进程的进程号
- 4.terminate() 销毁进程
- 5.is_alive() 判断当前进程是否存活
代码实现
from multiprocessing import Process, current_process
import os
def dask():
print(f'子进程:{
current_process().pid}') # 19664
print(f'子进程:{
os.getpid()}') # 19664
print(f'子进程的父进程:{
os.getppid()}') # 4816
if __name__ == '__main__':
p = Process(target=dask)
p.start()
p.terminate() # 销毁进程
p.is_alive() # 判断进程是否存活
print(f'主进程{
os.getpid()}') # 4816
八:守护进程
1.什么是守护进程?
守护对象执行结束自己立即也结束的进程
代码实现
from multiprocessing import Process
import time
def task():
print('子进程task正在执行')
time.sleep(3)
print('子进程task执行结束')
if __name__ == '__main__':
p = Process(target=task)
p.daemon = True
p.start()
time.sleep(1)
print('父进程执行结束')
九:僵尸进程和孤儿进程
1.僵尸进程
进程已经运行结束 但是相关的资源并没有完全清空 需要父进程参与回收
2.孤儿进程
父进程意外死亡 子进程正常运行 该子进程就称之为孤儿进程 孤儿进程也不是没有人管 操作系统会自动分配福利院接收
十:互斥锁
import random
from multiprocessing import Process, Lock
import json
import time
# 1.查票
def search(i):
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
print('用户%s 要查的余票是%s' % (i, dic.get('ticket_num')))
# 字典取值尽量用get()
# 2.买票
def buy(i):
# 先查票
with open('data', 'r', encoding='utf-8') as f:
dic = json.load(f)
# 模拟延迟
time.sleep(random.randint(1, 3))
# 判断道歉是否有余票
if dic.get('ticket_num') > 0:
# 修改数据库买票
dic["ticket_num"] -= 1
# 写入数据库
with open('data', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print('用户%s 买票成功' % i)
else:
print('用户%s 买票失败' % i)
# 整合上面的两个函数
def run(i, mutex):
search(i)
# 给买票环节加锁
# 抢锁
mutex.acquire()
buy(i)
# 释放锁
mutex.release()
if __name__ == '__main__':
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
mutex = Lock()
for i in range(10):
p = Process(target=run, args=(i, mutex))
p.start()
结论:
1.多个进程操作同一份数据的时候,会出现数据错乱的问题 针对上述问题,解决方法就是加锁处理
2.互斥锁:将并发变成串行,牺牲效率但是保证了数据的安全性
3.注意:
- 1.锁不要轻易使用,会出现死锁现象(我们写代码一般不会用到,都是内部封装好的)
- 2.锁只在处理数据的部分加 来保证数据安全(只在争抢数据的环节加锁处理)
作业:
1.尝试将TCP服务端制作成并发效果
客户端服务端全部设置成自动发消息自动回消息
eg: 客户端发hello 服务端直接转大写回HELLO
服务端:
import socket
from multiprocessing import Process
server = socket.socket()
server.bind(('127.0.0.1', 8090))
server.listen(5)
sock, addr = server.accept()
# 将服务的代码单独封装成一个函数
def task(sock):
while True:
try:
data = sock.recv(1024)
if len(data) == 0: break
print(data.decode('utf8'))
sock.send(data.upper())
except ConnectionError as e:
print(e)
break
sock.close()
if __name__ == '__main__':
p = Process(target=task, args=(sock,))
p.start()
客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8090))
while True:
client.send('hello '.encode('utf8'))
data = client.recv(1024)
print(data.decode('utf8'))
边栏推荐
猜你喜欢

Volatile:JVM 我警告你,我的人你别乱动

How to choose a good SaaS knowledge base tool?

学长告诉我,大厂MySQL都是通过SSH连接的

期货开户流程和手续费如何调整

小家电控制板开发——未来小家电行业的发展方向

方舟:生存进化开服务器端口映射教程

WinForm(三)揭开可视化控件的面纱

One-key login principle of local number

The senior told me that the MySQL of the big factory is connected through SSH
![[SUCTF 2019]CheckIn](/img/4a/cae4dbe47c3b9d0fb37fe337bb5c3f.png)
[SUCTF 2019]CheckIn
随机推荐
Jenkins deploys services to remote servers using pipelines
在 .NET MAUI 中如何更好地自定义控件
《ABP Framework 极速开发》 - 教程首发
MySQL的索引你了解吗
手写flexible.js的原理实现,我终于明白移动端多端适配
[Code Audit] - PHP project class RCE and files include download and delete
为了高性能、超大规模的模型训练,这个组合“出道”了
[极客大挑战 2019]HardSQL
The principle implementation of handwritten flexible.js, I finally understand the multi-terminal adaptation of the mobile terminal
Problems Existing in Hardware Development of Electronic Products
approach和method的区别
openEuler Xiong Wei: How do you view the SIG organization model in the open source community?
What is control board custom development?
What is hardware integrated development?What are the cores of hardware integrated development?
动手学深度学习_风格迁移
FAST-LIO2代码解析(三)
记一次 .NET 某工控自动化控制系统 卡死分析
开篇-开启全新的.NET现代应用开发体验
低代码平台和专业开发人员——完美搭档?
Redis 定长队列的探索和实践