分享
  1. 首页
  2. 文章

SRS流媒体服务器源码分析(一)

云上听风 · · 5191 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

线程模型


srs使用了state-threads协程库,是单线程多协程模型。
这个协程的概念类似于lua的协程,都是单线程中可以创建多个协程。而golang中的goroutine协程是多线程并发的,goroutine有可能运行在同一个线程也可能在不同线程,这样就有了线程安全问题,所以需要chan通信或者mutex加锁共享资源。
而srs因为是单线程多协程所以不用考虑线程安全,数据不用加锁。

主流程分析


撇掉程序启动的一些初始化和设置,直接进入:


// Bring up each listener family in a fixed order: RTMP, HTTP API,
// HTTP stream, then stream caster.
// @return ERROR_SUCCESS when every step succeeds; otherwise the error code of
//         the first failing step (later steps are not attempted).
int SrsServer::listen()
{
    int ret = listen_rtmp();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    ret = listen_http_api();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    ret = listen_http_stream();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    // last step: its result is the overall result.
    return listen_stream_caster();
}

先看看listen_rtmp():

int SrsServer::listen_rtmp()
{
 int ret = ERROR_SUCCESS;
 
 // stream service port.
 std::vector<std::string> ip_ports = _srs_config->get_listens();
 srs_assert((int)ip_ports.size() > 0);
 
 close_listeners(SrsListenerRtmpStream);
 
 for (int i = 0; i < (int)ip_ports.size(); i++) {
 SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
 listeners.push_back(listener);
 
 std::string ip;
 int port;
 srs_parse_endpoint(ip_ports[i], ip, port);
 
 if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
 srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
 return ret;
 }
 }
 
 return ret;
}

创建了SrsStreamListener,在SrsStreamListener::listen中又创建了SrsTcpListener进行listen

// Store the handler and the endpoint, reset the descriptors, and create the
// SrsReusableThread named "tcp" with this object as its handler.
// @param h handler stored in `handler` (cycle() calls its on_tcp_client()).
// @param i ip stored in `ip`.
// @param p port stored in `port`.
SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
    // descriptors start out unset.
    _fd = -1;
    _stfd = NULL;

    // endpoint and the handler for accepted clients.
    handler = h;
    ip = i;
    port = p;

    pthread = new SrsReusableThread("tcp", this);
}

SrsTcpListener中创建了pthread: SrsReusableThread
int SrsTcpListener::listen()中调用了pthread->start(),协程会回调到int SrsTcpListener::cycle()

int SrsTcpListener::cycle()
{
 int ret = ERROR_SUCCESS;
 
 st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
 
 if(client_stfd == NULL){
 // ignore error.
 if (errno != EINTR) {
 srs_error("ignore accept thread stoppped for accept client error");
 }
 return ret;
 }
 srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
 
 if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) {
 srs_warn("accept client error. ret=%d", ret);
 return ret;
 }
 
 return ret;
}

accept连接后,回调到on_tcp_client
也就是SrsStreamListener::on_tcp_client:

// Forward an accepted client to the server, tagged with this listener's type.
// @param stfd the accepted client descriptor.
// @return the result of server->accept_client(); a failure is logged as a warning.
int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
    int ret = server->accept_client(type, stfd);

    if (ret != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
    }

    return ret;
}
// Create the connection object matching the listener type for an accepted
// client, register it in `conns`, and start it.
// @param type the type of the listener that accepted the client.
// @param client_stfd the accepted client descriptor.
// @return the value of `ret` (declared in the elided part of this excerpt);
//         conn->start()'s error code when starting the connection fails.
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
{
...
 SrsConnection* conn = NULL;
 if (type == SrsListenerRtmpStream) {
 conn = new SrsRtmpConn(this, client_stfd);
 } else if (type == SrsListenerHttpApi) {
#ifdef SRS_AUTO_HTTP_API
 conn = new SrsHttpApi(this, client_stfd, http_api_mux);
#else
 // built without http-api: close the client and return without a connection.
 srs_warn("close http client for server not support http-api");
 srs_close_stfd(client_stfd);
 return ret;
#endif
 } else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
 conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);
#else
 // built without http-server: close the client and return without a connection.
 srs_warn("close http client for server not support http-server");
 srs_close_stfd(client_stfd);
 return ret;
