SRS4.0源码分析-RTMP入口 - 弦外之音

/ 0评 / 5

本文采用的 SRS 版本是 4.0-b8 , 下载地址:github


上篇文章 《SRS4.0源码分析-main》 讲解了 SRS main 函数的基本流程,但是可能有些朋友还是比较懵逼。说实话,其实对于SRS的具体逻辑流程,我现在也是比较懵逼。在这里,分享一个研究开源项目源码的经验,怎么快速跳出这种懵逼的状态。

首先,研究一个项目的源码,需要有一个目标。例如之前《RTMP协议分析》的系列文章,已经讲解了一个 RTMP 客户端的实现,我研究 SRS 的源码就是为了看一下 RTMP 服务器端是如何实现的。这就是一个目标。

如果茫无目的地看源码,把日志处理,配置加载,等等源码都看一遍,虽然能看完,但是印象会不太深刻。因为没有目标跟需求驱动。

确认了目标是 RTMP 服务器的实现原理之后,后续要做的事情就比较简单了,首先找到 SRS 的 RTMP 业务的入口,因为 SRS 是多个功能混合的,他支持 RTMP,SRT,webrtc。在你了解 RTMP 业务入口的同时,其他功能的入口也会找到。

同时,在研究 RTMP 服务器端逻辑的时候,各种 日志处理,配置加载,也会一并了解。



下面就开始寻找 RTMP 服务器的入口。先分析 run_directly_or_daemon() 函数,这个函数的代码比较简单,就不画流程图,直接截图代码。请看下图:

上图代码有几个重点:

1,用了比较多的srs_trace() 函数来记录日志。srs_trace() 函数会往 srs.log 写入一条日志,具体请看《SRS4.0源码分析-日志处理》

2,SRS的守护进程没用 setsid()umask(022) ,也就是当前进程没有脱离 从父进程继承 的 SessionID、进程组ID和打开的终端,SRS 为什么不脱离我也不太清楚。这个问题,我后续问下杨成立大神。

3,调用 run_hybrid_server(),重点就是 run_hybrid_server 函数。

run_directly_or_daemon() 函数分析完毕。


下面继续 看 run_hybrid_server() 函数的实现,流程图如下:

上图主要有几个重点:

1,利用 依赖注入 把 SrsSrtRtc 的 Adapter 注入给 _srs_hybrid。里面其实是一个 vector,std::vector<ISrsHybridServer*> servers

2,然后初始化 _srs_hybrid,SRS 是一个混合的服务器,他结合了 RTMP,SRT,webrtc。所以叫 hybird。

3,_srs_circuit_breaker 具体的作用后面补充,可能是类似一个 watchdog 的机制。(TODO)

4,_srs_hybrid->run() 应该就会开启协程,然后一直阻塞在这里。


分析到这里,还没找到 RTMP 的入口。继续分析 _srs_hybrid->run() 函数。请看下图:

_srs_hybrid->run() 的代码比较简单,就是遍历之前注入的 vector,然后执行他们的 run 函数。RTMP 应该是在 SrsServerAdapter 里面处理的,而不是 Srt 或者 RTCAdapter 。所以只需要找 SrsServerAdapterrun 函数就行。为了便于理解,先画个结构图,如下

上图的 SrsServer 有非常多的变量跟函数,我只画出一部分的重点。

接下来讲解一下 class ISrsHybridServer 这个类。代码如下:

上图的重点是 = 0 这种语法是纯虚函数的写法,意思是把这个函数指针赋值为 0。推荐阅读《C++ 虚函数和纯虚函数的区别》

定义一个函数为虚函数,不代表函数为不被实现的函数。

定义他为虚函数是为了允许用基类的指针来调用子类的这个函数。

定义一个函数为纯虚函数,才代表函数没有被实现。

定义纯虚函数是为了实现一个接口,起到一个规范的作用,规范继承这个类的程序员必须实现这个函数。

SrsSrtRtc 都会继承这个 ISrsHybridServer 类,实现 initialize (初始化),run (运行),stop(停止)函数。

SrsServerAdapterrun 函数实现代码如下:

