-
-
Notifications
You must be signed in to change notification settings - Fork 440
Description
udp 16128 0 0.0.0.0:51353 0.0.0.0:* 25057/./rtw off (0.00/0/0)
udp 215040 0 224.0.0.251:5353 0.0.0.0:* 25057/./rtw off (0.00/0/0)
unix 3 [ ] STREAM CONNECTED 4163554844 25057/./rtw
unix 3 [ ] STREAM CONNECTED 4163526477 25057/./rtw
// Tear down the peer connection registered under `key`: stop its senders,
// receivers and transceivers, close it, then wait (bounded) for the ICE agent
// to report `Closed` before releasing our reference.
//
// NOTE(review): the write guard on `self.pcs` stays alive for the whole
// teardown below, including the wait of up to 10 seconds. Other users of the
// map are blocked for that long — confirm this is intended.
let mut pcs = self.pcs.write().await;
info!("-----------------pcs.len:{:?}", pcs.len());
info!("-----------------output:{:?}", key);
if let Some(pc) = pcs.remove(&key) {
    info!("start clean peerconnect");
    // Reference counts are logged before and after teardown to help spot
    // clones of the connection that are still held elsewhere.
    info!(
        " start PC strong_count={}, weak_count={}",
        Arc::strong_count(&pc),
        Arc::weak_count(&pc)
    );

    // Failures while stopping individual parts are logged and skipped so that
    // the remaining parts and the connection itself still get shut down.
    for sender in pc.get_senders().await {
        if let Err(e) = sender.stop().await {
            warn!("Failed to stop sender: {:?}", e);
        }
    }
    for receiver in pc.get_receivers().await {
        if let Err(e) = receiver.stop().await {
            warn!("Failed to stop receiver: {:?}", e);
        }
    }
    for transceiver in pc.get_transceivers().await {
        if let Err(e) = transceiver.stop().await {
            warn!("Failed to stop transceiver: {:?}", e);
        }
    }

    if let Err(e) = pc.close().await {
        warn!("Failed to close peer connection: {:?}", e);
    }

    // Poll once per second until the ICE connection state becomes `Closed`,
    // giving up after 10 seconds.
    let ice_closed = async {
        while pc.ice_connection_state()
            != webrtc::ice_transport::ice_connection_state::RTCIceConnectionState::Closed
        {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    };
    if timeout(Duration::from_secs(10), ice_closed).await.is_err() {
        warn!("Timed out waiting for ICE agent to close");
    }

    info!(
        " end PC strong_count={}, weak_count={}",
        Arc::strong_count(&pc),
        Arc::weak_count(&pc)
    );
    drop(pc);
    info!("-----------------safe delete:{:?}", key);
}
info!("Stopped stream for camera: {:?}", &key);
}
use crate::CONFIG;
use anyhow::{Context, Ok};
use futures::{future::FutureExt, pin_mut, select};
use std::sync::Arc;
use tracing::{info, info_span, warn, Instrument};
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
pub async fn create_answer(
offer: RTCSessionDescription,
video_track: Arc,
) -> anyhow::Result<(RTCSessionDescription, Arc)> {
// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();
m.register_default_codecs()?;
// Create an InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create an InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new();
// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;
// Create the API object with the MediaEngine
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
// Prepare the configuration
let turn_server = &CONFIG.conf.turn_server;
let turn_user = &CONFIG.conf.turn_username;
let turn_pwd = &CONFIG.conf.turn_pwd;
let config = RTCConfiguration {
ice_transport_policy: RTCIceTransportPolicy::Relay,
ice_servers: vec![RTCIceServer {
urls: vec![turn_server.into()],
username: turn_user.to_owned(),
credential: turn_pwd.to_owned(),
..Default::default()
}],
..Default::default()
};
// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
// Add this newly created track to the PeerConnection
let rtp_sender = peer_connection.add_track(video_track).await?;
// Channel to send a signal when the client gets disconnected so we can clean up
let (disconnected_tx, mut disconnected_rx) = tokio::sync::mpsc::channel::<()>(1);
// Task to read data from the client
tokio::spawn(
async move {
let mut rtcp_buf = vec![0u8; 1500];
let disconnected_fut = disconnected_rx.recv().fuse();
pin_mut!(disconnected_fut);
loop {
// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors.
// For things like NACK this needs to be called.
let recv_rtcp_fut = rtp_sender.read(&mut rtcp_buf).fuse();
pin_mut!(recv_rtcp_fut);
select! {
_ = disconnected_fut => {
info!("Client is disconnected; cleaning up RTP sender");
let _ = rtp_sender.stop().await;
break;
},
// Nothing to actually do with the RTCP packets, but we're supposed to read them, anyway
_ = recv_rtcp_fut => {}
}
}
anyhow::Result::<()>::Ok(())
}
.instrument(info_span!("client_loop")),
);
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
warn!("===================state chage to : {:?}", &s);
if matches!(
s,
RTCPeerConnectionState::Failed
| RTCPeerConnectionState::Closed
| RTCPeerConnectionState::Disconnected
)
// if s == RTCPeerConnectionState::Failed
// || s == RTCPeerConnectionState::Closed
// || s == RTCPeerConnectionState::Disconnected
{
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
warn!("Peer Connection state is Failed; cleaning up connection.");
// Send the disconnected signal so we can clean up the connection objects and stop trying to send data to a connection that's no longer active.
let _ = disconnected_tx.try_send(());
}
Box::pin(async {})
}));
// Set the remote SessionDescription
peer_connection.set_remote_description(offer).await?;
// Create an answer
let answer = peer_connection.create_answer(None).await?;
// Create channel that is blocked until ICE Gathering is complete
let mut gather_complete = peer_connection.gathering_complete_promise().await;
// Sets the LocalDescription, and starts our UDP listeners
peer_connection.set_local_description(answer).await?;
// Block until ICE Gathering is complete, disabling trickle ICE
let _ = gather_complete.recv().await;
// Output the answer
let local = peer_connection
.local_description()
.await
.context("no local description")?;
Ok((local, peer_connection))
}