#endif
 } else {
 // TODO: FIXME: handler others
 // NOTE(review): conn stays NULL on this path, so the srs_assert below
 // fires for any other listener type, and client_stfd is not closed here.
 }
 srs_assert(conn);
 
 // directly enqueue, the cycle thread will remove the client.
 conns.push_back(conn);
 srs_verbose("add conn to vector.");
 
 // cycle will start process thread and when finished remove the client.
 // @remark never use the conn, for it maybe destroyed.
 if ((ret = conn->start()) != ERROR_SUCCESS) {
 return ret;
 }
 srs_verbose("conn started success.");
 srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
 
 return ret;
}

在上面根据type创建不同的SrsConnection,Rtmp创建了SrsRtmpConn,并且加入到std::vector<SrsConnection*> conns;中,然后执行conn->start()

SrsConnection基类创建了一个协程pthread: SrsOneCycleThread,上面的conn->start()实际上就是调用pthread->start():

// Initialize the connection state and create its SrsOneCycleThread named
// "conn", with this object as the thread's handler.
// @param cm stored in `manager`.
// @param c the client descriptor, stored in `stfd`.
SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
{
    // what this connection serves and who manages it.
    stfd = c;
    manager = cm;

    // initial state flags.
    id = 0;
    expired = false;
    disposed = false;

    // the client thread should reap itself,
    // so we never use joinable.
    // TODO: FIXME: maybe other thread need to stop it.
    // @see: https://github.com/ossrs/srs/issues/78
    pthread = new SrsOneCycleThread("conn", this);
}
// Start the connection's thread.
// @return whatever pthread->start() returns.
int SrsConnection::start()
{
    int ret = pthread->start();
    return ret;
}

int SrsConnection::cycle()调用了do_cycle(),派生类实现了这个方法。

int SrsRtmpConn::do_cycle()
{
 int ret = ERROR_SUCCESS;
 
 srs_trace("RTMP client ip=%s", ip.c_str());
 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
 rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
 
 //正式进入rtmp握手。
 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
 srs_error("rtmp handshake failed. ret=%d", ret);
 return ret;
 }
 srs_verbose("rtmp handshake success");
 
 if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
 srs_error("rtmp connect vhost/app failed. ret=%d", ret);
 return ret;
 }
 srs_verbose("rtmp connect app success");
 
 // set client ip to request.
 req->ip = ip;
 
 srs_trace("connect app, "
 "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 
 req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 
 req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
 req->app.c_str(), (req->args? "(obj)":"null"));
 
 // show client identity
 if(req->args) {
 std::string srs_version;
 std::string srs_server_ip;
 int srs_pid = 0;
 int srs_id = 0;
 
 SrsAmf0Any* prop = NULL;
 if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
 srs_version = prop->to_str();
 }
 if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
 srs_server_ip = prop->to_str();
 }
 if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
 srs_pid = (int)prop->to_number();
 }
 if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
 srs_id = (int)prop->to_number();
 }
 
 srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
 srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
 if (srs_pid > 0) {
 srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
 srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
 }
 }
 
 ret = service_cycle();
 
 http_hooks_on_close();
 return ret;
}

在这儿正式进入rtmp协议处理阶段。先进行握手:rtmp->handshake()等操作,然后进入service_cycle();

// Run stream_service_cycle() in a loop until the connection is disposed or a
// non-recoverable error is returned.
//
// Per iteration:
//  - ERROR_SUCCESS: run the stream service again.
//  - not a system control error: return it (logged unless it is a socket
//    timeout or a graceful client close).
//  - ERROR_CONTROL_REPUBLISH / ERROR_CONTROL_RTMP_CLOSE: enlarge the timeouts
//    and run the stream service again.
//  - any other system control error: log and return it.
//
// @return the error code that ended the loop, or the last value of `ret` when
//         the loop ends because `disposed` became true.
int SrsRtmpConn::service_cycle()
{
    // (elided in this excerpt; the omitted code is expected to declare `ret`)
    ...
    while (!disposed) {
        ret = stream_service_cycle();

        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        if (ret == ERROR_SUCCESS) {
            continue;
        }

        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }

        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            // each setter gets the constant of its own direction; the original
            // passed the RECV constant to the send timeout and vice versa.
            rtmp->set_send_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);

            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }

        // for "some" system control error,
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);

            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }

        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }

    return ret;
}

