webrtc源码分析-线程任务管理

2021/7/31 1:06:26

本文主要是介绍webrtc源码分析-线程任务管理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1.前言

webrtc线程源于chromium,其中有消息队列,通信等功能,相对于原始的std::thread或者posix pthread而言,好用不少,本文介绍了webrtc 线程的常用功能以及实现;

2.正文

2.1 webrtc中的主要线程

出于管理接口即时性,平衡IO任务或其它任务阻塞性等,通过异步的方式,将不同类型的任务归类到不同的异步线程去处理是常见的处理方式,webrtc中一共有三大线程

  • signaling_thread_: 处理PeerConnection有关的接口任务和observer回调

  • network_thread_:网络io等

  • worker_thread_:其它阻塞耗时的任务

2.2 使用Invoke在异步线程执行任务

当需要将一个任务放到异步线程的时候,只需要使用thread->Invoke<>()函数即可,取JsepTransportController::SetLocalDescription() 作为例子, 在函数最开头就检查了,network_thread线程是否是当前线程,如果不是则通过network_thread_->Invoke<>()将当前函数投递到network_thread_中去执行:

RTCError JsepTransportController::SetLocalDescription(
    SdpType type,
    const cricket::SessionDescription* description) {
  // network线程运行
  if (!network_thread_->IsCurrent()) {
    return network_thread_->Invoke<RTCError>(
        RTC_FROM_HERE, [=] { return SetLocalDescription(type, description); });
  }

  RTC_DCHECK_RUN_ON(network_thread_);
  if (!initial_offerer_.has_value()) {
    initial_offerer_.emplace(type == SdpType::kOffer);
    if (*initial_offerer_) {
      SetIceRole_n(cricket::ICEROLE_CONTROLLING);
    } else {
      SetIceRole_n(cricket::ICEROLE_CONTROLLED);
    }
  }
  return ApplyDescription_n(/*local=*/true, type, description);
}

2.3 Invoke的实现和线程任务管理

2.3.1 任务的投递

以上述的SetLocalDescription()为例,其调用的network_thread_->Invoke()如下:

template中的第一个参数ReturnT为执行函数的返回值类型,第二个参数用来SFINAE确保返回值不是void(还有一个适配void类型的版本)

作为task的任务被转化成了FunctionView类型的functor,传入到函数InvokeInternal()

  template <
      class ReturnT,
      typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result = functor(); });
    return result;
  }

InvokeInternal()会将functor转化成Msg handler然后放到this线程队列中去

void Thread::InvokeInternal(const Location& posted_from,
                            rtc::FunctionView<void()> functor) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
               "src_func", posted_from.function_name());

  class FunctorMessageHandler : public MessageHandler {
   public:
    explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
        : functor_(functor) {}
    void OnMessage(Message* msg) override { functor_(); }

   private:
    rtc::FunctionView<void()> functor_;
  // 将funtor转化成Msg handler
  } handler(functor);
  // 发送到this线程队列中
  Send(posted_from, &handler);
}

Thread::Send()函数中有非常多的细节,首先会判断当前线程和this线程是否相同,是就直接执行,否则生成一个QueueTask 投递到this线程的队列中去

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  RTC_DCHECK(!IsQuitting());
  if (IsQuitting())
    return;

  // Sent messages are sent to the MessageHandler directly, in the context
  // of "thread", like Win32 SendMessage. If in the right context,
  // call the handler directly.
  // 构造成msg
  Message msg;
  msg.posted_from = posted_from;
  msg.phandler = phandler;
  msg.message_id = id;
  msg.pdata = pdata;

  // 如果当前线程就是this线程的话,直接将
  // 执行任务即可
  if (IsCurrent()) {
    msg.phandler->OnMessage(&msg);
    return;
  }

  AssertBlockingIsAllowedOnCurrentThread();

  // 获取当前线程
  Thread* current_thread = Thread::Current();

#if RTC_DCHECK_IS_ON
  if (current_thread) {
    RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
    ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
                                                             this);
  }
#endif

  // Perhaps down the line we can get rid of this workaround and always require
  // current_thread to be valid when Send() is called.
  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());

  bool ready = false;
  // 将msg封装达成QueueTask,放到线程队列中
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done = done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));

  if (current_thread) {
    // 当前的thread是google thread
    bool waited = false;
    crit_.Enter();
    while (!ready) {
      // 任务未执行完,阻塞等待到任务完成被唤醒
      crit_.Leave(); 
      current_thread->socketserver()->Wait(kForever, false); // epoll wait
      waited = true;
      crit_.Enter();
    }
    crit_.Leave();

    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.

    if (waited) {
      // socketserver有两个使用场景
      // 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕
      // 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup()
      // 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程
      // 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把
      // 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait
      // 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空
      // 再继续wait的)
      current_thread->socketserver()->WakeUp();
    }
  } else {
    // 非常google thread
    done_event->Wait(rtc::Event::kForever);
  }
}

先跟着主流程走看看这个PostTask()函数, PostTask()内部直接调用了POST(), 在POST()中把msg再次封装成一个rtc::message然后投递到this线程的任务队列messages_中,然后执行WakeUpSocketServer() 唤醒this线程消费任务,至此,任务的投递过程就完成了;

