Skip to Content

2024年07月26日

本周计划

  • 中心服务
    • es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
      • 多路并发性能优化
      • H.264 文件的解复用传输
      • H.265 文件的解复用传输
      • webrtc p2p 调用数证管理服务器, http 文件索引分页 api
    • es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
      • 联调 h.264 文件的接收及播放
      • 联调 h.265 文件的接收及播放
      • 通过 webrtc p2p 调用数证管理服务器, http 文件索引分页 api, 实现数证管理服务器文件索引分页查询界面及媒体播放界面

2024年07月22日

  • 中心服务
    • es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
      • 多路并发性能优化
        • 已完成配置文件读取改为: 懒加载单例方式; 避免变量传递, 减少程序的复杂性
        • 已完成每路 webrtc 初始化改为: 异步方式
        • 已完成多路 webrtc 管理初始化为读写锁(写少读多的场景读写锁比互斥锁性能高), 并且某路 webrtc session 销毁时, 通知 webrtc session map 进行 remove 处理
    • 反复自测, 及时改进自测出来的问题
      • 已完成读取文件使用缓冲(tokio::io::BufReader), 优化文件性能

配置文件读取改进后代码

use super::*; use toml; #[derive(Deserialize, Debug, Clone)] pub struct InfoConfig { pub type_code: Option<String>, pub type_no: Option<String>, pub data_dir: Option<String>, } #[derive(Deserialize, Debug, Clone)] pub struct MqttConfig { pub id: Option<String>, pub host: Option<String>, pub port: Option<u16>, pub username: Option<String>, pub password: Option<String>, } #[derive(Deserialize, Debug, Clone)] pub struct WebrtcConfig { pub stun_server: Option<String>, pub turn_server: Option<String>, pub turn_username: Option<String>, pub turn_credential: Option<String>, } #[derive(Deserialize, Debug, Clone)] pub struct RootConfig { pub mqtt: Option<MqttConfig>, pub info: Option<InfoConfig>, pub webrtc: Option<WebrtcConfig>, } static ROOT_CONFIG_MAP: Lazy<Arc<RwLock<HashMap<String, RootConfig>>>> = Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); impl RootConfig { pub async fn from_file(file_path: &str) -> Result<Self, Box<dyn Error>> { let mut file_content = String::new(); File::open(file_path) .await? .read_to_string(&mut file_content) .await?; Ok(toml::from_str(&file_content)?) } pub async fn get(key: &str) -> Result<RootConfig, Box<dyn Error>> { let root_config_option = ROOT_CONFIG_MAP.read()?.get(key).cloned(); if let Some(root_config) = root_config_option { return Ok(root_config); } let root_config = RootConfig::from_file(key).await?; ROOT_CONFIG_MAP .write()? .insert(key.to_string(), root_config.clone()); Ok(root_config) } pub fn insert(key: &str, root_config: RootConfig) -> Result<(), Box<dyn Error>> { ROOT_CONFIG_MAP .write()? .insert(key.to_string(), root_config); Ok(()) } pub async fn get_default() -> Result<RootConfig, Box<dyn Error>> { RootConfig::get("config.toml").await } pub fn insert_default(root_config: RootConfig) -> Result<(), Box<dyn Error>> { RootConfig::insert("config.toml", root_config) } }

某路 webrtc session 销毁时, 通知 webrtc session map 进行 remove 处理

impl Drop for WebrtcSession { fn drop(&mut self) { self.operation_objects .local_sender .try_send(("stop".to_owned(), "".to_owned())) .unwrap_or_else(|e| log::error!("{:?}", e)); match WEBRTC_SESSION_MAP.write() { Ok(mut v) => { v.remove(&self.base.remote_session_id); } Err(e) => { log::error!("{:?}", e); } } } }