stream_service_cycle:

// Serve one stream request on this connection: identify the client, resolve
// and validate the request (tcUrl, vhost, security, stream name), fetch or
// create the source, then dispatch to playing() or publishing() by client type.
// @return the first failing step's error code, otherwise the result of
//         playing()/publishing(); ERROR_SYSTEM_CLIENT_INVALID for a client
//         type not handled by the switch.
int SrsRtmpConn::stream_service_cycle()
{
 int ret = ERROR_SUCCESS;
 
 // identify_client fills `type`, req->stream and req->duration.
 SrsRtmpConnType type;
 if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
 // a graceful client close is not logged as an error.
 if (!srs_is_client_gracefully_close(ret)) {
 srs_error("identify client failed. ret=%d", ret);
 }
 return ret;
 }
 
 // split the tcUrl into the request fields, then call req->strip().
 srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
 req->strip();
 srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f, param=%s",
 srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration, req->param.c_str());
 
 // discovery vhost, resolve the vhost from config
 SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
 if (parsed_vhost) {
 req->vhost = parsed_vhost->arg0();
 }
 
 // schema, vhost, port and app must all be non-empty after discovery.
 if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
 ret = ERROR_RTMP_REQ_TCURL;
 srs_error("discovery tcUrl failed. "
 "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
 req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
 return ret;
 }
 
 if ((ret = check_vhost()) != ERROR_SUCCESS) {
 srs_error("check vhost failed. ret=%d", ret);
 return ret;
 }
 
 srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, stream=%s, param=%s, args=%s",
 req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
 req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
 req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));
 
 // do token traverse before serve it.
 // @see https://github.com/ossrs/srs/pull/239
 // the `if (true)` only opens a scope for the two locals below.
 if (true) {
 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
 bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
 if (vhost_is_edge && edge_traverse) {
 if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
 srs_warn("token auth failed, ret=%d", ret);
 return ret;
 }
 }
 }
 
 // security check
 if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
 srs_error("security check failed. ret=%d", ret);
 return ret;
 }
 srs_info("security check ok");
 
 // Never allow the empty stream name, for HLS may write to a file with empty name.
 // @see https://github.com/ossrs/srs/issues/834
 if (req->stream.empty()) {
 ret = ERROR_RTMP_STREAM_NAME_EMPTY;
 srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
 return ret;
 }
 // client is identified, set the timeout to service timeout.
 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
 rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
 
 // find a source to serve.
 SrsSource* source = NULL;
 if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
 return ret;
 }
 srs_assert(source != NULL);
 
 // update the statistic when source discovered.
 SrsStatistic* stat = SrsStatistic::instance();
 if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
 srs_error("stat client failed. ret=%d", ret);
 return ret;
 }
 // the vhost_is_edge declared earlier was scoped to the token-traverse
 // block, so the config is queried again here.
 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
 // NOTE(review): "source_id=%d[%d]" receives source->source_id() for both
 // placeholders, so the two printed values are always the same call's result.
 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 
 source->source_id(), source->source_id());
 source->set_cache(enabled_cache);
 
 client_type = type;
 // branch on the identified client type; every case returns.
 switch (type) {
 case SrsRtmpConnPlay: {
 srs_verbose("start to play stream %s.", req->stream.c_str());
 
 // response connection start play
 if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
 srs_error("start to play stream failed. ret=%d", ret);
 return ret;
 }
 if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
 srs_error("http hook on_play failed. ret=%d", ret);
 return ret;
 }
 
 srs_info("start to play stream %s success", req->stream.c_str());
 ret = playing(source);
 // on_stop is invoked whatever playing() returned.
 http_hooks_on_stop();
 
 return ret;
 }
 case SrsRtmpConnFMLEPublish: {
 srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
 
 if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
 srs_error("start to publish stream failed. ret=%d", ret);
 return ret;
 }
 
 return publishing(source);
 }
 case SrsRtmpConnHaivisionPublish: {
 srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());
 
 if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
 srs_error("start to publish stream failed. ret=%d", ret);
 return ret;
 }
 
 return publishing(source);
 }
 case SrsRtmpConnFlashPublish: {
 srs_verbose("flash start to publish stream %s.", req->stream.c_str());
 
 if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
 srs_error("flash start to publish stream failed. ret=%d", ret);
 return ret;
 }
 
 return publishing(source);
 }
 default: {
 // NOTE(review): an invalid client type is reported at info level only,
 // although an error code is returned.
 ret = ERROR_SYSTEM_CLIENT_INVALID;
 srs_info("invalid client type=%d. ret=%d", type, ret);
 return ret;
 }
 }
 // not reached: every switch branch above returns.
 return ret;
}

