Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Bug] UDP sockets are not released after RTCPeerConnection is closed #739

Open
@cyberkunlun

Description

udp 16128 0 0.0.0.0:51353 0.0.0.0:* 25057/./rtw off (0.00/0/0)
udp 215040 0 224.0.0.251:5353 0.0.0.0:* 25057/./rtw off (0.00/0/0)
unix 3 [ ] STREAM CONNECTED 4163554844 25057/./rtw
unix 3 [ ] STREAM CONNECTED 4163526477 25057/./rtw

// Tear-down path for one peer connection, looked up by `key` in `self.pcs`.
// (Fragment: the enclosing function's signature is not part of this paste.)
//
// NOTE(review): `pcs` is the guard returned by `self.pcs.write().await` and is
// never dropped explicitly in the visible code, so it appears to stay held
// across every `.await` below, including the up-to-10-second wait. Confirm
// whether other tasks needing `self.pcs` can tolerate that.
let mut pcs = self.pcs.write().await;
info!("-----------------pcs.len:{:?}", pcs.len());
info!("-----------------output:{:?}", key);
// Take the entry out of the map; nothing else in this branch runs if the key is absent.
if let Some(pc) = pcs.remove(&key) {
info!("start clean peerconnect");
// Diagnostic only: log how many strong/weak handles to `pc` exist before cleanup.
let count = Arc::strong_count(&pc);
let weak_count = Arc::weak_count(&pc);
info!(
" start PC strong_count={}, weak_count={}",
count, weak_count
);
// Stop every sender; failures are logged and do not abort the cleanup.
let senders = pc.get_senders().await;
for s in senders {
if let Err(e) = s.stop().await {
warn!("Failed to stop sender: {:?}", e);
}
}
// Stop every receiver.
// NOTE(review): the message below still says "sender" (copy-paste), so a
// receiver failure is indistinguishable from a sender failure in the logs.
let receiver = pc.get_receivers().await;
for r in receiver {
if let Err(e) = r.stop().await {
warn!("Failed to stop sender: {:?}", e);
}
}
// Stop every transceiver.
// NOTE(review): same copy-pasted "sender" wording as above.
let tran = pc.get_transceivers().await;
for t in tran {
if let Err(e) = t.stop().await {
warn!("Failed to stop sender: {:?}", e);
}
}
// Close the peer connection itself; an error is logged and cleanup continues.
if let Err(e) = pc.close().await {
warn!("Failed to close peer connection: {:?}", e);
}

 // Poll the ICE connection state once per second until it reports `Closed`,
 // giving up (with a warning) after 10 seconds.
 if let Err(_) = timeout(Duration::from_secs(10), async {
 while pc.ice_connection_state()
 != webrtc::ice_transport::ice_connection_state::RTCIceConnectionState::Closed
 {
 tokio::time::sleep(Duration::from_secs(1)).await;
 }
 })
 .await
 {
 warn!("Timed out waiting for ICE agent to close")
 }
 // Diagnostic only: log the handle counts again after close, then release
 // this function's own handle to the peer connection.
 let count = Arc::strong_count(&pc);
 let weak_count = Arc::weak_count(&pc);
 info!(" end PC strong_count={}, weak_count={}", count, weak_count);
 drop(pc);
 info!("-----------------safe delete:{:?}", key);
 }
 // Logged whether or not an entry for `key` was found.
 info!("Stopped stream for camera: {:?}", &key);
 }

use crate::CONFIG;
use anyhow::{Context, Ok};
use futures::{future::FutureExt, pin_mut, select};
use std::sync::Arc;
use tracing::{info, info_span, warn, Instrument};
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;

pub async fn create_answer(
offer: RTCSessionDescription,
video_track: Arc,
) -> anyhow::Result<(RTCSessionDescription, Arc)> {
// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();

m.register_default_codecs()?;
// Create an InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create an InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new();
// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;
// Create the API object with the MediaEngine
let api = APIBuilder::new()
 .with_media_engine(m)
 .with_interceptor_registry(registry)
 .build();
// Prepare the configuration
let turn_server = &CONFIG.conf.turn_server;
let turn_user = &CONFIG.conf.turn_username;
let turn_pwd = &CONFIG.conf.turn_pwd;
let config = RTCConfiguration {
 ice_transport_policy: RTCIceTransportPolicy::Relay,
 ice_servers: vec![RTCIceServer {
 urls: vec![turn_server.into()],
 username: turn_user.to_owned(),
 credential: turn_pwd.to_owned(),
 ..Default::default()
 }],
 ..Default::default()
};
// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
// Add this newly created track to the PeerConnection
let rtp_sender = peer_connection.add_track(video_track).await?;
// Channel to send a signal when the client gets disconnected so we can clean up
let (disconnected_tx, mut disconnected_rx) = tokio::sync::mpsc::channel::<()>(1);
// Task to read data from the client
tokio::spawn(
 async move {
 let mut rtcp_buf = vec![0u8; 1500];
 let disconnected_fut = disconnected_rx.recv().fuse();
 pin_mut!(disconnected_fut);
 loop {
 // Read incoming RTCP packets
 // Before these packets are returned they are processed by interceptors.
 // For things like NACK this needs to be called.
 let recv_rtcp_fut = rtp_sender.read(&mut rtcp_buf).fuse();
 pin_mut!(recv_rtcp_fut);
 select! {
 _ = disconnected_fut => {
 info!("Client is disconnected; cleaning up RTP sender");
 let _ = rtp_sender.stop().await;
 break;
 },
 // Nothing to actually do with the RTCP packets, but we're supposed to read them, anyway
 _ = recv_rtcp_fut => {}
 }
 }
 anyhow::Result::<()>::Ok(())
 }
 .instrument(info_span!("client_loop")),
);
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
 warn!("===================state chage to : {:?}", &s);
 if matches!(
 s,
 RTCPeerConnectionState::Failed
 | RTCPeerConnectionState::Closed
 | RTCPeerConnectionState::Disconnected
 )
 // if s == RTCPeerConnectionState::Failed
 // || s == RTCPeerConnectionState::Closed
 // || s == RTCPeerConnectionState::Disconnected
 {
 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
 warn!("Peer Connection state is Failed; cleaning up connection.");
 // Send the disconnected signal so we can clean up the connection objects and stop trying to send data to a connection that's no longer active.
 let _ = disconnected_tx.try_send(());
 }
 Box::pin(async {})
}));
// Set the remote SessionDescription
peer_connection.set_remote_description(offer).await?;
// Create an answer
let answer = peer_connection.create_answer(None).await?;
// Create channel that is blocked until ICE Gathering is complete
let mut gather_complete = peer_connection.gathering_complete_promise().await;
// Sets the LocalDescription, and starts our UDP listeners
peer_connection.set_local_description(answer).await?;
// Block until ICE Gathering is complete, disabling trickle ICE
let _ = gather_complete.recv().await;
// Output the answer
let local = peer_connection
 .local_description()
 .await
 .context("no local description")?;
Ok((local, peer_connection))

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

      Relationships

      None yet

      Development

      No branches or pull requests

      Issue actions

        AltStyle によって変換されたページ (->オリジナル) /