ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
PeerConnectionManager.h
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 // ----------------------------------------------------------------------------
8 // Contains source code from
9 // https://github.com/mpromonet/webrtc-streamer
10 //
11 // This software is in the public domain, furnished "as is", without technical
12 // support, and with no warranty, express or implied, as to its usefulness for
13 // any purpose.
14 // ----------------------------------------------------------------------------
15 //
16 // This is a private header. It shall be hidden from CloudViewer's public API.
17 // Do not put this in CloudViewer.h.in.
18 
19 #pragma once
20 
21 #include <api/peer_connection_interface.h>
22 #include <rtc_base/strings/json.h>
23 
24 #include <future>
25 #include <mutex>
26 #include <regex>
27 #include <string>
28 #include <thread>
29 #include <unordered_map>
30 
34 
35 namespace cloudViewer {
36 namespace visualization {
37 namespace webrtc_server {
38 
76  class VideoSink : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
77  public:
78  VideoSink(webrtc::VideoTrackInterface* track) : track_(track) {
79  track_->AddOrUpdateSink(this, rtc::VideoSinkWants());
80  }
81  virtual ~VideoSink() { track_->RemoveSink(this); }
82 
83  // VideoSinkInterface implementation
84  virtual void OnFrame(const webrtc::VideoFrame& video_frame) {
85  rtc::scoped_refptr<webrtc::I420BufferInterface> buffer(
86  video_frame.video_frame_buffer()->ToI420());
87  utility::LogDebug("[{}] frame: {}x{}", CLOUDVIEWER_FUNCTION,
88  buffer->height(), buffer->width());
89  }
90 
91  protected:
92  rtc::scoped_refptr<webrtc::VideoTrackInterface> track_;
93  };
94 
95  class SetSessionDescriptionObserver
96  : public webrtc::SetSessionDescriptionObserver {
97  public:
98  static SetSessionDescriptionObserver* Create(
99  webrtc::PeerConnectionInterface* pc,
100  std::promise<const webrtc::SessionDescriptionInterface*>&
101  promise) {
102  return new rtc::RefCountedObject<SetSessionDescriptionObserver>(
103  pc, promise);
104  }
105  virtual void OnSuccess() {
106  std::string sdp;
107  if (pc_->local_description()) {
108  promise_.set_value(pc_->local_description());
109  pc_->local_description()->ToString(&sdp);
110  } else if (pc_->remote_description()) {
111  promise_.set_value(pc_->remote_description());
112  pc_->remote_description()->ToString(&sdp);
113  }
114  }
115  virtual void OnFailure(webrtc::RTCError error) {
116  utility::LogWarning("{}", error.message());
117  promise_.set_value(nullptr);
118  }
119 
120  protected:
121  SetSessionDescriptionObserver(
122  webrtc::PeerConnectionInterface* pc,
123  std::promise<const webrtc::SessionDescriptionInterface*>&
124  promise)
125  : pc_(pc), promise_(promise) {};
126 
127  private:
128  webrtc::PeerConnectionInterface* pc_;
129  std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
130  };
131 
132  class CreateSessionDescriptionObserver
133  : public webrtc::CreateSessionDescriptionObserver {
134  public:
135  static CreateSessionDescriptionObserver* Create(
136  webrtc::PeerConnectionInterface* pc,
137  std::promise<const webrtc::SessionDescriptionInterface*>&
138  promise) {
139  return new rtc::RefCountedObject<CreateSessionDescriptionObserver>(
140  pc, promise);
141  }
142  virtual void OnSuccess(webrtc::SessionDescriptionInterface* desc) {
143  std::string sdp;
144  desc->ToString(&sdp);
145  pc_->SetLocalDescription(
146  SetSessionDescriptionObserver::Create(pc_, promise_), desc);
147  }
148  virtual void OnFailure(webrtc::RTCError error) {
149  utility::LogWarning("{}", error.message());
150  promise_.set_value(nullptr);
151  }
152 
153  protected:
154  CreateSessionDescriptionObserver(
155  webrtc::PeerConnectionInterface* pc,
156  std::promise<const webrtc::SessionDescriptionInterface*>&
157  promise)
158  : pc_(pc), promise_(promise) {};
159 
160  private:
161  webrtc::PeerConnectionInterface* pc_;
162  std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
163  };
164 
165  class PeerConnectionStatsCollectorCallback
166  : public webrtc::RTCStatsCollectorCallback {
167  public:
168  PeerConnectionStatsCollectorCallback() {}
169  void clearReport() { report_.clear(); }
170  Json::Value getReport() { return report_; }
171 
172  protected:
173  virtual void OnStatsDelivered(
174  const rtc::scoped_refptr<const webrtc::RTCStatsReport>&
175  report) {
176  for (const webrtc::RTCStats& stats : *report) {
177  Json::Value stats_members;
178  for (const webrtc::RTCStatsMemberInterface* member :
179  stats.Members()) {
180  stats_members[member->name()] = member->ValueToString();
181  }
182  report_[stats.id()] = stats_members;
183  }
184  }
185 
186  Json::Value report_;
187  };
188 
189  class DataChannelObserver : public webrtc::DataChannelObserver {
190  public:
191  DataChannelObserver(
192  PeerConnectionManager* peer_connection_manager,
193  rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
194  const std::string& peerid)
195  : peer_connection_manager_(peer_connection_manager),
196  data_channel_(data_channel),
197  peerid_(peerid) {
198  data_channel_->RegisterObserver(this);
199  }
200  virtual ~DataChannelObserver() { data_channel_->UnregisterObserver(); }
201 
202  // DataChannelObserver interface
203  virtual void OnStateChange() {
204  // Useful to know when the data channel is established.
205  const std::string label = data_channel_->label();
206  const std::string state =
207  webrtc::DataChannelInterface::DataStateString(
208  data_channel_->state());
210  "DataChannelObserver::OnStateChange label: {}, state: {}, "
211  "peerid: {}",
212  label, state, peerid_);
213  std::string msg(label + " " + state);
214  webrtc::DataBuffer buffer(msg);
215  data_channel_->Send(buffer);
216  // ClientDataChannel is established after ServerDataChannel. Once
217  // ClientDataChannel is established, we need to send initial frames
218  // to the client such that the video is not empty. Afterwards,
219  // video frames will only be sent when the GUI redraws.
220  if (label == "ClientDataChannel" && state == "open") {
221  {
222  std::lock_guard<std::mutex> mutex_lock(
223  peer_connection_manager_
225  peer_connection_manager_->peerid_data_channel_ready_.insert(
226  peerid_);
227  }
228  peer_connection_manager_->SendInitFramesToPeer(peerid_);
229  }
230  if (label == "ClientDataChannel" &&
231  (state == "closed" || state == "closing")) {
232  std::lock_guard<std::mutex> mutex_lock(
233  peer_connection_manager_->peerid_data_channel_mutex_);
234  peer_connection_manager_->peerid_data_channel_ready_.erase(
235  peerid_);
236  }
237  }
238  virtual void OnMessage(const webrtc::DataBuffer& buffer) {
239  std::string msg((const char*)buffer.data.data(),
240  buffer.data.size());
241  utility::LogDebug("DataChannelObserver::OnMessage: {}, msg: {}.",
242  data_channel_->label(), msg);
243  std::string reply =
244  WebRTCWindowSystem::GetInstance()->OnDataChannelMessage(
245  msg);
246  if (!reply.empty()) {
247  webrtc::DataBuffer buffer(reply);
248  data_channel_->Send(buffer);
249  }
250  }
251 
252  protected:
253  PeerConnectionManager* peer_connection_manager_;
254  rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
255  const std::string peerid_;
256  };
257 
258  class PeerConnectionObserver : public webrtc::PeerConnectionObserver {
259  public:
260  PeerConnectionObserver(
261  PeerConnectionManager* peer_connection_manager,
262  const std::string& peerid,
263  const webrtc::PeerConnectionInterface::RTCConfiguration& config,
264  std::unique_ptr<cricket::PortAllocator> port_allocator)
265  : peer_connection_manager_(peer_connection_manager),
266  peerid_(peerid),
267  local_channel_(nullptr),
268  remote_channel_(nullptr),
269  ice_candidate_list_(Json::arrayValue),
270  deleting_(false) {
271  pc_ = peer_connection_manager_->peer_connection_factory_
272  ->CreatePeerConnection(config,
273  std::move(port_allocator),
274  nullptr, this);
275 
276  if (pc_.get()) {
277  rtc::scoped_refptr<webrtc::DataChannelInterface> channel =
278  pc_->CreateDataChannel("ServerDataChannel", nullptr);
279  local_channel_ = new DataChannelObserver(
280  peer_connection_manager_, channel, peerid_);
281  }
282 
283  stats_callback_ = new rtc::RefCountedObject<
284  PeerConnectionStatsCollectorCallback>();
285  };
286 
287  virtual ~PeerConnectionObserver() {
288  delete local_channel_;
289  delete remote_channel_;
290  if (pc_.get()) {
291  // warning: pc->close call OnIceConnectionChange
292  deleting_ = true;
293  pc_->Close();
294  }
295  }
296 
297  Json::Value GetIceCandidateList() { return ice_candidate_list_; }
298 
299  Json::Value GetStats() {
300  stats_callback_->clearReport();
301  pc_->GetStats(stats_callback_);
302  int count = 10;
303  while ((stats_callback_->getReport().empty()) && (--count > 0)) {
304  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
305  }
306  return Json::Value(stats_callback_->getReport());
307  };
308 
309  rtc::scoped_refptr<webrtc::PeerConnectionInterface>
311  return pc_;
312  };
313 
314  // PeerConnectionObserver interface
315  virtual void OnAddStream(
316  rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
317  utility::LogDebug("[{}] GetVideoTracks().size(): {}.",
319  stream->GetVideoTracks().size());
320  webrtc::VideoTrackVector videoTracks = stream->GetVideoTracks();
321  if (videoTracks.size() > 0) {
322  video_sink_.reset(new VideoSink(videoTracks.at(0)));
323  }
324  }
325  virtual void OnRemoveStream(
326  rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
327  video_sink_.reset();
328  }
329  virtual void OnDataChannel(
330  rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
332  "PeerConnectionObserver::OnDataChannel peerid: {}",
333  peerid_);
334  remote_channel_ = new DataChannelObserver(peer_connection_manager_,
335  channel, peerid_);
336  }
337  virtual void OnRenegotiationNeeded() {
338  std::lock_guard<std::mutex> mutex_lock(
339  peer_connection_manager_->peerid_data_channel_mutex_);
340  peer_connection_manager_->peerid_data_channel_ready_.erase(peerid_);
342  "PeerConnectionObserver::OnRenegotiationNeeded peerid: {}",
343  peerid_);
344  }
345  virtual void OnIceCandidate(
346  const webrtc::IceCandidateInterface* candidate);
347 
348  virtual void OnSignalingChange(
349  webrtc::PeerConnectionInterface::SignalingState state) {
350  utility::LogDebug("state: {}, peerid: {}", state, peerid_);
351  }
352  virtual void OnIceConnectionChange(
353  webrtc::PeerConnectionInterface::IceConnectionState state) {
354  if ((state ==
355  webrtc::PeerConnectionInterface::kIceConnectionFailed) ||
356  (state ==
357  webrtc::PeerConnectionInterface::kIceConnectionClosed)) {
358  ice_candidate_list_.clear();
359  if (!deleting_) {
360  std::thread([this]() {
361  peer_connection_manager_->HangUp(peerid_);
362  }).detach();
363  }
364  }
365  }
366 
367  virtual void OnIceGatheringChange(
368  webrtc::PeerConnectionInterface::IceGatheringState) {}
369 
370  private:
371  PeerConnectionManager* peer_connection_manager_;
372  const std::string peerid_;
373  rtc::scoped_refptr<webrtc::PeerConnectionInterface> pc_;
374  DataChannelObserver* local_channel_;
375  DataChannelObserver* remote_channel_;
376  Json::Value ice_candidate_list_;
377  rtc::scoped_refptr<PeerConnectionStatsCollectorCallback>
378  stats_callback_;
379  std::unique_ptr<VideoSink> video_sink_;
380  bool deleting_;
381  };
382 
383 public:
384  PeerConnectionManager(const std::list<std::string>& ice_server_list,
385  const Json::Value& config,
386  const std::string& publish_filter,
387  const std::string& webrtc_udp_port_range);
388  virtual ~PeerConnectionManager();
389 
391  const std::map<std::string, HttpServerRequestHandler::HttpFunction>
392  GetHttpApi();
393 
394  const Json::Value GetIceCandidateList(const std::string& peerid);
395  const Json::Value AddIceCandidate(const std::string& peerid,
396  const Json::Value& json_message);
397  const Json::Value GetMediaList();
398  const Json::Value HangUp(const std::string& peerid);
399  const Json::Value Call(const std::string& peerid,
400  const std::string& window_uid,
401  const std::string& options,
402  const Json::Value& json_message);
403  const Json::Value GetIceServers();
404 
405  void SendInitFramesToPeer(const std::string& peerid);
406 
407  void CloseWindowConnections(const std::string& window_uid);
408 
409  void OnFrame(const std::string& window_uid,
410  const std::shared_ptr<core::Tensor>& im);
411 
412 protected:
413  rtc::scoped_refptr<BitmapTrackSourceInterface> GetVideoTrackSource(
414  const std::string& window_uid);
415  PeerConnectionObserver* CreatePeerConnection(const std::string& peerid);
416  bool AddStreams(webrtc::PeerConnectionInterface* peer_connection,
417  const std::string& window_uid,
418  const std::string& options);
419  rtc::scoped_refptr<BitmapTrackSourceInterface> CreateVideoSource(
420  const std::string& window_uid,
421  const std::map<std::string, std::string>& opts);
422  bool WindowStillUsed(const std::string& window_uid);
423  rtc::scoped_refptr<webrtc::PeerConnectionInterface> GetPeerConnection(
424  const std::string& peerid);
425 
426 protected:
427  std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory_;
428  rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
430 
431  // Each peer has exactly one connection.
432  std::unordered_map<std::string, PeerConnectionObserver*>
435  // Set of peerids with data channel ready for communication
436  std::unordered_set<std::string> peerid_data_channel_ready_;
438 
439  // Each Window has exactly one TrackSource.
440  std::unordered_map<std::string,
441  rtc::scoped_refptr<BitmapTrackSourceInterface>>
444 
445  // Each Window can be connected to zero, one or more peers.
446  std::unordered_map<std::string, std::set<std::string>>
448  std::unordered_map<std::string, std::string> peerid_to_window_uid_;
449  // Shared by window_uid_to_peerids_ and peerid_to_window_uid_.
451 
452  std::list<std::string> ice_server_list_;
453  const Json::Value config_;
454  const std::regex publish_filter_;
455  std::map<std::string, HttpServerRequestHandler::HttpFunction> func_;
456  std::string webrtc_port_range_;
457 };
458 
459 } // namespace webrtc_server
460 } // namespace visualization
461 } // namespace cloudViewer
462 
463 namespace fmt {
464 
465 template <>
466 struct formatter<webrtc::PeerConnectionInterface::SignalingState> {
467  template <typename FormatContext>
468  auto format(const webrtc::PeerConnectionInterface::SignalingState& state,
469  FormatContext& ctx) const -> decltype(ctx.out()) {
470  using namespace webrtc;
471  const char* text = nullptr;
472  switch (state) {
473  case PeerConnectionInterface::SignalingState::kStable:
474  text = "kStable";
475  break;
476  case PeerConnectionInterface::SignalingState::kHaveLocalOffer:
477  text = "kHaveLocalOffer";
478  break;
479  case PeerConnectionInterface::SignalingState::kHaveLocalPrAnswer:
480  text = "kHaveLocalPrAnswer";
481  break;
482  case PeerConnectionInterface::SignalingState::kHaveRemoteOffer:
483  text = "kHaveRemoteOffer";
484  break;
485  case PeerConnectionInterface::SignalingState::kHaveRemotePrAnswer:
486  text = "kHaveRemotePrAnswer";
487  break;
488  case PeerConnectionInterface::SignalingState::kClosed:
489  text = "kClosed";
490  break;
491  default:
492  text = "unknown";
493  }
494  return format_to(ctx.out(), "{}", text);
495  }
496 
497  template <typename ParseContext>
498  constexpr auto parse(ParseContext& ctx) -> decltype(ctx.begin()) {
499  return ctx.begin();
500  }
501 };
502 
503 } // namespace fmt
int count
#define CLOUDVIEWER_FUNCTION
Definition: Macro.h:43
void clear() override
Clears the entity from all its points and features.
void OnFrame(const std::string &window_uid, const std::shared_ptr< core::Tensor > &im)
std::unordered_map< std::string, std::set< std::string > > window_uid_to_peerids_
PeerConnectionObserver * CreatePeerConnection(const std::string &peerid)
const Json::Value GetIceCandidateList(const std::string &peerid)
rtc::scoped_refptr< webrtc::PeerConnectionFactoryInterface > peer_connection_factory_
rtc::scoped_refptr< BitmapTrackSourceInterface > GetVideoTrackSource(const std::string &window_uid)
std::unordered_map< std::string, rtc::scoped_refptr< BitmapTrackSourceInterface > > window_uid_to_track_source_
bool AddStreams(webrtc::PeerConnectionInterface *peer_connection, const std::string &window_uid, const std::string &options)
std::map< std::string, HttpServerRequestHandler::HttpFunction > func_
PeerConnectionManager(const std::list< std::string > &ice_server_list, const Json::Value &config, const std::string &publish_filter, const std::string &webrtc_udp_port_range)
std::unordered_map< std::string, PeerConnectionObserver * > peerid_to_connection_
std::unordered_map< std::string, std::string > peerid_to_window_uid_
const Json::Value Call(const std::string &peerid, const std::string &window_uid, const std::string &options, const Json::Value &json_message)
std::unique_ptr< webrtc::TaskQueueFactory > task_queue_factory_
const std::map< std::string, HttpServerRequestHandler::HttpFunction > GetHttpApi()
rtc::scoped_refptr< webrtc::PeerConnectionInterface > GetPeerConnection(const std::string &peerid)
rtc::scoped_refptr< BitmapTrackSourceInterface > CreateVideoSource(const std::string &window_uid, const std::map< std::string, std::string > &opts)
const Json::Value AddIceCandidate(const std::string &peerid, const Json::Value &json_message)
static std::shared_ptr< WebRTCWindowSystem > GetInstance()
#define LogWarning(...)
Definition: Logging.h:72
#define LogDebug(...)
Definition: Logging.h:90
static void error(char *msg)
Definition: lsd.c:159
Generic file read and write utility for python interface.
constexpr auto parse(ParseContext &ctx) -> decltype(ctx.begin())
auto format(const webrtc::PeerConnectionInterface::SignalingState &state, FormatContext &ctx) const -> decltype(ctx.out())