Here's the situation: I've recently been writing cloudmeeting, a video-conferencing project. The environment is VS + MSVC + vcpkg (I'll skip the environment changes and refactoring along the way). It uses libdatachannel, FFmpeg, and Qt, encodes with Opus + H.264, and supports both RTMP and WebRTC publishing; along the way I applied a number of modern-C++ safety measures and designed a safe, leveled logging system. A while back I was stuck on the libdatachannel integration for a long time. These days, with AI assistance, I only need to skim the examples in a library's source before I can start writing code, which is genuinely convenient; but it also leaves me uneasy. Whenever something breaks I have to go back and read the library source myself, because I don't quite trust the AI's judgment, and that lost time is why I was stuck for so long this time. Below I show my code and walk up to the problem.

MainWindow initialization

/// mainwindow.cpp
// WebRTC publishing thread
m_webRTCPublisherThread = new QThread(this);
m_webRTCPublisher = new WebRTCPublisher(m_publishPacketQueue);
m_webRTCPublisher->moveToThread(m_webRTCPublisherThread);
m_webRTCPublisherThread->start();
QMetaObject::invokeMethod(m_webRTCPublisher, "initThread", Qt::QueuedConnection);

void MainWindow::on_createmeetBtn_clicked() {
    connect(m_audioEncoder, &ffmpegEncoder::initializationSuccess, this, &MainWindow::audioEncoderReady);
    connect(m_videoEncoder, &ffmpegEncoder::initializationSuccess, this, &MainWindow::videoEncoderReady);
    if (m_isAudioRunning || m_isVideoRunning) {
        ui->createmeetBtn->setEnabled(true);
        qDebug() << "Joining meeting...";
        QString webRTCsignalingUrl = "http://10.0.0.10:1985/rtc/v1/publish/"; /// hard-coded for the demo
        QString webRTCstreamUrl = "webrtc://10.0.0.10/live/teststream"; // WebRTC stream address

        AVCodecContext *videoCtx = m_videoEncoder->getCodecContext();
        AVCodecContext *audioCtx = m_audioEncoder->getCodecContext();

        if (!videoCtx || !audioCtx) {
            QMessageBox::warning(this, "Error", "Encoders are not ready yet.");
            return;
        }
        QMetaObject::invokeMethod(m_webRTCPublisher, "init", Qt::QueuedConnection,
                                  Q_ARG(QString, webRTCsignalingUrl),
                                  Q_ARG(QString, webRTCstreamUrl));

        QMetaObject::invokeMethod(m_webRTCPublisher, "startPublishing", Qt::QueuedConnection);
    } else {
        ui->createmeetBtn->setEnabled(false);
    }
}

Publisher initialization

void WebRTCPublisher::initThread() {
    m_networkManager = new QNetworkAccessManager(this); /// Key point: the QNetworkAccessManager is created in, and therefore belongs to, m_webRTCPublisherThread
    rtcPreload();
    connect(m_networkManager, &QNetworkAccessManager::finished, this, &WebRTCPublisher::onSignalingReply);
}

Note that QNetworkAccessManager, and the QNetworkReply objects it creates, depend particularly heavily on the event loop of the thread they belong to in order to deliver the asynchronous results of network operations (DNS lookup finishing, connection establishment, data arrival, request completion or failure, and so on). These internal operations ultimately notify your code through signals such as finished and errorOccurred. You must therefore create and operate on QNetworkReply objects (via post, get, etc.) in the same thread that owns the QNetworkAccessManager, and that thread must have a running event loop.
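Qt enforces this through "thread affinity": every QObject records the thread it was created on, and creating children or doing event-loop-dependent work from any other thread fails. The sketch below models just that bookkeeping in plain standard C++, with no Qt; `ThreadAffine` and `demo` are illustrative names invented for this example, not Qt API.

```cpp
#include <thread>

// A QObject-like class remembers the thread it was constructed on ("thread
// affinity") and can check whether the current call is happening on that thread.
class ThreadAffine {
public:
    ThreadAffine() : m_owner(std::this_thread::get_id()) {}
    bool calledOnOwnerThread() const {
        return std::this_thread::get_id() == m_owner;
    }
private:
    std::thread::id m_owner;
};

bool demo() {
    ThreadAffine manager;  // created on this thread, like m_networkManager in initThread()
    bool okHere = manager.calledOnOwnerThread();  // same thread: check passes

    bool okThere = true;
    // A foreign thread, standing in for a libdatachannel callback thread:
    std::thread foreign([&] { okThere = manager.calledOnOwnerThread(); });
    foreign.join();

    return okHere && !okThere;  // the foreign thread fails the affinity check
}
```

In real Qt the equivalent check compares QThread::currentThread() against the object's thread(), and violating it is what produces warnings such as "QObject: Cannot create children for a parent that is in a different thread".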

Setting up the callbacks

void WebRTCPublisher::initializePeerConnection() {
    try {
        m_peerConnection = std::make_unique<rtc::PeerConnection>(m_rtcConfig);

        /// Local description callback
        m_peerConnection->onLocalDescription([this](const rtc::Description &description) {
            auto descriptionToString = [](rtc::Description::Type type) {
                switch (type) {
                case rtc::Description::Type::Unspec: return "Unspec";
                case rtc::Description::Type::Offer: return "Offer";
                case rtc::Description::Type::Answer: return "Answer";
                case rtc::Description::Type::Pranswer: return "Pranswer";
                case rtc::Description::Type::Rollback: return "Rollback";
                default: return "Unknown";
                }
            };
            WRITE_LOG("WebRTC PeerConnection description: %s", descriptionToString(description.type()));
        });
        /// PeerConnection state callback
        m_peerConnection->onStateChange([this](rtc::PeerConnection::State state) {
            auto stateToString = [](rtc::PeerConnection::State s) {
                switch (s) {
                case rtc::PeerConnection::State::New: return "New";
                case rtc::PeerConnection::State::Connecting: return "Connecting";
                case rtc::PeerConnection::State::Connected: return "Connected";
                case rtc::PeerConnection::State::Disconnected: return "Disconnected";
                case rtc::PeerConnection::State::Failed: return "Failed";
                case rtc::PeerConnection::State::Closed: return "Closed";
                default: return "Unknown";
                }
            };
            WRITE_LOG("WebRTC PeerConnection state changed: %s", stateToString(state));

            if (state == rtc::PeerConnection::State::Connected) {
                emit publisherStarted();
            } else if (state == rtc::PeerConnection::State::Failed) {
                emit errorOccurred("WebRTC connection failed.");
                stopPublishing();
            }
        });
        /// Signaling state callback
        m_peerConnection->onSignalingStateChange([](rtc::PeerConnection::SignalingState state) {
            auto stateToString = [](rtc::PeerConnection::SignalingState s) {
                switch (s) {
                case rtc::PeerConnection::SignalingState::Stable: return "Stable";
                case rtc::PeerConnection::SignalingState::HaveLocalOffer: return "HaveLocalOffer";
                case rtc::PeerConnection::SignalingState::HaveRemoteOffer: return "HaveRemoteOffer";
                case rtc::PeerConnection::SignalingState::HaveLocalPranswer: return "HaveLocalPranswer";
                default: return "Unknown";
                }
            };
            WRITE_LOG("Signaling state changed: %s", stateToString(state));
        });
        /// ICE gathering state callback -- this is where the bug lives
        m_peerConnection->onGatheringStateChange([this](rtc::PeerConnection::GatheringState state) {
            auto stateToString = [](rtc::PeerConnection::GatheringState s) {
                switch (s) {
                case rtc::PeerConnection::GatheringState::New: return "New";
                case rtc::PeerConnection::GatheringState::InProgress: return "InProgress";
                case rtc::PeerConnection::GatheringState::Complete: return "Complete";
                default: return "Unknown";
                }
            };
            WRITE_LOG("WebRTC ICE Gathering state changed: %s", stateToString(state));

            if (state == rtc::PeerConnection::GatheringState::Complete) {
                auto description = m_peerConnection->localDescription();
                if (description.has_value()) {
                    WRITE_LOG("ICE Gathering complete. Sending offer to server.");
                    std::string sdp_offer = description.value();
                    // Correct version: queue the call back onto the publisher thread
                    // QMetaObject::invokeMethod(this, [this, sdp_offer]() {
                    //     sendOfferToSignalingServer(sdp_offer);
                    // }, Qt::QueuedConnection);
                    sendOfferToSignalingServer(sdp_offer); // Broken: runs on a libdatachannel thread
                } else {
                    WRITE_LOG("CRITICAL: ICE Gathering is complete, but local description is NOT available.");
                    emit errorOccurred("Failed to get local SDP description after ICE gathering.");
                }
            }
        });

        rtc::Description::Video video("video", rtc::Description::Direction::SendOnly);
        video.addH264Codec(96, std::nullopt);
        m_videoTrack = m_peerConnection->addTrack(video);
        WRITE_LOG("Video track (H.264) added.");
        rtc::Description::Audio audio("audio", rtc::Description::Direction::SendOnly);

        // Add the Opus codec with payload type 111
        // (the second, profile argument is optional)
        audio.addOpusCodec(111, std::nullopt);
        m_audioTrack = m_peerConnection->addTrack(audio);
        WRITE_LOG("Audio track (Opus) added.");

        // Set the local description right after the tracks are added
        m_peerConnection->setLocalDescription();
    } catch (const std::exception &e) {
        QString error = QString("Failed to create PeerConnection: %1").arg(e.what());
        WRITE_LOG(error.toStdString().c_str());
        emit errorOccurred(error);
    }
}

Pay special attention to the ICE gathering-state callback: when GatheringState reaches Complete, the call out to the signaling code must go through a lambda queued back onto the publisher thread. This is exactly where I stumbled.

Lambda invocation vs. direct invocation

Over the whole flow: first m_webRTCPublisherThread is created, and m_webRTCPublisher is moved into it with moveToThread. Then initThread is invoked, and inside it m_networkManager = new QNetworkAccessManager(this); runs. That means m_networkManager also belongs to m_webRTCPublisherThread, and m_webRTCPublisherThread has its own running event loop.

With the code as written above, calling sendOfferToSignalingServer(sdp_offer); directly means the onGatheringStateChange callback runs on one of libdatachannel's internal threads, and sendOfferToSignalingServer therefore starts executing on that thread too. Inside it, m_networkManager->post() tries to create internal objects such as the QNetworkReply, and possibly a QTimer or other objects that need an event loop. Qt detects that you are trying to create these children, or use event-loop-dependent functionality, from a thread different from the one their parent (m_networkManager) belongs to, so the request never actually gets processed.

The lambda form, on the other hand, is the classic cross-thread invocation: this here is the WebRTCPublisher, so QMetaObject::invokeMethod with Qt::QueuedConnection guarantees the function runs on m_webRTCPublisherThread, where it cannot conflict with m_networkManager's thread affinity. That is why it executes correctly.
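Under the hood, Qt::QueuedConnection posts the lambda into the event queue of the thread that owns the receiver, and that thread's loop executes it later. Stripped of Qt, the mechanism looks like the minimal plain-C++ sketch below; `EventLoop`, `post`, and `demo` are illustrative names invented for this example, not Qt API.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// A minimal "event loop": tasks posted from any thread run on the loop's own
// thread -- which is what Qt::QueuedConnection achieves for a QObject receiver.
class EventLoop {
public:
    void post(std::function<void()> task) {  // safe to call from any thread
        { std::lock_guard<std::mutex> lk(m_mutex); m_tasks.push(std::move(task)); }
        m_cv.notify_one();
    }
    void run() {  // call this on the owning thread
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m_mutex);
                m_cv.wait(lk, [this] { return !m_tasks.empty() || m_stop; });
                if (m_stop && m_tasks.empty()) return;
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();  // executes on the loop's thread
        }
    }
    void stop() {
        { std::lock_guard<std::mutex> lk(m_mutex); m_stop = true; }
        m_cv.notify_one();
    }
private:
    std::queue<std::function<void()>> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_stop = false;
};

bool demo() {
    EventLoop loop;
    std::thread::id loopThreadId{};
    std::thread owner([&] { loopThreadId = std::this_thread::get_id(); loop.run(); });

    std::thread::id ranOn{};
    loop.post([&] { ranOn = std::this_thread::get_id(); });  // posted from a "foreign" thread
    loop.post([&] { loop.stop(); });
    owner.join();

    // The task executed on the loop's owning thread, not on the posting thread.
    return ranOn == loopThreadId && ranOn != std::this_thread::get_id();
}
```

This is the same shape as the fix: the libdatachannel callback thread only enqueues the work, and m_webRTCPublisherThread's event loop is the one that actually runs sendOfferToSignalingServer.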

Debugging

The classics: adding logs, packet capture with Wireshark, and turning up the SRS log level.


This site was created by Edison.Chen.
Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting.
