当前位置:网站首页>srs流媒体服务器推流的流程
srs流媒体服务器推流的流程
2022-08-06 05:26:00 【音视频开发老舅】
简述
在accept一个链接后,创建对应的SrsRtmpConn。
SrsRtmpConn自身是一个协程的子类,运行后进行rtmp协议中的handshake、connect、create stream。并且判断是publish之后,创建SrsRecvThread来接受推流。
但是有一点比较奇怪的写法就是,在handshake之后,connect之前就根据发的包内容来判断是一个publish还是play。这点和我认知的rtmp协议不同,在我的认知里面,rtmp协议在create stream会发play或者push的message表示自己是一个什么样的角色,一般都是通过这个方法来判断的。
SrsRecvThread是一个协程,运行后会开始接受message数据。根据对应message执行不同的函数,并且把message放入到SrsConsumer队列中,
在放入SrsConsumer的队列中后会通过条件srs_cond_signal(mw_wait),通知等待的协程可以开始消费message了
accept请求
这个是接受tcp链接的代码,rtmp的tcp监听也是在这边的。accept一个fd后,调用on_tcp_client处理这个链接。
这边有一点需要注意的是,srs的io操作大部分是state thread库函数做的。调用accept的socket是一个非阻塞式的,但是st_accept用起来像阻塞式的,实际上是一个非阻塞式的。
srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
/*接受一个链接*/
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(fd == NULL){
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
/*调用处理函数*/
if ((err = handler->on_tcp_client(fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
}
return err;
}
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs)↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓
创建SrsRtmpConn
on_tcp_client最后调用会调用fd2con,将fd生成对应的SrsRtmpConn对像。
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
/*.....*/
if (type == SrsListenerRtmpStream) {
*pconn = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) {
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) {
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd);
return err;
}
return err;
}
并且SrsRtmpConn是一个st可以执行的协程类,最后会调用do_cycle进行handshake、connect、create stream。
srs_error_t SrsRtmpConn::do_cycle()
{
/*......*/
//握手
if ((err = rtmp->handshake()) != srs_success) {
return srs_error_wrap(err, "rtmp handshake");
}
//进行下一步操作
if ((err = service_cycle()) != srs_success) {
err = srs_error_wrap(err, "service cycle");
}
/*........*/
return err;
}
在service_cycle中就是调用stream_service_cycle
srs_error_t SrsRtmpConn::service_cycle()
{
/*.....*/
//这个while看起来像是做推流错误恢复处理,具体我也没有看明白
while (true) {
/*.....*/
//调用这个进入接受message
err = stream_service_cycle();
// for other system control message, fatal error.
return srs_error_wrap(err, "rtmp: reject");
}
return err;
}
这个函数做的东西比较多,一开始回去确认身份。然后根据不同的推流端进行一些特例化的操作,大致就是connect,createstream,然后开始推流。
srs_error_t SrsRtmpConn::stream_service_cycle()
{
//验证身份是play还是push
if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
return srs_error_wrap(err, "rtmp: identify client");
}
/*......*/
//根据不同的身份做不同的操作,一般start play中就是做create stream和connect
switch (info->type) {
case SrsRtmpConnPlay: {
// response connection start play
if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start play");
}
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
err = playing(source);
http_hooks_on_stop();
return err;
}
case SrsRtmpConnFMLEPublish: {
if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FMLE publish");
}
return publishing(source);
}
case SrsRtmpConnHaivisionPublish: {
if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
}
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FLASH publish");
}
return publishing(source);
}
default: {
return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
}
}
return err;
}
创建SrsRecvThread协程开始接受数据
创建一个协程开使推流,调用do_publishing中启动协程开始推流,并做了一些链接的错误处理。
rs_error_t SrsRtmpConn::publishing(SrsSource* source)
{
/*......*/
// TODO: FIXME: Should refine the state of publishing.
if ((err = acquire_publish(source)) == srs_success) {
//创建一个协程开始开始推流
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);
rtrd.stop();
}
/*......*/
return err;
}
在do_publishing中调用SrsPublishRecvThread的start,最后会开启协程。
srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
// start isolate recv thread.
//调用SrsPublishRecvThread的start开启协程,专门做接受数据
if ((err = rtrd->start()) != srs_success) {
return srs_error_wrap(err, "rtmp: receive thread");
}
/*......*/
return err;
}
start最后会调用SrsRecvThread协程,中间就省略很多无用的代码。
srs_error_t SrsRecvThread::do_cycle()
{
srs_error_t err = srs_success;
while (true) {
/*......*/
// Process the received message.
//recv_message接受数据
if ((err = rtmp->recv_message(&msg)) == srs_success) {
//consume消费一个数据
err = pumper->consume(msg);
}
/*.......*/
}
return err;
}
放入消费队列,通知消费者
最后会放入到source的consumer中
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
//把msg放入到consumer中
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
}
最后会通知所有的消费者,有message写入,可以开始消费
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
/*......*/
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
if (atc && duration < 0) {
//通知消费者
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration) {
//通知消费者
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
}
#endif
return err;
}
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs)↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓
边栏推荐
- Convolutional Neural Network Notes 2
- Qt 实现窗口大小变化时动画过渡
- AMPCOLOY940 high thermal conductivity beryllium-free copper alloy imported from the United States
- arcpy将本地shape发布成arcgis server 的mapserver
- 字符流Reader和Writer
- PCL1.12+VTK9.1+QT6编译部署
- C#脚本CSharpScript
- 【Pytorch】tensor、ndarray、list互相转换方法
- openalyers 好玩的效果之蒙版图层
- 利用R通过顺企网根据公司名称爬取企业地址
猜你喜欢

当一个人知道自己为什么而活,就可以忍受任何一种生活

MacOS下 Qt6编译及链接MySQL

【图像处理】RGB、YUV (YCbCr) 图像表示详解

Talk预告 | 德国马普所修宇亮:如何多快好省地重建三维数字人

好的架构是进化来的,不是设计来的

Write Plist file using Qt XmlStreamWriter

十四、一起学习Lua 元表(Metatable)

CVPR 2022 | SharpContour:一种基于轮廓变形 实现高效准确实例分割的边缘细化方法

A Pseudo-relevance feedback framework combining relevance matching...泛读笔记

卷积神经网络手写数字分类
随机推荐
[R language] R language managed by conda in Jupyter Notebook
Zero foundation to build their own famine Don 't Starve server, get rid of the online caton and happy friend online
Qt 5.14.2 连接Mysql 数据库
Convolutional Neural Network Notes 2
7月17日上午,阿里AE技术团队直播专场,分享CVPR挑战赛冠军、亚军方案!
专业技术人员继续教育考试题
十、 一起学习Lua 数组
CW008A Copper alloy
C#脚本CSharpScript
QCompleter的进阶使用
QGraphicsItem删除item崩溃
Ununtu20.04安装OSI及相关组件
ECCV 2022 Oral | 满分文章!视频实例分割新SOTA:SeqFormer & IDOL
AMPCOLOY940 美国进口高导热无铍铜合金
图像处理(8) : 模板匹配
How many slots does a Flink task need?
分层架构&SOA架构
斐波那契序列,数组排序,在数组中查找对应元素Objects类的equals方法的作用,将26个英文字母用逗号分隔,组成字符串打印出来,矩阵的转置,找出1~1000之间的全部同构数,用随机数生成语句
Qt 5.14.2 connect to Mysql database
基于肌电信号(sEMG) 的深度学习手势分类-2