当前位置:网站首页>Using Flask and Celery to push real-time/timed messages asynchronously in Win10 environment (Socket.io)/The latest strategy in 2020
Using Flask and Celery to push real-time/timed messages asynchronously in Win10 environment (Socket.io)/The latest strategy in 2020
2022-08-08 13:03:00 【User 9127725】
首先得明确一点,和Django一样,在2020年Flask 1.1.1Later versions do not require so-called third-party library support,即Flask-Celery或者Flask-Celery-Help这些库,直接使用CeleryThe native library can be used.
一般情况下,CeleryUsed to handle time-consuming tasks,Such as stereotyped emails or file uploads,本次使用CeleryReal-time or scheduled delivery basedWebsocket的消息队列,Because if the front end has abandoned the old polling strategy,使用Websocket,The back end needs to be matched accordinglyCeleryMake pairs persistentWebsocketLinks to proactively push messages,This scenario is still very common in production environments,But there are few articles on the Internet,而CeleryThe official statement on this is:
If using multiple processes, a message queue service is used by the processes to coordinate operations such as broadcasting. The supported queues are Redis, RabbitMQ, and any other message queues supported by the Kombu package
Basically it means:因为 Celery 和 前端Web 是分开的 Process So there needs to be a common backend to trigger the push of messages,This is an availabilityCelery触发WebsocketThe focus of message push.
第一步,安装必须的库
pip3 install flask-cors
pip3 install flask-socketio
pip3 install celery
flask-corsLibraries are libraries used to circumvent the browser's same-origin policy,flask-socketioUsed to establish full duplexwebsocket链接,celeryTakes responsibility for an asynchronous task queue.
实例化app对象
from flask_cors import CORS
from flask_socketio import SocketIO,send,emit,join_room, leave_room
import urllib.parse
from celery import Celery
from datetime import timedelta
app = Flask(__name__)
app.config['BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.config['CELERY_ACCEPT_CONTENT'] = ['json', 'pickle']
app.config['REDIS_URL'] = 'redis://localhost:6379'
The message queue container is still used hereredis
Then use the initializedapp队列,初始化socket对象,This will allow based onwsgi的Flask支持websocket
socketio = SocketIO(app,cors_allowed_origins='*',async_mode="threading",message_queue=app.config['CELERY_RESULT_BACKEND'])
这里注意下,Plus cross-domain parameters,And specify the asynchronous mode as thread.
第三步,就是初始化celery对象
celery = Celery(app.name)
celery.conf.update(app.config)
After that, you can declare some necessary methods and views,and run the instance
@celery.task()
def get_sendback():
socketio.emit('sendback','message',broadcast=True)
@app.route('/task')
def start_background_task():
get_sendback.delay()
return '开始'
@socketio.on('join')
def on_join(data):
username = 'user1'
room = 'room1'
join_room(room)
send(username + ' has entered the room.', room=room)
@socketio.on('message')
def handle_message(message):
message = urllib.parse.unquote(message)
print(message)
send(message,broadcast=True)
@socketio.on('connect', namespace='/chat')
def test_connect():
emit('my response', {'data': 'Connected'})
@socketio.on('disconnect', namespace='/chat')
def test_disconnect():
print('Client disconnected')
@app.route("/sendback",methods=['GET'])
def sendback():
socketio.emit('sendback','message')
return 'ok'
if __name__ == '__main__':
app.config['JSON_AS_ASCII'] = False
socketio.run(app,debug=True,host="0.0.0.0",port=5000)
You can see the asynchronous call task [email protected]()来声明,而基于websocketview [email protected]来声明,在Flask项目的目录下,分别开启两个命令行,启动Web服务和Celery服务
python manage.py
启动celery服务
celery worker -A manage.celery --loglevel=info -P eventlet
这里celeryThe service is still based on the coroutine libraryeventlet
The front end uses the more popular ones on the marketVue.js,需要安装socket.io的支持
npm install [email protected]
Write a component for testingclient.vue
<template>
<div>
<div v-for="item in log_list"
>
{{item}}
</div>
<input v-model="msg" />
<button @click="send">发送消息</button>
</div>
</template>
<script>
export default {
data () {
return {
msg: "",
log_list:[]
}
},
//Register component tags
components:{
},
sockets:{
connect: function(){
console.log('socket 连接成功')
},
message: function(val){
console.log('返回:'+val);
alert(val);
this.log_list.push(val);
},
sendback: function(val){
console.log('返回:'+val);
alert(val);
}
},
mounted:function(){
},
methods:{
send(){
this.$socket.emit('join',encodeURI("加入房间"))
this.$socket.emit('message',encodeURI("用户:"+this.msg));
},
}
}
</script>
<style>
</style>
By listening on the same key as the backend“sendback”to display the messages pushed in the background.
Test out asynchronous push
访问url触发异步任务:http://localhost:5000/sendback
The front end immediately receives the messages asynchronously pushed by the back end.
Now let's test the scheduled task,基于Celery的CrontabThe advantage is that it supports second-level timing,在上面celery初始化之后,You can define scheduled tasks through configuration
celery = Celery(app.name)
celery.conf.update(app.config)
celery.conf.CELERYBEAT_SCHEDULE = {
"test":{
"task":"get_cron",
"schedule":timedelta(seconds=10)
}
}
Here we add a test task,定时每10Push a message in seconds
@celery.task(name="get_cron")
def get_cron():
get_sendback.delay()
Just asynchronously call the push method you just wrote,This allows you to share a backend with the frontendwebsocket链接,Otherwise, the scheduled task cannot trigger the message push.
Start a third service in the same directory,注意webServices and asynchronous services do not stop
celery -A manage.celery beat --loglevel=debug
You can see the timing pushwebsocketThe message also came true.
This feature is essentially an application-level decoupling,用Celery特有的taskway to basewebsocket推送emit消息,二者相辅相成.
Finally serve thisdemo的版本库:https://gitee.com/QiHanXiBei/myflask
边栏推荐
- ctfshow 七夕杯(复现)
- [Horizon Rising Sun X3 Trial Experience] WIFI connection, SSH login, TogetherROS installation (section 2)
- 关于微信小程序体验版获取不到openId的问题
- Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan
- MySQL database storage series (5) the InnoDB storage format
- OFD是什么
- Geoffrey Hinton:深度学习的下一个大事件
- 海外邮件发送指南(一)
- 如何在go重打印函数调用者信息Caller
- ets declarative ui development, how to get the current system time
猜你喜欢
【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
手绘地图制作的关键点之“图层覆盖”
xxd命令(反编译、二进制文件转十六进制文件)
报错 | Cannot find module ‘@better-scroll/core/dist/types/BScroll‘
Jenkins - Introduction to Continuous Integration (1)
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
指针和数组笔试题解析
[界面开发]DevExpress WinForms流程图控件——XtraDiagrams组件入门指南
史上最全JVM性能调优:线程+子系统+类加载+内存分配+垃圾回收
2022-08-04
随机推荐
nvm的使用 nodejs版本管理,解决用户名是汉字的问题
[C language] Dynamic memory management
node中package解析、npm 命令行npm详解,node中的common模块化,npm、nrm两种方式查看源和切换镜像
在半小时内从无到有开发并调试一款Chrome扩展(Chrome插件/谷歌浏览器插件)
(6)FlinkSQL将kafka数据写入到mysql方式一
报错 | RegExp2 is not defined
SQL实例 - 胜平负
The use of qsort function and its analog implementation
2022-08-05
硬盘数据恢复工具
Docker-持久化数据库(数据卷)
(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
Promise 解决阻塞式同步,将异步变为同步
别再到处乱放配置文件了!试试我司使用 7 年的这套解决方案,稳的一秕
odps sql被删除了,能找回来吗
MeterSphere - open source test platform
(8)FlinkSQL自定义UDF
JPA之使用复合主键
Program Environment and Preprocessing
Doris学习笔记之优化