rs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
{
    srs_error_t err = srs_success;
​
    // Initialize the whole system, set hooks to handle server level events.
    if ((err = srs->initialize(NULL)) != srs_success) {
        return srs_error_wrap(err, "server initialize");
    }
​
    if ((err = srs->initialize_st()) != srs_success) {
        return srs_error_wrap(err, "initialize st");
    }
​
    if ((err = srs->acquire_pid_file()) != srs_success) {
        return srs_error_wrap(err, "acquire pid file");
    }
​
    if ((err = srs->initialize_signal()) != srs_success) {
        return srs_error_wrap(err, "initialize signal");
    }
​
    if ((err = srs->listen()) != srs_success) {
        return srs_error_wrap(err, "listen");
    }
​
    if ((err = srs->register_signal()) != srs_success) {
        return srs_error_wrap(err, "register signal");
    }
​
    if ((err = srs->http_handle()) != srs_success) {
        return srs_error_wrap(err, "http handle");
    }
​
    if ((err = srs->ingest()) != srs_success) {
        return srs_error_wrap(err, "ingest");
    }
​
    if ((err = srs->start(wg)) != srs_success) {
        return srs_error_wrap(err, "start");
    }
​
    return err;
}

从上面代码可以看出,run 里面调了相当多的 class SrsServer 里面的函数。如下:

1,srs->initialize() ,这个函数里面初始了几个 http 服务器,但是还没开始 listen

2,srs->initialize_st(),这个函数 跟 st 库没有关系,主要是 对 supervisor 的场景做处理。

3,srs->acquire_pid_file(),生成 pid 进程文件。

4,srs->initialize_signal(),对信号做处理,应该会把信号转成 IO 事情。

5,srs->listen(),开始监听端口了,listen fd 会保存在对象里面,一个协程监听一个listen fd。

6,srs->register_signal(),还是跟信号有关,请看后续文章《SRS4.0源码分析-信号处理》

7,srs->http_handle() 处理HTTP 请求。

8,srs->ingest(),如果我没猜错,应该是那个 web 后台服务。

9,srs->start(wg),启动,这个 wg 是重点,后面会分析。


上面一共调了 9 个函数,但是实际上只有 两个 重点函数,srs->listen()srs->start(wg)

srs->http_handle() 虽然也是重点,但是本文主要是找到 RTMP 的入口,所以 http 相关的分析,请看后续文章《SRS4.0源码分析-HTTP》。

SRShttp 服务器好像是自己写的, http 报文的解析在 trunk\src\protocol\srs_http_stack.hpp 文件



srs->listen(),这个函数是重中之中,开始监听端口了,代码如下:

先做个简单的介绍,SRS 启动之后,只看到一个进程,而且搜索源代码,也没发现 pthread_create() 的函数在 SRS的代码里面出现,也就是说 SRS 的所有业务都是基于 ST 协程实现的,没有用线程。

上图把 往 Srsserver 类的 private 变量 std::vector<SrsListener*> listeners 插数据,因为 RTMP 可以监听多个IP跟端口。然后调 SrsBufferListener::listen(),然后再调 SrsTcpListener::listen(),这个链路有点长,我画个流程图,如下:

所以重点 在TCP 的listen 函数里面,RTMP 是基于 TCP 的,所以肯定是 会listen 一个 tcp 的 fd,现在就深入看 SrsTcpListener::listen()

如上图,SrsTcpListener 类里面有个 变量 srs_netfd_t lfdllisten 的缩写。这个 srs_netfd_t 实际上就是 st 库里面的 st_netfd_t,只是换了个命名。SRS 代码的数据结构,有很多都是用 st 的数据结构,例如 条件变量,互斥锁,等等。

下面正式开始看 SrsTcpListener::listen() 函数,请看代码:

这个函数的调用链还是有点长,我还是画个流程图吧。

上图最重要的其实是 srs_netfd_open_socket() 这个其实是 st 库的函数。在之前专栏state-thread源码分析有讲过这个函数。请看下图

srs_tcp_listen() 函数执行完之后,就已经拿到了 ST 库的 netfd,就会开始创建协程。SrsSTCoroutine 继承 SrsFastCoroutine,所以这里创建协程使用的是 SrsFastCoroutine::start() 函数,SrsFastCoroutine 类里面有个 srs_thread_t ,这实际跟 ST 库的 _st_thread_t 是一样的。

SrsFastCoroutine 类就是对 ST 库的 协程做封装,请看下图:

从上图可以看到,start() 函数里面调的就是 ST库的创建协程的函数。所以 srs_tcp_listen() 函数执行完之后,就已经拿到了 ST 库的 netfd,就调 SrsFastCoroutine::start() 创建一个协程。注意这里 _pfn_st_thread_create() 传递的 是 pfn, 所以协程的 start 函数 是 SrsFastCoroutine::pfn(),上下文切换的时候,这个协程会从 SrsFastCoroutine::pfn() 函数开始执行。协程 start 函数 的参数是 this,就是对象自己