2024年07月23日

  • 中心服务
    • 反复自测, 及时改进自测出来的问题
      • 已完成``Arc<RwLock<HashMap<String, RootConfig>>>改为用 Arc<DashMap<String, RootConfig>>第三方库, 专门为并发设计的, 它的内部实现优化了并发访问的性能: https://lib.rs/crates/dashmap 
      • 已完成``Arc<RwLock<HashMap<String, WebrtcSession>>>改为用 Arc<DashMap<String, WebrtcSession>>第三方库, 专门为并发设计的, 它的内部实现优化了并发访问的性能: https://lib.rs/crates/dashmap 
      • 已完成``Arc::new(Mutex::new(Instant::now()))改为用 Arc::new(AtomicU64::new(Instant::now().elapsed().as_secs())), 避免了锁的使用, 在高并发环境下性能更好
      • 已完成尽量用 {}代码块写法, 限制变量的作用域来间接影响资源的释放时机。当变量离开其作用域时, 如果该变量拥有某个资源(比如, 它是该资源的唯一所有者), 则 Rust 的运行时会自动调用该变量的 Drop trait 的 drop 方法来释放资源
      • 已完成在分段读取文件二进程的传输过程中, 每次读完一部分后, 就判断 webrtc 连接如果断开做退出处理, 避免无意义的消耗
      • 已完成在批量读取文件二进程的传输过程中, 每次开始一个文件传输前, 就判断 webrtc 连接如果断开做退出处理, 避免无意义的消耗
      • 已完成数据通道由三个, 改为四个; 分别为: 控制, , , 其它; 控制用来交互信令; 其它用来发送图片文件、音频文件; 用来发送视频文件解复用后的视频帧; 用来发送视频文件解复用后的音频
    • 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
      • 已完成传输的二进制头部加上: 文件路径长度, 文件路径
    • es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
    • 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
      • 已完成解析接收到的文件分片二进制头部, 取出该二进制对应那个文件路径, 存入对应的 map 中
      • 已完成文件分片二进制全部接收完成, 进行合并处理
    • 反复自测, 及时改进自测出来的问题
      • 已完成数据通道由三个, 改为四个; 分别为: 控制, , , 其它; 控制用来交互信令; 其它用来接收图片文件、音频文件; 用来接收视频文件解复用后的视频帧; 用来接收视频文件解复用后的音频
      • 已完成webrtc 通讯结束后的多个异步任务资源回收: 定时判断ping超时异步任务, 接收 mqtt 信令异步任务, 接收控制用数据通道信令异步任务

服务端代码片断

