2024年07月26日
本周计划
- 中心服务
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
- 多路并发性能优化
- H.264 文件的解复用传输
- H.265 文件的解复用传输
- webrtc p2p 调用数证管理服务器, http 文件索引分页 api
- es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
- 联调 h.264 文件的接收及播放
- 联调 h.265 文件的接收及播放
- 通过 webrtc p2p 调用数证管理服务器, http 文件索引分页 api, 实现数证管理服务器文件索引分页查询界面及媒体播放界面
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
2024年07月22日
- 中心服务
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
- 多路并发性能优化
已完成配置文件读取改为: 懒加载单例方式;避免变量传递, 减少程序的复杂性已完成每路 webrtc 初始化改为: 异步方式已完成多路 webrtc 管理初始化为读写锁(写少读多的场景读写锁比互斥锁性能高), 并且某路 webrtc session 销毁时, 通知 webrtc session map 进行 remove 处理
- 多路并发性能优化
- 反复自测, 及时改进自测出来的问题
已完成读取文件使用缓冲(tokio::io::BufReader), 优化文件性能
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
配置文件读取改进后代码
use super::*;
use toml;
#[derive(Deserialize, Debug, Clone)]
pub struct InfoConfig {
pub type_code: Option<String>,
pub type_no: Option<String>,
pub data_dir: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct MqttConfig {
pub id: Option<String>,
pub host: Option<String>,
pub port: Option<u16>,
pub username: Option<String>,
pub password: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct WebrtcConfig {
pub stun_server: Option<String>,
pub turn_server: Option<String>,
pub turn_username: Option<String>,
pub turn_credential: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct RootConfig {
pub mqtt: Option<MqttConfig>,
pub info: Option<InfoConfig>,
pub webrtc: Option<WebrtcConfig>,
}
static ROOT_CONFIG_MAP: Lazy<Arc<RwLock<HashMap<String, RootConfig>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
impl RootConfig {
pub async fn from_file(file_path: &str) -> Result<Self, Box<dyn Error>> {
let mut file_content = String::new();
File::open(file_path)
.await?
.read_to_string(&mut file_content)
.await?;
Ok(toml::from_str(&file_content)?)
}
pub async fn get(key: &str) -> Result<RootConfig, Box<dyn Error>> {
let root_config_option = ROOT_CONFIG_MAP.read()?.get(key).cloned();
if let Some(root_config) = root_config_option {
return Ok(root_config);
}
let root_config = RootConfig::from_file(key).await?;
ROOT_CONFIG_MAP
.write()?
.insert(key.to_string(), root_config.clone());
Ok(root_config)
}
pub fn insert(key: &str, root_config: RootConfig) -> Result<(), Box<dyn Error>> {
ROOT_CONFIG_MAP
.write()?
.insert(key.to_string(), root_config);
Ok(())
}
pub async fn get_default() -> Result<RootConfig, Box<dyn Error>> {
RootConfig::get("config.toml").await
}
pub fn insert_default(root_config: RootConfig) -> Result<(), Box<dyn Error>> {
RootConfig::insert("config.toml", root_config)
}
}
某路 webrtc session 销毁时, 通知 webrtc session map 进行 remove 处理
impl Drop for WebrtcSession {
fn drop(&mut self) {
self.operation_objects
.local_sender
.try_send(("stop".to_owned(), "".to_owned()))
.unwrap_or_else(|e| log::error!("{:?}", e));
match WEBRTC_SESSION_MAP.write() {
Ok(mut v) => {
v.remove(&self.base.remote_session_id);
}
Err(e) => {
log::error!("{:?}", e);
}
}
}
}
2024年07月23日
- 中心服务
- 反复自测, 及时改进自测出来的问题
已完成``Arc<RwLock<HashMap<String, RootConfig>>>改为用Arc<DashMap<String, RootConfig>>第三方库, 专门为并发设计的, 它的内部实现优化了并发访问的性能: https://lib.rs/crates/dashmap已完成``Arc<RwLock<HashMap<String, WebrtcSession>>>改为用Arc<DashMap<String, WebrtcSession>>第三方库, 专门为并发设计的, 它的内部实现优化了并发访问的性能: https://lib.rs/crates/dashmap已完成``Arc::new(Mutex::new(Instant::now()))改为用Arc::new(AtomicU64::new(Instant::now().elapsed().as_secs())), 避免了锁的使用, 在高并发环境下性能更好已完成尽量用{}代码块写法, 限制变量的作用域来间接影响资源的释放时机。当变量离开其作用域时, 如果该变量拥有某个资源(比如, 它是该资源的唯一所有者), 则 Rust 的运行时会自动调用该变量的 Drop trait 的 drop 方法来释放资源已完成在分段读取文件二进程的传输过程中, 每次读完一部分后, 就判断 webrtc 连接如果断开做退出处理, 避免无意义的消耗已完成在批量读取文件二进程的传输过程中, 每次开始一个文件传输前, 就判断 webrtc 连接如果断开做退出处理, 避免无意义的消耗已完成数据通道由三个, 改为四个; 分别为:控制,主,从,其它;控制用来交互信令;其它用来发送图片文件、音频文件;主用来发送视频文件解复用后的视频帧;从用来发送视频文件解复用后的音频
- 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
已完成传输的二进制头部加上: 文件路径长度, 文件路径
- es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
- 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
已完成解析接收到的文件分片二进制头部, 取出该二进制对应那个文件路径, 存入对应的 map 中已完成文件分片二进制全部接收完成, 进行合并处理
- 反复自测, 及时改进自测出来的问题
已完成数据通道由三个, 改为四个; 分别为:控制,主,从,其它;控制用来交互信令;其它用来接收图片文件、音频文件;主用来接收视频文件解复用后的视频帧;从用来接收视频文件解复用后的音频已完成webrtc 通讯结束后的多个异步任务资源回收: 定时判断ping超时异步任务, 接收 mqtt 信令异步任务, 接收控制用数据通道信令异步任务
- 反复自测, 及时改进自测出来的问题
服务端代码片断
async fn file_read_in_chunks(
&self,
transaction: String,
data: String,
) -> Result<(), Box<dyn Error>> {
let root_config = RootConfig::get_default().await?;
let path = format!("{}{}", root_config.info.unwrap().data_dir.unwrap(), data);
let file = File::open(path.clone()).await?;
let file_len = file.metadata().await?.len();
let mut data_map: HashMap<String, Value> = HashMap::<String, Value>::new();
data_map.insert("path".to_owned(), Value::String(data.clone()));
data_map.insert("file_len".to_owned(), Value::Number(file_len.into()));
let data_channel_result_message_dto: DataChannelResultMessageDto<HashMap<String, Value>> =
DataChannelResultMessageDto {
transaction: transaction.clone(),
signalling_type: Some("result-file-start".to_owned()),
data: Some(data_map.clone()),
};
let map_str = serde_json::to_string(&data_channel_result_message_dto)?;
self.operation_objects
.control_data_channel
.send_text(map_str)
.await?;
let mut reader = BufReader::new(file);
let read_size = 65000;
let mut buffer = vec![0u8; read_size];
loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
if n < read_size {
buffer.truncate(n);
}
let mut send_data = Vec::<u8>::new();
let file_path = data.as_bytes();
let file_path_len = file_path.len() as u32;
send_data.extend_from_slice(&file_path_len.to_le_bytes());
send_data.extend_from_slice(&file_path);
send_data.extend_from_slice(&buffer);
let buf = Bytes::from(send_data);
if let Err(e) = self.operation_objects.other_data_channel.send(&buf).await {
log::error!("main_data_channel.send: {:?}", e);
return Err(e.into());
};
loop {
if matches!(
self.operation_objects.peer_connection.connection_state(),
RTCPeerConnectionState::Failed
| RTCPeerConnectionState::Disconnected
| RTCPeerConnectionState::Closed
) {
return Err(format!(
"peer connection {}",
self.operation_objects.peer_connection.connection_state(),
)
.into());
}
let buffered_amount = self
.operation_objects
.other_data_channel
.buffered_amount()
.await;
if buffered_amount + buf.len() > (1024 * 1024 * 10) {
time::sleep(std::time::Duration::from_millis(50)).await;
} else {
break;
}
}
}
前端代码片断
const [_file_chunks_map, { set: file_chunks_map_set, get: file_chunks_map_get, remove: file_chunks_map_remove }] = useMap<string, Uint8Array[]>([]);
const webrtc_data_channel_message_fn = useMemoizedFn((type: String, message: any) => {
webrtc_data_channel_data_add(`[${dayjs().format("YYYY-MM-DD HH:mm:ss.SSS")}] [${type} receiver] ${message}`);
if (_.isString(message)) {
let json = JSON.parse(message);
const transaction = transactionMapGet(json?.transaction);
if (transaction) {
transaction(type, json);
}
} else {
const receivedArrayBuffer = message as ArrayBuffer;
const dataView = new DataView(receivedArrayBuffer);
const headerLength = dataView.getUint32(0, true);
const headerBytes = new Uint8Array(receivedArrayBuffer, 4, headerLength);
const headerString = new TextDecoder().decode(headerBytes);
const remainingBytes = new Uint8Array(receivedArrayBuffer, 4 + headerLength);
const file_chunks = file_chunks_map_get(headerString) ?? [];
file_chunks_map_set(headerString, [...file_chunks, remainingBytes]);
}
});
const webrtc_control_data_channel_message_fn = useMemoizedFn((message: any) => {
webrtc_data_channel_message_fn("control", message);
});
useEffect(() => {
const type = "webrtc-control-data-channel-message";
globalMitt.on(type, webrtc_control_data_channel_message_fn);
return () => {
globalMitt.off(type, webrtc_control_data_channel_message_fn);
};
}, [webrtc_control_data_channel_message_fn]);
const webrtc_other_data_channel_message_fn = useMemoizedFn((message: any) => {
webrtc_data_channel_message_fn("other", message);
});
useEffect(() => {
const type = "webrtc-other-data-channel-message";
globalMitt.on(type, webrtc_other_data_channel_message_fn);
return () => {
globalMitt.off(type, webrtc_other_data_channel_message_fn);
};
}, [webrtc_other_data_channel_message_fn]);2024年07月24日
- 中心服务
- 反复自测, 及时改进自测出来的问题
- webrtc 数据通道传输完成某个文件的所有分片二进制后, 前端合并分片二进制后的文件大小与原始文件大小进行对比不相符: 比原始文件小一些
已完成服务端传输协议进行调整: 每个分片二进制头部包含一段 json 字符串: 文件相对路径, 文件长度及当前已发分片总长库已完成前端用传输协议进行业务判断, 合并文件
- webrtc 数据通道传输完成某个文件的所有分片二进制后, 前端合并分片二进制后的文件大小与原始文件大小进行对比不相符: 比原始文件小一些
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
- 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
- 控制数据通道接收到的读取文件信令分流到两个异步任务中队列消费, 实现视频文件及其它文件的同时传输
已完成视频文件异步任务中队列消费已完成其它文件异步任务中队列消费
- 控制数据通道接收到的读取文件信令分流到两个异步任务中队列消费, 实现视频文件及其它文件的同时传输
- H.264 文件的解复用传输
已完成ffmpeg 安装已完成https://github.com/zmwangx/rust-ffmpeg 集成已完成https://github.com/zmwangx/rust-ffmpeg 下载源代码, 研究使用细节
- 并发传输文件处理: 多个图片、音频、视频一起传输; 以前是排队方式
- es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
- webrtc 数据通道信令对应进行修改
已完成读取其它文件信令为:other-file-read已完成读取视频文件信令为:video-file-read
- webrtc 数据通道信令对应进行修改
- 反复自测, 及时改进自测出来的问题
2024年07月25日
- 中心服务
- 反复自测, 及时改进自测出来的问题
已完成借助系统监视器之类的工具, 测试是否有内存泄露: 多处代码根据测试结果改造用使用弱引用指针 Weak已完成多路一起操作, 播放视频, 音频及图片: 优化响应速度
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
- H.264 文件的解复用传输
已完成https://github.com/zmwangx/rust-ffmpeg 下载源代码, 研究使用细节
- H.264 文件的解复用传输
- es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
- 联调 h.264 文件的接收及播放
- 反复自测, 及时改进自测出来的问题
2024年07月26日
- 中心服务
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
已完成数据通道交互调整, 区分: 文件完全读取, 文件流读取- H.264 文件的解复用传输
已完成https://github.com/zmwangx/rust-ffmpeg 下载源代码, 研究使用细节
- es-center-server-webrtc-app: 开发与 es-center-server-webrtc-service 联调用的播放端, 联调通过后将移入中心服务历史回放模块
已完成数据通道交互调整, 区分: 文件完全读取, 文件流读取- 联调 h.264 文件的接收及播放
已完成https://github.com/videojs/mux.js 下载源代码, 研究使用细节
- es-center-server-webrtc-service: 部署在数证管理服务器上, 实现回放文件的 p2p 传输, 支持编译为 linux, windows 程序
Last updated on