先通过rtmp->identify_client进行客户端身份识别。
然后根据客户端类型(type)进入不同分支。
SrsRtmpConnPlay是客户端播流。
SrsRtmpConnFMLEPublish 是Rtmp推流到服务器。
SrsRtmpConnHaivisionPublish 是Haivision编码器推流到服务器(Haivision是一家视频编码设备厂商,并不是海康威视Hikvision)。
SrsRtmpConnFlashPublish 是Flash推流到服务器。
这儿只看SrsRtmpConnFMLEPublish:
进入int SrsRtmpConn::publishing(SrsSource* source),然后int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd),

// Excerpt of the publish loop: only the start of the receive thread is shown;
// the code before and after it is elided with "...".
// @param source the source being published to (not used in the visible part).
// @param trd the publish receive thread started here.
// @return trd->start()'s error code when the thread fails to start; the
//         remaining return paths are in the elided code.
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{
...
 // start isolate recv thread.
 if ((ret = trd->start()) != ERROR_SUCCESS) {
 srs_error("start isolate recv thread failed. ret=%d", ret);
 return ret;
 }
 ...
}

trd协程运行,协程循环:执行rtmp->recv_message(&msg)后调用int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
再回调到int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
之后处理收到的数据:


// Route one message received from a publisher.
//  - edge vhost: hand the message to source->on_edge_proxy_publish().
//  - audio / video / aggregate: hand it to the matching source->on_*() handler.
//  - AMF0/AMF3 data: decode it and, when it is an onMetaData packet, pass it
//    to source->on_meta_data(); other data packets are ignored.
//  - anything else: ignored.
// @param source the source that receives the message.
// @param msg the message to route.
// @param vhost_is_edge when true, only the edge branch is taken.
// @return ERROR_SUCCESS, or the error code of the handler that failed.
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        ret = source->on_edge_proxy_publish(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
        }
        return ret;
    }

    // audio packet
    if (msg->header.is_audio()) {
        ret = source->on_audio(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
        }
        return ret;
    }

    // video packet
    if (msg->header.is_video()) {
        ret = source->on_video(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
        }
        return ret;
    }

    // aggregate packet
    if (msg->header.is_aggregate()) {
        ret = source->on_aggregate(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
        }
        return ret;
    }

    // everything that is not an AMF data message is ignored.
    if (!msg->header.is_amf0_data() && !msg->header.is_amf3_data()) {
        return ret;
    }

    // onMetaData arrives as an AMF data message.
    SrsPacket* packet = NULL;
    ret = rtmp->decode_message(msg, &packet);
    if (ret != ERROR_SUCCESS) {
        srs_error("decode onMetaData message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, packet);

    SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(packet);
    if (metadata == NULL) {
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    ret = source->on_meta_data(msg, metadata);
    if (ret != ERROR_SUCCESS) {
        srs_error("source process onMetaData message failed. ret=%d", ret);
        return ret;
    }
    srs_info("process onMetaData message success.");

    return ret;
}

如果本服务器是edge边缘服务器(vhost_is_edge)直接推流回源到源服务器。
audio和video分开处理。
这儿只看一下video的处理:


// Entry point for a video message received from the publisher.
//
// Checks that timestamps keep increasing (only when mix_correct is off), drops
// video whose header SrsFlvCodec::video_is_acceptable() rejects, converts the
// message to a SrsSharedPtrMessage, and then either processes it directly or,
// when mix_correct is on, pushes it through mix_queue and processes whatever
// message the queue pops.
//
// @param shared_video the received video message; must not be used by the
//        caller afterwards (its payload is transferred to the shared message).
// @return ERROR_SUCCESS (also when the video is dropped or the mix queue
//         yields nothing), otherwise the error of the failing step.
int SrsSource::on_video(SrsCommonMessage* shared_video)
{
    int ret = ERROR_SUCCESS;

    // monotonically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;

    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvCodec::video_is_acceptable(shared_video->payload, shared_video->size)) {
        // unsigned, so a first byte >= 0x80 is not sign-extended when it is
        // promoted for the %#x conversion.
        unsigned char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = (unsigned char)shared_video->payload[0];
        }

        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return ret;
    }

    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((ret = msg.create(shared_video)) != ERROR_SUCCESS) {
        srs_error("initialize the video failed. ret=%d", ret);
        return ret;
    }
    // the spaces around PRId64 are required since C++11; without them the
    // macro name is lexed as a suffix of the string literal.
    srs_info("Video dts=%" PRId64 ", size=%d", msg.timestamp, msg.size);

    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return ret;
    }

    // consume the monotonically increase message;
    // the popped message may be audio or video.
    if (m->is_audio()) {
        ret = on_audio_imp(m);
    } else {
        ret = on_video_imp(m);
    }
    srs_freep(m);

    return ret;
}