至此,虽然还有一点东西没讲,但是整体的流程图已经可以画出来了,如下:

从上图可以看出,listen_rtmp()listen_http_api(),等等函数都会创建一个协程SrsServer::listen() 执行完之后,一共创建了 7 个协程。但是这 7个协程还未开始运行。因为还没开始切换上下文。

最后还有一个重点函数SrsServer::start(),代码如下:

SrsServer::start() 函数实际上是把 自己 也变成 一个协程,丢进去 RUNQ 里面了,thisSrsServer 对象。

此时此刻,协程还是没开始运行。

上面流程图中的 SrtServerAdapter::run() 是创建 SRT 相关的协程, RtcServerAdapter::run() 是创建 RTC 相关的协程,这些不是本文重点,不用管。

此时此刻,协程还是没开始运行。那什么时候协程开始运行?在上面截图中,SrsServer::start() 函数最后有两句代码:

// OK, we start SRS server.
wg_ = wg;
wg->add(1);

每个 Server start的时候都会往 wg add 一下。

上面的流程图,我用绿色画出了一个框,wg.wait(),真正开始切换上下文,让之前创建的协程全部跑起来,我猜测就是在这里做的。

SrsWaitGroup::wait() 的实现非常简单,就是等待一个协程条件变量。

void SrsWaitGroup::wait()
{
    if (nn_ > 0) {
        srs_cond_wait(done_);
    }
}

注意,这里的 srs_cond_wait() 会让当前协程阻塞,实际上是切换到其他地方开始执行,这是 ST 的函数,ST 的阻塞函数就会导致上下文切换,进入 _st_vp_schedule() ,开始把之前创建的协程拿出来,一一运行起来。关于 ST库的分析,请看 《ST源码分析》专栏。


代码运行到 wg.wait() 的时候,之前的协程已经开始跑起来,那 RTMP 会在哪个地方跑起来呢?之前说过,协程 start 函数 是 SrsFastCoroutine::pfn(),所以 RTMP 的业务会在这个函数 pfn() 函数跑起来,实际上SRT ,RTC也是在这个 pfn() 函数跑起来的。

下面继续 分析 SrsFastCoroutine::pfn() 函数的实现,

从上图可以看到,pfn() 实际上是调了 子类的 cycle() 来循环处理业务。那 RTMP 业务的子类是啥?是 SrsTcpListener ,所以需要看 SrsTcpListenercycle 实现。请看下图

从上图可以看出, 直接用 ST 库的函数 srs_accept() 阻塞,等待 tcp 客户端来。然后丢给 handleron_tcp_client 处理逻辑。当初初始化 RTMP 的时候,handler 传的是什么?请看下图:

从上图可以看到 传的是 this,肯定又是子类传参法。所以 handler->on_tcp_client() 的实现如下:

srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    srs_error_t err = server->accept_client(type, stfd);
    if (err != srs_success) {
        srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
        srs_freep(err);
    }
    
    return srs_success;
}

现在又有一个疑问,上面的 server 是什么?请看下图:

从上图可以看到,传的是this,所以 server 就是 SrsServer,所以 RTMP 接受到一个 tcp 客户端 fd 的时候,就会执行 SrsServer::accept_client() 函数

下面开始分析 SrsServer::accept_client() 函数,代码如下:

上图的重点是 fd_to_resource() 函数,代码如下:

此时此刻,已经追踪到了 rtmp 连接的处理入口,就是 new SrsRtmpConn()。从上图能看到,RTMP,HTTP 是同一个地方处理的。所以 SrsServer 实际上就是处理 RTMP,http,等请求的。

pfn()cycle()srs_accept() 再到 rtmp 的入口,这个链条有点长,我画个流程图便于理解。

到这里,已经找到 RTMP 业务的入口了,就是 new SrsRtmpConn(),下一篇文章 《SRS4.0源码分析-建立RTMP链接》 开始分析 RTMP 链接的建立。

扩展知识:

1,::bind::listen,前面带了 两个冒号。应该是代表使用原始的函数。

相关阅读:

1,《SRS3.0源码分析-夏立新》


由于笔者的水平有限, 加之编写的同时还要参与开发工作,文中难免会出现一些错误或者不准确的地方,恳请读者批评指正。如果读者有任何宝贵意见,可以加我微信 Loken1。QQ:2338195090。

发表回复

您的电子邮箱地址不会被公开。