void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
  // Though Post takes MessageData by raw pointer (last parameter), it still
  // takes it with ownership.
  Post(RTC_FROM_HERE, &queued_task_handler_,
       /*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}

void Thread::Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata,
                  bool time_sensitive) {
  RTC_DCHECK(!time_sensitive);
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    // 将QueueTask 封装成 rtc::message 放到message队列中
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    messages_.push_back(msg);
  }
  // 唤醒this线程消费任务
  WakeUpSocketServer();
}

2.3.2 任务的消费

接下来看看task的消费流程,thread启动之后会运行Run() 然后运行ProcessMessages()

void Thread::Run() {
  ProcessMessages(kForever);
}

bool Thread::ProcessMessages(int cmsLoop) {
  // Using ProcessMessages with a custom clock for testing and a time greater
  // than 0 doesn't work, since it's not guaranteed to advance the custom
  // clock's time, and may get stuck in an infinite loop.
  RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
             cmsLoop == kForever);
  int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
  int cmsNext = cmsLoop;

  while (true) {
#if defined(WEBRTC_MAC)
    ScopedAutoReleasePool pool;
#endif
    Message msg;
    // 获取消息
    if (!Get(&msg, cmsNext))
      return !IsQuitting();
    // 分发处理
    Dispatch(&msg);

    if (cmsLoop != kForever) {
      cmsNext = static_cast<int>(TimeUntil(msEnd));
      if (cmsNext < 0)
        return true;
    }
  }
}

Thread::Get()中会从消息队列messages_获取消息,看起来很长,核心的只有几句:

遍历delay_messages_,获取到期消息并放入到messages_队列中

将messages_存在的消息取出,返回出去,如果没有,则break陷入阻塞直到被唤醒

bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
  // Return and clear peek if present
  // Always return the peek if it exists so there is Peek/Get symmetry

  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          // 遍历delay message到期消息
          while (!delayed_messages_.empty()) {
            if (msCurrent < delayed_messages_.top().run_time_ms_) {
              cmsDelayNext =
                  TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
              break;
            }
            // 将到期消息移动到messages_队列中
            messages_.push_back(delayed_messages_.top().msg_);
            delayed_messages_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        // 获取messages_任务
        if (messages_.empty()) {
          break;
        } else {
          *pmsg = messages_.front();
          messages_.pop_front();
        }
      }  // crit_ is released here.

      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    if (IsQuitting())
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }

    {
      // 阻塞直到消息来
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

当消息被获取出,就调用dispatch()然后执行,至此,任务就被执行完成了

void Thread::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
               pmsg->posted_from.file_name(), "src_func",
               pmsg->posted_from.function_name());
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);// 执行
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}

调用QueuedTaskHandler::OnMessage(),从msg->pdata中还原成QueueTask运行,然后release释放掉

void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
  RTC_DCHECK(msg);
  //取出data 还原成queue task
  auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
  std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
  // Thread expects handler to own Message::pdata when OnMessage is called
  // Since MessageData is no longer needed, delete it.
  delete data;

  // 运行之后释放
  // QueuedTask interface uses Run return value to communicate who owns the
  // task. false means QueuedTask took the ownership.
  if (!task->Run())
    task.release();
}

2.3.3 执行结果的返回

上述task执行的时候涉及到两个非常重要的函数task->Run()task.release():


task->Run()会将最初的投递进来的函数运行然后存到result中,流程如下

首先构造QueueTask时的lambda

  void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  
  ......
  
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },      // <= Run()
      [this, &ready, current_thread, done = done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));
      
  ......
}

msg.phandler->OnMessage(&msg) 中的phanlder是InvokeInternal构造的FunctorMessageHandler,调用的override的OnMessage()函数

void Thread::InvokeInternal(const Location& posted_from,
                            rtc::FunctionView<void()> functor) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
               "src_func", posted_from.function_name());

  class FunctorMessageHandler : public MessageHandler {
   public:
    explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
        : functor_(functor) {}
    void OnMessage(Message* msg) override { functor_(); } // <= OnMessage()

   private:
    rtc::FunctionView<void()> functor_;
  } handler(functor);
  Send(posted_from, &handler);
}

functor_()是Invoke中是将result = functor(); 封装成的一个lambda,所以执行完成后的result会存在该result中;

  template <
      class ReturnT,
      typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result = functor(); });
    return result;
  }

在之前投递任务的send函数中,当前线程PostTask()后就开始current_thread->socketserver()->Wait(kForever, false), 陷入阻塞;

task.release()则会唤醒投递任务后陷入wait的当前线程,让其将result返回

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
......


  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());

  bool ready = false;
  // 将msg封装达成QueueTask,放到线程队列中
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done = done_event.get()] { // <= task.release()
        if (current_thread) {
          CritScope cs(&crit_);
          ready = true;
          current_thread->socketserver()->WakeUp(); // 唤醒
        } else {
          done->Set();
        }
      }));

  if (current_thread) {
    // 当前的thread是google thread
    bool waited = false;
    crit_.Enter();
    while (!ready) {
      // 任务未执行完,阻塞等待到任务完成被唤醒
      crit_.Leave(); 
      current_thread->socketserver()->Wait(kForever, false); // epoll wait
      waited = true;
      crit_.Enter();
    }
    crit_.Leave();

    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.

    if (waited) {
      // socketserver有两个使用场景
      // 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕
      // 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup()
      // 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程
      // 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把
      // 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait
      // 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空
      // 再继续wait的)
      current_thread->socketserver()->WakeUp();
    }
  } else {
    // 非常google thread
    done_event->Wait(rtc::Event::kForever);
  }
}

2.4 API类的异步代理类

signaling_thread_ 是用来处理Api层次任务的线程,类对外的接口会通过代理类,将内部的接口任务投递到signaling_thread_ 中;

比如类PeerConnection,在api\peer_connection_proxy.h定义了其代理类如下

BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
              AddTrack,
              rtc::scoped_refptr<MediaStreamTrackInterface>,
              const std::vector<std::string>&)
.....

这样的代理类是通过api/proxy.h的一组宏完成的的,这组宏的用法在文件有很详细的说明:

//
// Example usage:
// 1. 创建interface类
// class TestInterface : public rtc::RefCountInterface {
//  public:
//   std::string FooA() = 0;
//   std::string FooB(bool arg1) const = 0;
//   std::string FooC(bool arg1) = 0;
//  };
//
// Note that return types can not be a const reference.
// 2.继承接口类,实现
// class Test : public TestInterface {
// ... implementation of the interface.
// };
//
// 3. 通过宏生命代理类
// BEGIN_PROXY_MAP(Test)
//   PROXY_PRIMARY_THREAD_DESTRUCTOR()
//   PROXY_METHOD0(std::string, FooA)
//   PROXY_CONSTMETHOD1(std::string, FooB, arg1)
//   PROXY_SECONDARY_METHOD1(std::string, FooC, arg1)
// END_PROXY_MAP()
//
// Where the destructor and first two methods are invoked on the primary
// thread, and the third is invoked on the secondary thread.
//
// The proxy can be created using
// 4.创建代理对象
//   TestProxy::Create(Thread* signaling_thread, Thread* worker_thread,
//                     TestInterface*).
//

将PeerConnection的代理宏展开

template <class INTERNAL_CLASS> 
class PeerConnectionProxyWithInternal; 
typedef PeerConnectionProxyWithInternal<PeerConnectionInterface> PeerConnectionProxy; 

// 代理类继承PeerConnectionInterface接口
template <class INTERNAL_CLASS> class PeerConnectionProxyWithInternal : public PeerConnectionInterface {
protected: 
     typedef PeerConnectionInterface C; 
     public: 
     const INTERNAL_CLASS* internal() const { return c_; } 

     INTERNAL_CLASS* internal() { return c_; } 

protected:
     PeerConnectionProxyWithInternal(rtc::Thread* primary_thread, 
        rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) : 
     primary_thread_(primary_thread), secondary_thread_(secondary_thread), 
     c_(PeerConnection) {} 

private: 
	 // 放入的两个线程
     mutable rtc::Thread* primary_thread_; 
     mutable rtc::Thread* secondary_thread_; 
     protected:
      ~PeerConnectionProxyWithInternal() {
           MethodCall<PeerConnectionProxyWithInternal, void> call( this, &PeerConnectionProxyWithInternal::DestroyInternal); 
           call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 28), destructor_thread()); 
    } 

private: 
    void DestroyInternal() { c_ = nullptr; } 

    rtc::scoped_refptr<INTERNAL_CLASS> c_; 

public: 
	// 创建代理对象静态方法
    static rtc::scoped_refptr<PeerConnectionProxyWithInternal> 
    Create( rtc::Thread* primary_thread, rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) {
         return new rtc::RefCountedObject<PeerConnectionProxyWithInternal>( primary_thread, secondary_thread, PeerConnection); 
    }

private:
     rtc::Thread* destructor_thread() const { return primary_thread_; } 

public:
	// local_streams的代理方法
    rtc::scoped_refptr<StreamCollectionInterface> local_streams() override {
         MethodCall<C, rtc::scoped_refptr<StreamCollectionInterface>> call(c_, &C::local_streams); 
         return call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 30), primary_thread_);
    }
    

代理类创建的时候可以放入两个目的线程primary_thread_secondary_thread_ ,用来提供给代理方法使用。

local_streams()的代理方法是通过宏PROXY_METHOD0()创建的,该宏会创建一个MethodCall<> call封装要执行的函数local_streams(),然后通过call.Marshal()将任务投递到primary_thread_

Marshal()方法如下所示:

  R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
    if (t->IsCurrent()) {
      // 是当前线程,Invoke
      Invoke(std::index_sequence_for<Args...>());
    } else {
      // 不是则PostTask 然后阻塞等
      t->PostTask(std::unique_ptr<QueuedTask>(this));
      event_.Wait(rtc::Event::kForever);
    }
    return r_.moved_result();
  }


这篇关于webrtc源码分析-线程任务管理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程