shared_video转换为SrsSharedPtrMessage
调用on_video_imp

// Process one video message: cache and parse the sequence header, feed the
// HLS / DVR / HDS modules when they are compiled in, dispatch the message to
// all consumers and forwarders, and cache it in the gop cache.
//
// Error policy visible here:
//  - HLS: follows the vhost's hls on_error strategy (ignore, continue, or
//    disconnect the publisher by returning the error).
//  - DVR / HDS: a failure is logged, the module is unpublished, and the error
//    is ignored.
//  - sequence-header demux, statistics, consumers, forwarders, gop cache: a
//    failure is returned to the caller.
//
// @param msg the video message to process.
// @return ERROR_SUCCESS, or the error code of the failing step.
int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;

    // the spaces around PRId64 are required since C++11; without them the
    // macro name is lexed as a suffix of the string literal.
    srs_info("Video dts=%" PRId64 ", size=%d", msg->timestamp, msg->size);

    bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size);

    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {
        if (cache_sh_video->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
            // only report a drop when the header really is a duplicate.
            if (drop_for_reduce) {
                srs_warn("drop for reduce sh video, size=%d", msg->size);
            }
        }
    }

    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header) {
        srs_freep(cache_sh_video);
        cache_sh_video = msg->copy();

        // parse detail video codec
        SrsAvcAacCodec codec;

        // user can disable the sps parse to workaround when parse sps failed.
        // @see https://github.com/ossrs/srs/issues/474
        codec.avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);

        SrsCodecSample sample;
        if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
            srs_error("source codec demux video failed. ret=%d", ret);
            return ret;
        }

        // when got video stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level)) != ERROR_SUCCESS) {
            return ret;
        }

        srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
            msg->size, codec.video_codec_id,
            srs_codec_avc_profile2str(codec.avc_profile).c_str(),
            srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
            codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
    }

#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
        if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
            srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);

            // unpublish, ignore ret.
            hls->on_unpublish();

            // ignore.
            ret = ERROR_SUCCESS;
        } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
            if (srs_hls_can_continue(ret, cache_sh_video, msg)) {
                ret = ERROR_SUCCESS;
            } else {
                srs_warn("hls continue video failed. ret=%d", ret);
                return ret;
            }
        } else {
            srs_warn("hls disconnect publisher for video error. ret=%d", ret);
            return ret;
        }
    }
#endif

#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
        srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);

        // unpublish, ignore ret.
        dvr->on_unpublish();

        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

#ifdef SRS_AUTO_HDS
    if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
        // the message names hds, the module actually unpublished below.
        srs_warn("hds process video message failed, ignore and disable hds. ret=%d", ret);

        // unpublish, ignore ret.
        hds->on_unpublish();

        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

    // copy to all consumer, unless it is a duplicated sequence header.
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the video failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch video success.");
    }

    // copy to all forwarders.
    if (!forwarders.empty()) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
                srs_error("forwarder process video message failed. ret=%d", ret);
                return ret;
            }
        }
    }

    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return ret;
    }

    // cache the last gop packets
    if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
        srs_error("gop cache msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");

    // if atc, update the sequence header to abs time.
    if (atc) {
        if (cache_sh_video) {
            cache_sh_video->timestamp = msg->timestamp;
        }
        if (cache_metadata) {
            cache_metadata->timestamp = msg->timestamp;
        }
    }

    return ret;
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:云上听风

查看原文:SRS流媒体服务器源码分析(一)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
5191 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