21 #include <api/peer_connection_interface.h>
22 #include <rtc_base/strings/json.h>
29 #include <unordered_map>
36 namespace visualization {
37 namespace webrtc_server {
76 class VideoSink :
public rtc::VideoSinkInterface<webrtc::VideoFrame> {
78 VideoSink(webrtc::VideoTrackInterface* track) : track_(track) {
79 track_->AddOrUpdateSink(
this, rtc::VideoSinkWants());
81 virtual ~VideoSink() { track_->RemoveSink(
this); }
84 virtual void OnFrame(
const webrtc::VideoFrame& video_frame) {
85 rtc::scoped_refptr<webrtc::I420BufferInterface> buffer(
86 video_frame.video_frame_buffer()->ToI420());
88 buffer->height(), buffer->width());
92 rtc::scoped_refptr<webrtc::VideoTrackInterface> track_;
95 class SetSessionDescriptionObserver
96 :
public webrtc::SetSessionDescriptionObserver {
98 static SetSessionDescriptionObserver* Create(
99 webrtc::PeerConnectionInterface* pc,
100 std::promise<const webrtc::SessionDescriptionInterface*>&
102 return new rtc::RefCountedObject<SetSessionDescriptionObserver>(
105 virtual void OnSuccess() {
107 if (pc_->local_description()) {
108 promise_.set_value(pc_->local_description());
109 pc_->local_description()->ToString(&sdp);
110 }
else if (pc_->remote_description()) {
111 promise_.set_value(pc_->remote_description());
112 pc_->remote_description()->ToString(&sdp);
115 virtual void OnFailure(webrtc::RTCError
error) {
117 promise_.set_value(
nullptr);
121 SetSessionDescriptionObserver(
122 webrtc::PeerConnectionInterface* pc,
123 std::promise<const webrtc::SessionDescriptionInterface*>&
125 : pc_(pc), promise_(promise) {};
128 webrtc::PeerConnectionInterface* pc_;
129 std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
132 class CreateSessionDescriptionObserver
133 :
public webrtc::CreateSessionDescriptionObserver {
135 static CreateSessionDescriptionObserver* Create(
136 webrtc::PeerConnectionInterface* pc,
137 std::promise<const webrtc::SessionDescriptionInterface*>&
139 return new rtc::RefCountedObject<CreateSessionDescriptionObserver>(
142 virtual void OnSuccess(webrtc::SessionDescriptionInterface* desc) {
144 desc->ToString(&sdp);
145 pc_->SetLocalDescription(
146 SetSessionDescriptionObserver::Create(pc_, promise_), desc);
148 virtual void OnFailure(webrtc::RTCError
error) {
150 promise_.set_value(
nullptr);
154 CreateSessionDescriptionObserver(
155 webrtc::PeerConnectionInterface* pc,
156 std::promise<const webrtc::SessionDescriptionInterface*>&
158 : pc_(pc), promise_(promise) {};
161 webrtc::PeerConnectionInterface* pc_;
162 std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
165 class PeerConnectionStatsCollectorCallback
166 :
public webrtc::RTCStatsCollectorCallback {
168 PeerConnectionStatsCollectorCallback() {}
169 void clearReport() { report_.
clear(); }
170 Json::Value getReport() {
return report_; }
173 virtual void OnStatsDelivered(
174 const rtc::scoped_refptr<const webrtc::RTCStatsReport>&
176 for (
const webrtc::RTCStats& stats : *report) {
177 Json::Value stats_members;
178 for (
const webrtc::RTCStatsMemberInterface* member :
180 stats_members[member->name()] = member->ValueToString();
182 report_[stats.id()] = stats_members;
189 class DataChannelObserver :
public webrtc::DataChannelObserver {
193 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
194 const std::string& peerid)
195 : peer_connection_manager_(peer_connection_manager),
196 data_channel_(data_channel),
198 data_channel_->RegisterObserver(
this);
200 virtual ~DataChannelObserver() { data_channel_->UnregisterObserver(); }
203 virtual void OnStateChange() {
205 const std::string label = data_channel_->label();
206 const std::string state =
207 webrtc::DataChannelInterface::DataStateString(
208 data_channel_->state());
210 "DataChannelObserver::OnStateChange label: {}, state: {}, "
212 label, state, peerid_);
213 std::string msg(label +
" " + state);
214 webrtc::DataBuffer buffer(msg);
215 data_channel_->Send(buffer);
220 if (label ==
"ClientDataChannel" && state ==
"open") {
222 std::lock_guard<std::mutex> mutex_lock(
223 peer_connection_manager_
225 peer_connection_manager_->peerid_data_channel_ready_.insert(
228 peer_connection_manager_->SendInitFramesToPeer(peerid_);
230 if (label ==
"ClientDataChannel" &&
231 (state ==
"closed" || state ==
"closing")) {
232 std::lock_guard<std::mutex> mutex_lock(
233 peer_connection_manager_->peerid_data_channel_mutex_);
234 peer_connection_manager_->peerid_data_channel_ready_.erase(
238 virtual void OnMessage(
const webrtc::DataBuffer& buffer) {
239 std::string msg((
const char*)buffer.data.data(),
242 data_channel_->label(), msg);
246 if (!reply.empty()) {
247 webrtc::DataBuffer buffer(reply);
248 data_channel_->Send(buffer);
254 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
255 const std::string peerid_;
258 class PeerConnectionObserver :
public webrtc::PeerConnectionObserver {
260 PeerConnectionObserver(
262 const std::string& peerid,
263 const webrtc::PeerConnectionInterface::RTCConfiguration& config,
264 std::unique_ptr<cricket::PortAllocator> port_allocator)
265 : peer_connection_manager_(peer_connection_manager),
267 local_channel_(
nullptr),
268 remote_channel_(
nullptr),
269 ice_candidate_list_(Json::arrayValue),
271 pc_ = peer_connection_manager_->peer_connection_factory_
272 ->CreatePeerConnection(config,
273 std::move(port_allocator),
277 rtc::scoped_refptr<webrtc::DataChannelInterface> channel =
278 pc_->CreateDataChannel(
"ServerDataChannel",
nullptr);
279 local_channel_ =
new DataChannelObserver(
280 peer_connection_manager_, channel, peerid_);
283 stats_callback_ =
new rtc::RefCountedObject<
284 PeerConnectionStatsCollectorCallback>();
287 virtual ~PeerConnectionObserver() {
288 delete local_channel_;
289 delete remote_channel_;
299 Json::Value GetStats() {
300 stats_callback_->clearReport();
301 pc_->GetStats(stats_callback_);
303 while ((stats_callback_->getReport().empty()) && (--
count > 0)) {
304 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
306 return Json::Value(stats_callback_->getReport());
309 rtc::scoped_refptr<webrtc::PeerConnectionInterface>
315 virtual void OnAddStream(
316 rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
319 stream->GetVideoTracks().size());
320 webrtc::VideoTrackVector videoTracks = stream->GetVideoTracks();
321 if (videoTracks.size() > 0) {
322 video_sink_.reset(
new VideoSink(videoTracks.at(0)));
325 virtual void OnRemoveStream(
326 rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
329 virtual void OnDataChannel(
330 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
332 "PeerConnectionObserver::OnDataChannel peerid: {}",
334 remote_channel_ =
new DataChannelObserver(peer_connection_manager_,
337 virtual void OnRenegotiationNeeded() {
338 std::lock_guard<std::mutex> mutex_lock(
339 peer_connection_manager_->peerid_data_channel_mutex_);
340 peer_connection_manager_->peerid_data_channel_ready_.erase(peerid_);
342 "PeerConnectionObserver::OnRenegotiationNeeded peerid: {}",
345 virtual void OnIceCandidate(
346 const webrtc::IceCandidateInterface* candidate);
348 virtual void OnSignalingChange(
349 webrtc::PeerConnectionInterface::SignalingState state) {
352 virtual void OnIceConnectionChange(
353 webrtc::PeerConnectionInterface::IceConnectionState state) {
355 webrtc::PeerConnectionInterface::kIceConnectionFailed) ||
357 webrtc::PeerConnectionInterface::kIceConnectionClosed)) {
358 ice_candidate_list_.clear();
360 std::thread([
this]() {
361 peer_connection_manager_->HangUp(peerid_);
367 virtual void OnIceGatheringChange(
368 webrtc::PeerConnectionInterface::IceGatheringState) {}
372 const std::string peerid_;
373 rtc::scoped_refptr<webrtc::PeerConnectionInterface> pc_;
374 DataChannelObserver* local_channel_;
375 DataChannelObserver* remote_channel_;
376 Json::Value ice_candidate_list_;
377 rtc::scoped_refptr<PeerConnectionStatsCollectorCallback>
379 std::unique_ptr<VideoSink> video_sink_;
385 const Json::Value& config,
386 const std::string& publish_filter,
387 const std::string& webrtc_udp_port_range);
391 const std::map<std::string, HttpServerRequestHandler::HttpFunction>
396 const Json::Value& json_message);
398 const Json::Value
HangUp(
const std::string& peerid);
399 const Json::Value
Call(
const std::string& peerid,
400 const std::string& window_uid,
401 const std::string& options,
402 const Json::Value& json_message);
409 void OnFrame(
const std::string& window_uid,
410 const std::shared_ptr<core::Tensor>& im);
414 const std::string& window_uid);
416 bool AddStreams(webrtc::PeerConnectionInterface* peer_connection,
417 const std::string& window_uid,
418 const std::string& options);
420 const std::string& window_uid,
421 const std::map<std::string, std::string>& opts);
424 const std::string& peerid);
428 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
432 std::unordered_map<std::string, PeerConnectionObserver*>
440 std::unordered_map<std::string,
441 rtc::scoped_refptr<BitmapTrackSourceInterface>>
446 std::unordered_map<std::string, std::set<std::string>>
455 std::map<std::string, HttpServerRequestHandler::HttpFunction>
func_;
466 struct formatter<webrtc::PeerConnectionInterface::SignalingState> {
467 template <
typename FormatContext>
468 auto format(
const webrtc::PeerConnectionInterface::SignalingState& state,
469 FormatContext& ctx)
const -> decltype(ctx.out()) {
470 using namespace webrtc;
471 const char* text =
nullptr;
473 case PeerConnectionInterface::SignalingState::kStable:
476 case PeerConnectionInterface::SignalingState::kHaveLocalOffer:
477 text =
"kHaveLocalOffer";
479 case PeerConnectionInterface::SignalingState::kHaveLocalPrAnswer:
480 text =
"kHaveLocalPrAnswer";
482 case PeerConnectionInterface::SignalingState::kHaveRemoteOffer:
483 text =
"kHaveRemoteOffer";
485 case PeerConnectionInterface::SignalingState::kHaveRemotePrAnswer:
486 text =
"kHaveRemotePrAnswer";
488 case PeerConnectionInterface::SignalingState::kClosed:
494 return format_to(ctx.out(),
"{}", text);
497 template <
typename ParseContext>
498 constexpr
auto parse(ParseContext& ctx) -> decltype(ctx.begin()) {
#define CLOUDVIEWER_FUNCTION
void clear() override
Clears the entity from all its points and features.
std::mutex window_uid_to_track_source_mutex_
const std::regex publish_filter_
std::mutex peerid_to_connection_mutex_
const Json::Value GetMediaList()
std::mutex peerid_data_channel_mutex_
std::mutex window_uid_to_peerids_mutex_
void OnFrame(const std::string &window_uid, const std::shared_ptr< core::Tensor > &im)
std::list< std::string > ice_server_list_
virtual ~PeerConnectionManager()
void CloseWindowConnections(const std::string &window_uid)
std::unordered_map< std::string, std::set< std::string > > window_uid_to_peerids_
PeerConnectionObserver * CreatePeerConnection(const std::string &peerid)
const Json::Value GetIceCandidateList(const std::string &peerid)
const Json::Value GetIceServers()
const Json::Value config_
rtc::scoped_refptr< webrtc::PeerConnectionFactoryInterface > peer_connection_factory_
rtc::scoped_refptr< BitmapTrackSourceInterface > GetVideoTrackSource(const std::string &window_uid)
std::unordered_map< std::string, rtc::scoped_refptr< BitmapTrackSourceInterface > > window_uid_to_track_source_
bool AddStreams(webrtc::PeerConnectionInterface *peer_connection, const std::string &window_uid, const std::string &options)
std::map< std::string, HttpServerRequestHandler::HttpFunction > func_
std::unordered_set< std::string > peerid_data_channel_ready_
PeerConnectionManager(const std::list< std::string > &ice_server_list, const Json::Value &config, const std::string &publish_filter, const std::string &webrtc_udp_port_range)
std::unordered_map< std::string, PeerConnectionObserver * > peerid_to_connection_
std::unordered_map< std::string, std::string > peerid_to_window_uid_
const Json::Value Call(const std::string &peerid, const std::string &window_uid, const std::string &options, const Json::Value &json_message)
std::string webrtc_port_range_
std::unique_ptr< webrtc::TaskQueueFactory > task_queue_factory_
const std::map< std::string, HttpServerRequestHandler::HttpFunction > GetHttpApi()
rtc::scoped_refptr< webrtc::PeerConnectionInterface > GetPeerConnection(const std::string &peerid)
bool InitializePeerConnection()
rtc::scoped_refptr< BitmapTrackSourceInterface > CreateVideoSource(const std::string &window_uid, const std::map< std::string, std::string > &opts)
bool WindowStillUsed(const std::string &window_uid)
const Json::Value HangUp(const std::string &peerid)
void SendInitFramesToPeer(const std::string &peerid)
const Json::Value AddIceCandidate(const std::string &peerid, const Json::Value &json_message)
static std::shared_ptr< WebRTCWindowSystem > GetInstance()
static void error(char *msg)
Generic file read and write utility for python interface.