ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
PeerConnectionManager.cpp
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 // ----------------------------------------------------------------------------
8 // Contains source code from
9 // https://github.com/mpromonet/webrtc-streamer
10 //
11 // This software is in the public domain, furnished "as is", without technical
12 // support, and with no warranty, express or implied, as to its usefulness for
13 // any purpose.
14 // ----------------------------------------------------------------------------
15 
17 
18 #include <IJsonConvertible.h>
19 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
20 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
21 #include <api/rtc_event_log/rtc_event_log_factory.h>
22 #include <api/task_queue/default_task_queue_factory.h>
23 #include <api/video_codecs/builtin_video_decoder_factory.h>
24 #include <api/video_codecs/builtin_video_encoder_factory.h>
25 #include <media/engine/webrtc_media_engine.h>
26 #include <modules/audio_device/include/fake_audio_device.h>
27 #include <p2p/client/basic_port_allocator.h>
28 
29 #include <fstream>
30 #include <functional>
31 #include <utility>
32 
37 
38 namespace cloudViewer {
39 namespace visualization {
40 namespace webrtc_server {
41 
// Field names of the JSON object that carries an ICE candidate.
const char k_candidate_sdp_mid_name[] = "sdpMid";
const char k_candidate_sdp_mline_index_name[] = "sdpMLineIndex";
const char k_candidate_sdp_name[] = "candidate";

// Field names of the JSON object that carries a session description.
const char k_session_description_type_name[] = "type";
const char k_session_description_sdp_name[] = "sdp";
50 
// Decomposed form of an ICE server URL.
struct IceServer {
    std::string url;   // "<protocol>:<address>" with any credentials removed.
    std::string user;  // User name; empty when the URL carries none.
    std::string pass;  // Password; empty when the URL carries none.
};

// Splits an ICE server URL of the form
//     <protocol>:[<user>[:<pass>]@]<address>
// into its URL (credentials stripped), user name and password.
//
// - A string without any ':' is returned unchanged in `url`, with empty
//   credentials.
// - The credentials are everything before the LAST '@', so a password may
//   itself contain '@'.
// - Inside the credentials the FIRST ':' separates user from password;
//   without a ':' the whole credential string is the user name.
static IceServer GetIceServerFromUrl(const std::string &url) {
    IceServer srv;
    srv.url = url;

    const std::size_t scheme_end = url.find(':');
    if (scheme_end == std::string::npos) {
        return srv;
    }

    const std::string protocol = url.substr(0, scheme_end);
    std::string uri = url.substr(scheme_end + 1);
    std::string credentials;

    const std::size_t at_pos = uri.rfind('@');
    if (at_pos != std::string::npos) {
        credentials = uri.substr(0, at_pos);
        uri = uri.substr(at_pos + 1);
    }
    srv.url = protocol + ":" + uri;

    if (!credentials.empty()) {
        const std::size_t colon_pos = credentials.find(':');
        if (colon_pos == std::string::npos) {
            srv.user = credentials;
        } else {
            srv.user = credentials.substr(0, colon_pos);
            srv.pass = credentials.substr(colon_pos + 1);
        }
    }

    return srv;
}
87 
88 static webrtc::PeerConnectionFactoryDependencies
90  webrtc::PeerConnectionFactoryDependencies dependencies;
91  dependencies.network_thread = nullptr;
92  dependencies.worker_thread = rtc::Thread::Current();
93  dependencies.signaling_thread = nullptr;
94  dependencies.call_factory = webrtc::CreateCallFactory();
95  dependencies.task_queue_factory = webrtc::CreateDefaultTaskQueueFactory();
96  dependencies.event_log_factory =
97  absl::make_unique<webrtc::RtcEventLogFactory>(
98  dependencies.task_queue_factory.get());
99 
100  cricket::MediaEngineDependencies media_dependencies;
101  media_dependencies.task_queue_factory =
102  dependencies.task_queue_factory.get();
103 
104  // Dummy audio factory.
105  rtc::scoped_refptr<webrtc::AudioDeviceModule> audio_device_module(
106  new webrtc::FakeAudioDeviceModule());
107  media_dependencies.adm = std::move(audio_device_module);
108  media_dependencies.audio_encoder_factory =
109  webrtc::CreateBuiltinAudioEncoderFactory();
110  media_dependencies.audio_decoder_factory =
111  webrtc::CreateBuiltinAudioDecoderFactory();
112  media_dependencies.audio_processing =
113  webrtc::AudioProcessingBuilder().Create();
114 
115  media_dependencies.video_encoder_factory =
116  webrtc::CreateBuiltinVideoEncoderFactory();
117  media_dependencies.video_decoder_factory =
118  webrtc::CreateBuiltinVideoDecoderFactory();
119 
120  dependencies.media_engine =
121  cricket::CreateMediaEngine(std::move(media_dependencies));
122 
123  return dependencies;
124 }
125 
127  const std::list<std::string> &ice_server_list,
128  const Json::Value &config,
129  const std::string &publish_filter,
130  const std::string &webrtc_udp_port_range)
131  : task_queue_factory_(webrtc::CreateDefaultTaskQueueFactory()),
132  peer_connection_factory_(webrtc::CreateModularPeerConnectionFactory(
134  ice_server_list_(ice_server_list),
135  config_(config),
136  publish_filter_(publish_filter) {
137  // Set the webrtc port range.
138  webrtc_port_range_ = webrtc_udp_port_range;
139 
140  // Register api in http server.
141  func_["/api/getMediaList"] = [this](const struct mg_request_info *req_info,
142  const Json::Value &in) -> Json::Value {
143  utility::LogDebug("[Called HTTP API] /api/getMediaList");
144  return this->GetMediaList();
145  };
146 
147  func_["/api/getIceServers"] = [this](const struct mg_request_info *req_info,
148  const Json::Value &in) -> Json::Value {
149  utility::LogDebug("[Called HTTP API] /api/getIceServers");
150  return this->GetIceServers();
151  };
152 
153  func_["/api/call"] = [this](const struct mg_request_info *req_info,
154  const Json::Value &in) -> Json::Value {
155  utility::LogDebug("[Called HTTP API] /api/call");
156  std::string peerid;
157  std::string url; // window_uid.
158  std::string options;
159  if (req_info->query_string) {
160  CivetServer::getParam(req_info->query_string, "peerid", peerid);
161  CivetServer::getParam(req_info->query_string, "url", url);
162  CivetServer::getParam(req_info->query_string, "options", options);
163  }
164  return this->Call(peerid, url, options, in);
165  };
166 
167  func_["/api/getIceCandidate"] =
168  [this](const struct mg_request_info *req_info,
169  const Json::Value &in) -> Json::Value {
170  utility::LogDebug("[Called HTTP API] /api/getIceCandidate");
171  std::string peerid;
172  if (req_info->query_string) {
173  CivetServer::getParam(req_info->query_string, "peerid", peerid);
174  }
175  return this->GetIceCandidateList(peerid);
176  };
177 
178  func_["/api/addIceCandidate"] =
179  [this](const struct mg_request_info *req_info,
180  const Json::Value &in) -> Json::Value {
181  utility::LogDebug("[Called HTTP API] /api/addIceCandidate");
182  std::string peerid;
183  if (req_info->query_string) {
184  CivetServer::getParam(req_info->query_string, "peerid", peerid);
185  }
186  return this->AddIceCandidate(peerid, in);
187  };
188 
189  func_["/api/hangup"] = [this](const struct mg_request_info *req_info,
190  const Json::Value &in) -> Json::Value {
191  utility::LogDebug("[Called HTTP API] /api/hangup");
192  std::string peerid;
193  if (req_info->query_string) {
194  CivetServer::getParam(req_info->query_string, "peerid", peerid);
195  }
196  return this->HangUp(peerid);
197  };
198 }
199 
201 
202 // Return deviceList as JSON vector.
// Builds a JSON array holding one {"video": <window_uid>} object for every
// UID reported by WebRTCWindowSystem::GetInstance()->GetWindowUIDs().
// NOTE(review): listing line 203 is absent from this listing; it presumably
// held this function's signature (the "/api/getMediaList" handler calls
// this->GetMediaList()) -- restore it from the original file.
204  Json::Value value(Json::arrayValue);
205 
206  for (const std::string &window_uid :
207  WebRTCWindowSystem::GetInstance()->GetWindowUIDs()) {
// Each window is advertised under the "video" key.
208  Json::Value media;
209  media["video"] = window_uid;
210  value.append(media);
211  }
212 
213  return value;
214 }
215 
216 // Return iceServers as JSON vector.
// The result is an object {"iceServers": [...]} with one entry per element
// of ice_server_list_. Each entry has a one-element "urls" array holding the
// credential-free URL, plus "username" / "credential" when the configured
// URL carried them (see GetIceServerFromUrl).
// NOTE(review): listing line 217 is absent from this listing; it presumably
// held this function's signature (the "/api/getIceServers" handler calls
// this->GetIceServers()) -- restore it from the original file.
218  // This is a simplified version. The original version takes the client's IP
219  // and the server returns the best available STUN server.
220  Json::Value urls(Json::arrayValue);
221 
// NOTE(review): `auto ice_server` copies every string; `const auto &` would
// avoid the copy.
222  for (auto ice_server : ice_server_list_) {
223  Json::Value server;
224  Json::Value urlList(Json::arrayValue);
225  IceServer srv = GetIceServerFromUrl(ice_server);
226  urlList.append(srv.url);
227  server["urls"] = urlList;
// Credentials are emitted only when non-empty.
228  if (srv.user.length() > 0) server["username"] = srv.user;
229  if (srv.pass.length() > 0) server["credential"] = srv.pass;
230  urls.append(server);
231  }
232 
233  Json::Value iceServers;
234  iceServers["iceServers"] = urls;
235 
236  return iceServers;
237 }
238 
239 // Get PeerConnection associated with peerid.
240 rtc::scoped_refptr<webrtc::PeerConnectionInterface>
241 PeerConnectionManager::GetPeerConnection(const std::string &peerid) {
242  rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection;
243  auto it = peerid_to_connection_.find(peerid);
244  if (it != peerid_to_connection_.end()) {
245  peer_connection = it->second->GetPeerConnection();
246  }
247  return peer_connection;
248 }
249 
250 // Add ICE candidate to a PeerConnection.
// Parses the candidate fields ("sdpMid", "sdpMLineIndex", "candidate") from
// json_message and applies the candidate to the connection registered for
// peerid. Returns a JSON `true` when the candidate was applied and a default
// (null) Json::Value in every other case, including when the candidate is
// skipped because the peer's data channel is already marked ready.
// NOTE(review): listing lines 251, 260, 275, 279, 283 and 288 are absent
// from this listing (signature line, one argument line, two mutex names and
// two log-call openers) -- restore them from the original file.
252  const std::string &peerid, const Json::Value &json_message) {
253  bool result = false;
254  std::string sdp_mid;
255  int sdp_mlineindex = 0;
256  std::string sdp;
// All three fields are required; a missing one rejects the whole message.
257  if (!rtc::GetStringFromJsonObject(json_message, k_candidate_sdp_mid_name,
258  &sdp_mid) ||
259  !rtc::GetIntFromJsonObject(json_message,
261  &sdp_mlineindex) ||
262  !rtc::GetStringFromJsonObject(json_message, k_candidate_sdp_name,
263  &sdp)) {
264  utility::LogWarning("Can't parse received message.");
265  } else {
// The unique_ptr owns the candidate; only a borrowed pointer is passed to
// the peer connection below.
266  std::unique_ptr<webrtc::IceCandidateInterface> candidate(
267  webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp,
268  nullptr));
269  if (!candidate.get()) {
270  utility::LogWarning("Can't parse received candidate message.");
271  } else {
272  bool dc_ready = false;
273  { // avoid holding lock in the else{} block
274  std::lock_guard<std::mutex> mutex_lock(
276  dc_ready = peerid_data_channel_ready_.count(peerid) > 0;
277  }
// Candidates arriving after the data channel is ready are ignored.
278  if (dc_ready) {
280  "DataChannels ready. Skipping AddIceCandidate.");
281  } else {
282  std::lock_guard<std::mutex> mutex_lock(
284  rtc::scoped_refptr<webrtc::PeerConnectionInterface>
285  peer_connection = this->GetPeerConnection(peerid);
// An unknown peerid yields a null connection and is silently ignored.
286  if (peer_connection) {
287  if (!peer_connection->AddIceCandidate(candidate.get())) {
289  "Failed to apply the received candidate.");
290  } else {
291  result = true;
292  }
293  }
294  }
295  }
296  }
// `answer` stays a null Json::Value unless the candidate was applied.
297  Json::Value answer;
298  if (result) {
299  answer = result;
300  }
301  return answer;
302 }
303 
304 // Auto-answer to a call.
// Handles an incoming offer: creates a PeerConnectionObserver for peerid,
// registers it, sets the offer from json_message ("type" / "sdp") as the
// remote description, adds the stream for window_uid and creates an answer.
// Returns {"type": ..., "sdp": ...} describing the answer, or a default
// (null) Json::Value when the message cannot be parsed, the connection
// cannot be created, or no answer is produced within 5000 ms.
// NOTE(review): listing lines 315, 317, 338, 345, 354 and 371 are absent
// from this listing (two argument lines, two mutex names and two log-call
// openers) -- restore them from the original file.
305 const Json::Value PeerConnectionManager::Call(const std::string &peerid,
306  const std::string &window_uid,
307  const std::string &options,
308  const Json::Value &json_message) {
309  Json::Value answer;
310 
311  std::string type;
312  std::string sdp;
313 
314  if (!rtc::GetStringFromJsonObject(json_message,
316  !rtc::GetStringFromJsonObject(json_message,
318  utility::LogWarning("Can't parse received message.");
319  } else {
320  PeerConnectionObserver *peer_connection_observer =
321  this->CreatePeerConnection(peerid);
322  if (!peer_connection_observer) {
323  utility::LogError("Failed to initialize PeerConnectionObserver");
324  } else if (!peer_connection_observer->GetPeerConnection().get()) {
// The observer is not registered yet, so it is released right here.
325  utility::LogError("Failed to initialize PeerConnection");
326  delete peer_connection_observer;
327  } else {
328  rtc::scoped_refptr<webrtc::PeerConnectionInterface>
329  peer_connection =
330  peer_connection_observer->GetPeerConnection();
331  utility::LogDebug("nbStreams local: {}, remote: {}",
332  peer_connection->local_streams()->count(),
333  peer_connection->remote_streams()->count());
334 
335  // Register peerid.
// From here on the raw observer pointer is held by peerid_to_connection_;
// HangUp() is the place in this file that deletes it.
// NOTE(review): insert() leaves an existing entry untouched, so a repeated
// peerid would leave the new observer unregistered and never deleted --
// confirm peerids are unique.
336  {
337  std::lock_guard<std::mutex> mutex_lock(
339  peerid_to_connection_.insert(
340  std::pair<std::string, PeerConnectionObserver *>(
341  peerid, peer_connection_observer));
342  }
// Record the peer <-> window association in both directions.
343  {
344  std::lock_guard<std::mutex> mutex_lock(
346  window_uid_to_peerids_[window_uid].insert(peerid);
347  peerid_to_window_uid_[peerid] = window_uid;
348  }
349 
350  // Set remote offer.
351  webrtc::SessionDescriptionInterface *session_description(
352  webrtc::CreateSessionDescription(type, sdp, nullptr));
// A failed parse is only logged; the code still goes on to add the stream
// and request an answer below.
353  if (!session_description) {
355  "Can't parse received session description message. "
356  "Cannot create session description.");
357  } else {
// The promise is handed to the observer, and this thread blocks on the
// matching future for at most 5000 ms.
// NOTE(review): if the wait times out, remote_promise goes out of scope
// while the observer may still refer to it -- verify how
// SetSessionDescriptionObserver stores the promise.
358  std::promise<const webrtc::SessionDescriptionInterface *>
359  remote_promise;
360  peer_connection->SetRemoteDescription(
361  SetSessionDescriptionObserver::Create(peer_connection,
362  remote_promise),
363  session_description);
364  // Waiting for remote description.
365  std::future<const webrtc::SessionDescriptionInterface *>
366  remote_future = remote_promise.get_future();
367  if (remote_future.wait_for(std::chrono::milliseconds(5000)) ==
368  std::future_status::ready) {
369  utility::LogDebug("remote_description is ready.");
370  } else {
372  "remote_description is nullptr. Setting remote "
373  "description failed.");
374  }
375  }
376 
377  // Add local stream.
// A failure here is logged but does not stop the answer from being created.
378  if (!this->AddStreams(peer_connection, window_uid, options)) {
379  utility::LogError("Can't add stream {}, {}.", window_uid,
380  options);
381  }
382 
383  // Create answer.
384  webrtc::PeerConnectionInterface::RTCOfferAnswerOptions rtc_options;
385  std::promise<const webrtc::SessionDescriptionInterface *>
386  local_promise;
387  peer_connection->CreateAnswer(
388  CreateSessionDescriptionObserver::Create(peer_connection,
389  local_promise),
390  rtc_options);
391 
392  // Waiting for answer.
// Same 5000 ms bound as for the remote description.
393  std::future<const webrtc::SessionDescriptionInterface *>
394  local_future = local_promise.get_future();
395  if (local_future.wait_for(std::chrono::milliseconds(5000)) ==
396  std::future_status::ready) {
397  // Answer with the created answer.
398  const webrtc::SessionDescriptionInterface *desc =
399  local_future.get();
400  if (desc) {
// NOTE(review): this `sdp` shadows the offer's `sdp` declared at the top of
// the function.
401  std::string sdp;
402  desc->ToString(&sdp);
403 
404  answer[k_session_description_type_name] = desc->type();
405  answer[k_session_description_sdp_name] = sdp;
406  } else {
407  utility::LogError("Failed to create answer");
408  }
409  } else {
410  utility::LogError("Failed to create answer");
411  }
412  }
413  }
414  return answer;
415 }
416 
417 bool PeerConnectionManager::WindowStillUsed(const std::string &window_uid) {
418  bool still_used = false;
419  for (auto it : peerid_to_connection_) {
420  rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection =
421  it.second->GetPeerConnection();
422  rtc::scoped_refptr<webrtc::StreamCollectionInterface> local_streams(
423  peer_connection->local_streams());
424  for (unsigned int i = 0; i < local_streams->count(); i++) {
425  if (local_streams->at(i)->id() == window_uid) {
426  still_used = true;
427  break;
428  }
429  }
430  }
431  return still_used;
432 }
433 
434 // Hangup a call.
435 const Json::Value PeerConnectionManager::HangUp(const std::string &peerid) {
436  bool result = false;
437  PeerConnectionObserver *pc_observer = nullptr;
438  {
439  std::lock_guard<std::mutex> mutex_lock(peerid_to_connection_mutex_);
440  auto it = peerid_to_connection_.find(peerid);
441  if (it != peerid_to_connection_.end()) {
442  pc_observer = it->second;
443  utility::LogDebug("Remove PeerConnection peerid: {}", peerid);
444  peerid_to_connection_.erase(it);
445  }
446  if (peerid_to_window_uid_.count(peerid) != 0) {
447  std::lock_guard<std::mutex> mutex_lock(
449  const std::string window_uid = peerid_to_window_uid_.at(peerid);
450  peerid_to_window_uid_.erase(peerid);
451 
452  // After window_uid_to_peerids_[window_uid] becomes empty, we don't
453  // remove the window_uid from the map here. We remove window_uid
454  // from window_uid_to_peerids_ when the Window is closed.
455  window_uid_to_peerids_[window_uid].erase(peerid);
456  }
457 
458  if (pc_observer) {
459  rtc::scoped_refptr<webrtc::PeerConnectionInterface>
460  peer_connection = pc_observer->GetPeerConnection();
461 
462  rtc::scoped_refptr<webrtc::StreamCollectionInterface> local_streams(
463  peer_connection->local_streams());
464  for (unsigned int i = 0; i < local_streams->count(); i++) {
465  auto stream = local_streams->at(i);
466 
467  std::string window_uid = stream->id();
468  bool still_used = this->WindowStillUsed(window_uid);
469  if (!still_used) {
470  std::lock_guard<std::mutex> mlock(
472  auto it = window_uid_to_track_source_.find(window_uid);
473  if (it != window_uid_to_track_source_.end()) {
474  window_uid_to_track_source_.erase(it);
475  }
476  utility::LogDebug("HangUp stream closed {}.", window_uid);
477  }
478 
479  peer_connection->RemoveStream(stream);
480  }
481 
482  delete pc_observer;
483  result = true;
484  }
485  }
486  Json::Value answer;
487  if (result) {
488  answer = result;
489  }
490  return answer;
491 }
492 
493 const std::map<std::string, HttpServerRequestHandler::HttpFunction>
495  return func_;
496 }
497 
498 // Get list ICE candidate associated with a PeerConnection.
500  const std::string &peerid) {
501  Json::Value value;
502  std::lock_guard<std::mutex> mutex_lock(peerid_to_connection_mutex_);
503  auto it = peerid_to_connection_.find(peerid);
504  if (it != peerid_to_connection_.end()) {
505  PeerConnectionObserver *obs = it->second;
506  if (obs) {
507  value = obs->GetIceCandidateList();
508  } else {
509  utility::LogError("No observer for peer: {}.", peerid);
510  }
511  }
512  return value;
513 }
514 
515 // Check if factory is initialized.
// Evaluates to true when peer_connection_factory_ holds a non-null factory.
// NOTE(review): listing line 516 (this function's signature) is absent from
// this listing -- restore it from the original file.
517  return (peer_connection_factory_.get() != nullptr);
518 }
519 
520 // Create a new PeerConnection.
521 PeerConnectionManager::PeerConnectionObserver *
523  webrtc::PeerConnectionInterface::RTCConfiguration config;
524  for (auto ice_server : ice_server_list_) {
525  webrtc::PeerConnectionInterface::IceServer server;
526  IceServer srv = GetIceServerFromUrl(ice_server);
527  server.uri = srv.url;
528  server.username = srv.user;
529  server.password = srv.pass;
530  config.servers.push_back(server);
531  }
532 
533  // Use example From:
534  // https://soru.site/questions/51578447/api-c-webrtcyi-kullanarak-peerconnection-ve-ucretsiz-baglant-noktasn-serbest-nasl
535  int min_port = 0;
536  int max_port = 65535;
537  std::istringstream is(webrtc_port_range_);
538  std::string port;
539  if (std::getline(is, port, ':')) {
540  min_port = std::stoi(port);
541  if (std::getline(is, port, ':')) {
542  max_port = std::stoi(port);
543  }
544  }
545  std::unique_ptr<cricket::PortAllocator> port_allocator(
546  new cricket::BasicPortAllocator(new rtc::BasicNetworkManager()));
547  port_allocator->SetPortRange(min_port, max_port);
548  utility::LogDebug("CreatePeerConnection webrtcPortRange: {}:{}.", min_port,
549  max_port);
550  utility::LogDebug("CreatePeerConnection peerid: {}.", peerid);
551  PeerConnectionObserver *obs = new PeerConnectionObserver(
552  this, peerid, config, std::move(port_allocator));
553  if (!obs) {
554  utility::LogError("CreatePeerConnection failed.");
555  } else {
556  utility::LogDebug("CreatePeerConnection success!");
557  }
558  return obs;
559 }
560 
561 // Get the capturer from its URL.
562 rtc::scoped_refptr<BitmapTrackSourceInterface>
564  const std::string &window_uid,
565  const std::map<std::string, std::string> &opts) {
566  std::string video = window_uid;
567  if (config_.isMember(video)) {
568  video = config_[video]["video"].asString();
569  }
570 
571  return ImageTrackSource::Create(video, opts);
572 }
573 
574 // Add a stream to a PeerConnection.
// Resolves the stream options, makes sure a track source exists for
// window_uid (creating one when needed), wraps it in a video track and adds
// the resulting media stream to peer_connection. Returns true only when
// peer_connection->AddStream() succeeds.
// NOTE(review): listing lines 575, 623, 653 and 660 are absent from this
// listing (signature line, the end() comparison, the VideoFilter creation
// call and one log-call opener) -- restore them from the original file.
576  webrtc::PeerConnectionInterface *peer_connection,
577  const std::string &window_uid,
578  const std::string &options) {
579  bool ret = false;
580 
581  // Compute options.
582  // Example options: "rtptransport=tcp&timeout=60"
// Precedence: empty request options -> configured options; request options
// starting with '&' -> appended to the configured options; anything else ->
// the request options replace the configured ones.
583  std::string optstring = options;
584  if (config_.isMember(window_uid)) {
585  std::string urlopts = config_[window_uid]["options"].asString();
586  if (options.empty()) {
587  optstring = urlopts;
588  } else if (options.find_first_of("&") == 0) {
589  optstring = urlopts + options;
590  } else {
591  optstring = options;
592  }
593  }
594 
595  // Convert options string into map.
// Reads "key=value" pairs separated by '&'; a later duplicate key overwrites
// an earlier one.
596  std::istringstream is(optstring);
597  std::map<std::string, std::string> opts;
598  std::string key, value;
599  while (std::getline(std::getline(is, key, '='), value, '&')) {
600  opts[key] = value;
601  }
602 
603  std::string video = window_uid;
604  if (config_.isMember(video)) {
605  video = config_[video]["video"].asString();
606  }
607 
608  // Set bandwidth.
// The requested bitrate becomes the start value, with half of it as the
// minimum and twice it as the maximum.
// NOTE(review): std::stoi throws on a non-numeric "bitrate" value and
// `bitrate * 2` can overflow int for very large values; the option string
// comes from the HTTP request.
609  if (opts.find("bitrate") != opts.end()) {
610  int bitrate = std::stoi(opts.at("bitrate"));
611 
612  webrtc::BitrateSettings bitrate_param;
613  bitrate_param.min_bitrate_bps = absl::optional<int>(bitrate / 2);
614  bitrate_param.start_bitrate_bps = absl::optional<int>(bitrate);
615  bitrate_param.max_bitrate_bps = absl::optional<int>(bitrate * 2);
616  peer_connection->SetBitrate(bitrate_param);
617  }
618 
619  bool existing_stream = false;
620  {
621  std::lock_guard<std::mutex> mlock(window_uid_to_track_source_mutex_);
622  existing_stream = (window_uid_to_track_source_.find(window_uid) !=
624  }
625 
// NOTE(review): the mutex is released between the existence check above and
// the insertion below, so two concurrent calls for the same window_uid can
// both create a source; the later assignment wins.
626  if (!existing_stream) {
627  // Create a new stream and add to window_uid_to_track_source_.
628  rtc::scoped_refptr<BitmapTrackSourceInterface> video_source(
629  this->CreateVideoSource(video, opts));
630  std::lock_guard<std::mutex> mlock(window_uid_to_track_source_mutex_);
631  window_uid_to_track_source_[window_uid] = video_source;
632  }
633 
634  // AddTrack and AddStream to peer_connection.
635  {
636  std::lock_guard<std::mutex> mlock(window_uid_to_track_source_mutex_);
637  auto it = window_uid_to_track_source_.find(window_uid);
638  if (it != window_uid_to_track_source_.end()) {
// The media stream id is the window_uid; WindowStillUsed() and HangUp() rely
// on this when they compare stream ids with window UIDs.
639  rtc::scoped_refptr<webrtc::MediaStreamInterface> stream =
640  peer_connection_factory_->CreateLocalMediaStream(
641  window_uid);
642  if (!stream.get()) {
643  utility::LogError("Cannot create stream.");
644  } else {
645  rtc::scoped_refptr<BitmapTrackSourceInterface> video_source =
646  it->second;
647  rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track;
648  if (!video_source) {
649  utility::LogError("Cannot create capturer video: {}.",
650  window_uid);
651  } else {
652  rtc::scoped_refptr<BitmapTrackSourceInterface> videoScaled =
654  opts);
655  video_track = peer_connection_factory_->CreateVideoTrack(
656  window_uid + "_video", videoScaled);
657  }
658 
659  if ((video_track) && (!stream->AddTrack(video_track))) {
661  "Adding VideoTrack to MediaStream failed.");
662  }
663 
// The stream is added even when no video track could be attached to it.
664  if (!peer_connection->AddStream(stream)) {
665  utility::LogError("Adding stream to PeerConnection failed");
666  } else {
667  utility::LogDebug("Stream added to PeerConnection.");
668  ret = true;
669  }
670  }
671  } else {
672  utility::LogError("Cannot find stream.");
673  }
674  }
675 
676  return ret;
677 }
678 
679 // ICE callback.
680 void PeerConnectionManager::PeerConnectionObserver::OnIceCandidate(
681  const webrtc::IceCandidateInterface *candidate) {
682  std::string sdp;
683  if (!candidate->ToString(&sdp)) {
684  utility::LogError("Failed to serialize candidate.");
685  } else {
686  Json::Value json_message;
687  json_message[k_candidate_sdp_mid_name] = candidate->sdp_mid();
688  json_message[k_candidate_sdp_mline_index_name] =
689  candidate->sdp_mline_index();
690  json_message[k_candidate_sdp_name] = sdp;
691  ice_candidate_list_.append(json_message);
692  }
693 }
694 
695 rtc::scoped_refptr<BitmapTrackSourceInterface>
696 PeerConnectionManager::GetVideoTrackSource(const std::string &window_uid) {
697  {
698  std::lock_guard<std::mutex> mlock(window_uid_to_track_source_mutex_);
699  if (window_uid_to_track_source_.find(window_uid) ==
701  return nullptr;
702  } else {
703  return window_uid_to_track_source_.at(window_uid);
704  }
705  }
706 }
707 
708 void PeerConnectionManager::SendInitFramesToPeer(const std::string &peerid) {
709  std::lock_guard<std::mutex> mutex_lock(window_uid_to_peerids_mutex_);
710  const std::string window_uid = peerid_to_window_uid_.at(peerid);
711  WebRTCWindowSystem::GetInstance()->SendInitFrames(window_uid);
712 }
713 
715  const std::string &window_uid) {
716  utility::LogDebug("PeerConnectionManager::CloseWindowConnections: {}",
717  window_uid);
718  std::set<std::string> peerids;
719  {
720  std::lock_guard<std::mutex> mlock(window_uid_to_peerids_mutex_);
721  peerids = window_uid_to_peerids_.at(window_uid);
722  }
723  for (const std::string &peerid : peerids) {
724  HangUp(peerid);
725  }
726  {
727  std::lock_guard<std::mutex> mlock(window_uid_to_peerids_mutex_);
728  window_uid_to_track_source_.erase(window_uid);
729  }
730 }
731 
732 void PeerConnectionManager::OnFrame(const std::string &window_uid,
733  const std::shared_ptr<core::Tensor> &im) {
734  // Get the WebRTC stream that corresponds to the window_uid.
735  // video_track_source is nullptr if the server is running but no client is
736  // connected.
737  rtc::scoped_refptr<BitmapTrackSourceInterface> video_track_source =
738  GetVideoTrackSource(window_uid);
739  if (video_track_source) {
740  // TODO: this OnFrame(im); is a blocking call. Do we need to handle
741  // OnFrame in a separate thread? e.g. attach to a queue of frames, even
742  // if the queue size is just 1.
743  video_track_source->OnFrame(im);
744  }
745 }
746 
747 } // namespace webrtc_server
748 } // namespace visualization
749 } // namespace cloudViewer
char type
core::Tensor result
Definition: VtkUtils.cpp:76
static rtc::scoped_refptr< BitmapTrackSourceInterface > Create(const std::string &window_uid, const std::map< std::string, std::string > &opts)
Definition: ImageCapturer.h:58
void OnFrame(const std::string &window_uid, const std::shared_ptr< core::Tensor > &im)
std::unordered_map< std::string, std::set< std::string > > window_uid_to_peerids_
PeerConnectionObserver * CreatePeerConnection(const std::string &peerid)
const Json::Value GetIceCandidateList(const std::string &peerid)
rtc::scoped_refptr< webrtc::PeerConnectionFactoryInterface > peer_connection_factory_
rtc::scoped_refptr< BitmapTrackSourceInterface > GetVideoTrackSource(const std::string &window_uid)
std::unordered_map< std::string, rtc::scoped_refptr< BitmapTrackSourceInterface > > window_uid_to_track_source_
bool AddStreams(webrtc::PeerConnectionInterface *peer_connection, const std::string &window_uid, const std::string &options)
std::map< std::string, HttpServerRequestHandler::HttpFunction > func_
PeerConnectionManager(const std::list< std::string > &ice_server_list, const Json::Value &config, const std::string &publish_filter, const std::string &webrtc_udp_port_range)
std::unordered_map< std::string, PeerConnectionObserver * > peerid_to_connection_
std::unordered_map< std::string, std::string > peerid_to_window_uid_
const Json::Value Call(const std::string &peerid, const std::string &window_uid, const std::string &options, const Json::Value &json_message)
const std::map< std::string, HttpServerRequestHandler::HttpFunction > GetHttpApi()
rtc::scoped_refptr< webrtc::PeerConnectionInterface > GetPeerConnection(const std::string &peerid)
rtc::scoped_refptr< BitmapTrackSourceInterface > CreateVideoSource(const std::string &window_uid, const std::map< std::string, std::string > &opts)
const Json::Value AddIceCandidate(const std::string &peerid, const Json::Value &json_message)
static rtc::scoped_refptr< VideoFilter > Create(rtc::scoped_refptr< BitmapTrackSourceInterface > video_source, const std::map< std::string, std::string > &opts)
Definition: VideoFilter.h:37
static std::shared_ptr< WebRTCWindowSystem > GetInstance()
#define LogWarning(...)
Definition: Logging.h:72
#define LogError(...)
Definition: Logging.h:60
#define LogDebug(...)
Definition: Logging.h:90
static webrtc::PeerConnectionFactoryDependencies CreatePeerConnectionFactoryDependencies()
static IceServer GetIceServerFromUrl(const std::string &url)
Generic file read and write utility for python interface.