async fn file_read_in_chunks( &self, transaction: String, data: String, ) -> Result<(), Box<dyn Error>> { let root_config = RootConfig::get_default().await?; let path = format!("{}{}", root_config.info.unwrap().data_dir.unwrap(), data); let file = File::open(path.clone()).await?; let file_len = file.metadata().await?.len(); let mut data_map: HashMap<String, Value> = HashMap::<String, Value>::new(); data_map.insert("path".to_owned(), Value::String(data.clone())); data_map.insert("file_len".to_owned(), Value::Number(file_len.into())); let data_channel_result_message_dto: DataChannelResultMessageDto<HashMap<String, Value>> = DataChannelResultMessageDto { transaction: transaction.clone(), signalling_type: Some("result-file-start".to_owned()), data: Some(data_map.clone()), }; let map_str = serde_json::to_string(&data_channel_result_message_dto)?; self.operation_objects .control_data_channel .send_text(map_str) .await?; let mut reader = BufReader::new(file); let read_size = 65000; let mut buffer = vec![0u8; read_size]; loop { let n = reader.read(&mut buffer).await?; if n == 0 { break; } if n < read_size { buffer.truncate(n); } let mut send_data = Vec::<u8>::new(); let file_path = data.as_bytes(); let file_path_len = file_path.len() as u32; send_data.extend_from_slice(&file_path_len.to_le_bytes()); send_data.extend_from_slice(&file_path); send_data.extend_from_slice(&buffer); let buf = Bytes::from(send_data); if let Err(e) = self.operation_objects.other_data_channel.send(&buf).await { log::error!("main_data_channel.send: {:?}", e); return Err(e.into()); }; loop { if matches!( self.operation_objects.peer_connection.connection_state(), RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Closed ) { return Err(format!( "peer connection {}", self.operation_objects.peer_connection.connection_state(), ) .into()); } let buffered_amount = self .operation_objects .other_data_channel .buffered_amount() .await; if buffered_amount + buf.len() > (1024 * 1024 * 10) { time::sleep(std::time::Duration::from_millis(50)).await; } else { break; } } }

前端代码片断

const [_file_chunks_map, { set: file_chunks_map_set, get: file_chunks_map_get, remove: file_chunks_map_remove }] = useMap<string, Uint8Array[]>([]); const webrtc_data_channel_message_fn = useMemoizedFn((type: String, message: any) => { webrtc_data_channel_data_add(`[${dayjs().format("YYYY-MM-DD HH:mm:ss.SSS")}] [${type} receiver] ${message}`); if (_.isString(message)) { let json = JSON.parse(message); const transaction = transactionMapGet(json?.transaction); if (transaction) { transaction(type, json); } } else { const receivedArrayBuffer = message as ArrayBuffer; const dataView = new DataView(receivedArrayBuffer); const headerLength = dataView.getUint32(0, true); const headerBytes = new Uint8Array(receivedArrayBuffer, 4, headerLength); const headerString = new TextDecoder().decode(headerBytes); const remainingBytes = new Uint8Array(receivedArrayBuffer, 4 + headerLength); const file_chunks = file_chunks_map_get(headerString) ?? []; file_chunks_map_set(headerString, [...file_chunks, remainingBytes]); } }); const webrtc_control_data_channel_message_fn = useMemoizedFn((message: any) => { webrtc_data_channel_message_fn("control", message); }); useEffect(() => { const type = "webrtc-control-data-channel-message"; globalMitt.on(type, webrtc_control_data_channel_message_fn); return () => { globalMitt.off(type, webrtc_control_data_channel_message_fn); }; }, [webrtc_control_data_channel_message_fn]); const webrtc_other_data_channel_message_fn = useMemoizedFn((message: any) => { webrtc_data_channel_message_fn("other", message); }); useEffect(() => { const type = "webrtc-other-data-channel-message"; globalMitt.on(type, webrtc_other_data_channel_message_fn); return () => { globalMitt.off(type, webrtc_other_data_channel_message_fn); }; }, [webrtc_other_data_channel_message_fn]);

2024年07月24日

  • 中心服务
    • 反复自测, 及时改进自测出来的问题
      • webrtc 数据通道传输完成某个文件的所有分片二进制后, 前端合并分片二进制后的文件大小与原始文件大小进行对比不相符: 比原始文件小一些
        • 已完成服务端传输协议进行调整: 每个分片二进制头部包含一段 json 字符串: 文件相对路径, 文件长度及当前已发分片总长库
        • 已完成前端用传输协议进行业务判断, 合并文件
    • es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
      • 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
        • 控制数据通道接收到的读取文件信令分流到两个异步任务中队列消费, 实现视频文件及其它文件的同时传输
          • 已完成视频文件异步任务中队列消费
          • 已完成其它文件异步任务中队列消费
      • H.264 文件的解复用传输
    • es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
      • webrtc 数据通道信令对应进行修改
        • 已完成读取其它文件信令为: other-file-read
        • 已完成读取视频文件信令为: video-file-read

2024年07月25日

  • 中心服务
    • 反复自测, 及时改进自测出来的问题
      • 已完成借助系统监视器之类的工具, 测试是否有内存泄露: 多处代码根据测试结果改造用使用弱引用指针 Weak
      • 已完成多路一起操作, 播放视频, 音频及图片: 优化响应速度
    • es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
    • es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
      • 联调 h.264 文件的接收及播放

2024年07月26日

  • 中心服务
    • es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
    • es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
      • 已完成数据通道交互调整, 区分: 文件完全读取, 文件流读取
      • 联调 h.264 文件的接收及播放
Last updated on