pass session event objects through safe::mail_t

This commit is contained in:
loki
2021-06-22 22:26:11 +02:00
parent cf9eb961fc
commit 7e3abefc2c
10 changed files with 122 additions and 91 deletions

View File

@@ -150,10 +150,6 @@ public:
};
struct broadcast_ctx_t {
video::packet_queue_t video_packets;
audio::packet_queue_t audio_packets;
std::shared_ptr<safe::post_t<safe::signal_t>> broadcast_shutdown_event;
message_queue_queue_t message_queue_queue;
std::thread recv_thread;
@@ -170,6 +166,9 @@ struct broadcast_ctx_t {
struct session_t {
config_t config;
safe::mail_t mail;
std::shared_ptr<input::input_t> input;
std::thread audioThread;
@@ -182,7 +181,7 @@ struct session_t {
struct {
int lowseq;
udp::endpoint peer;
video::idr_event_t idr_events;
safe::mail_raw_t::event_t<video::idr_t> idr_events;
} video;
struct {
@@ -197,7 +196,7 @@ struct session_t {
crypto::aes_t gcm_key;
crypto::aes_t iv;
safe::signal_t shutdown_event;
safe::mail_raw_t::event_t<bool> shutdown_event;
safe::signal_t controlEnd;
std::atomic<session::state_e> state;
@@ -402,7 +401,7 @@ std::vector<uint8_t> replace(const std::string_view &original, const std::string
return replaced;
}
void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *server) {
void controlBroadcastThread(control_server_t *server) {
server->map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) {
BOOST_LOG(debug) << "type [IDX_START_A]"sv;
});
@@ -466,6 +465,7 @@ void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *se
input::passthrough(session->input, std::move(plaintext));
});
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
while(!shutdown_event->peek()) {
{
auto lg = server->_map_addr_session.lock();
@@ -507,7 +507,7 @@ void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *se
auto lg = server->_map_addr_session.lock();
for(auto pos = std::begin(*server->_map_addr_session); pos != std::end(*server->_map_addr_session); ++pos) {
auto session = pos->second.second;
session->shutdown_event.raise(true);
session->shutdown_event->raise(true);
}
}
@@ -522,8 +522,10 @@ void recvThread(broadcast_ctx_t &ctx) {
auto &video_sock = ctx.video_sock;
auto &audio_sock = ctx.audio_sock;
auto &message_queue_queue = ctx.message_queue_queue;
auto &io = ctx.io;
auto &message_queue_queue = ctx.message_queue_queue;
auto broadcast_shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
auto &io = ctx.io;
udp::endpoint peer;
@@ -594,12 +596,15 @@ void recvThread(broadcast_ctx_t &ctx) {
video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]);
audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]);
while(!ctx.broadcast_shutdown_event->peek()) {
while(!broadcast_shutdown_event->peek()) {
io.run();
}
}
void videoBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, video::packet_queue_t packets) {
void videoBroadcastThread(udp::socket &sock) {
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
auto packets = mail::man->queue<video::packet_t>(mail::video_packets);
while(auto packet = packets->pop()) {
if(shutdown_event->peek()) {
break;
@@ -693,7 +698,10 @@ void videoBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, vid
shutdown_event->raise(true);
}
void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, audio::packet_queue_t packets) {
void audioBroadcastThread(udp::socket &sock) {
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
auto packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
while(auto packet = packets->pop()) {
if(shutdown_event->peek()) {
break;
@@ -722,8 +730,6 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud
}
int start_broadcast(broadcast_ctx_t &ctx) {
ctx.broadcast_shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
if(ctx.control_server.bind(CONTROL_PORT)) {
BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << CONTROL_PORT << "], likely another process already bound to the port"sv;
@@ -759,13 +765,11 @@ int start_broadcast(broadcast_ctx_t &ctx) {
return -1;
}
ctx.video_packets = std::make_shared<video::packet_queue_t::element_type>(30);
ctx.audio_packets = std::make_shared<audio::packet_queue_t::element_type>(30);
ctx.message_queue_queue = std::make_shared<message_queue_queue_t::element_type>(30);
ctx.video_thread = std::thread { videoBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.video_sock), ctx.video_packets };
ctx.audio_thread = std::thread { audioBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.audio_sock), ctx.audio_packets };
ctx.control_thread = std::thread { controlBroadcastThread, ctx.broadcast_shutdown_event.get(), &ctx.control_server };
ctx.video_thread = std::thread { videoBroadcastThread, std::ref(ctx.video_sock) };
ctx.audio_thread = std::thread { audioBroadcastThread, std::ref(ctx.audio_sock) };
ctx.control_thread = std::thread { controlBroadcastThread, &ctx.control_server };
ctx.recv_thread = std::thread { recvThread, std::ref(ctx) };
@@ -773,11 +777,16 @@ int start_broadcast(broadcast_ctx_t &ctx) {
}
void end_broadcast(broadcast_ctx_t &ctx) {
ctx.broadcast_shutdown_event->raise(true);
auto broadcast_shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
broadcast_shutdown_event->raise(true);
auto video_packets = mail::man->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
// Minimize delay stopping video/audio threads
ctx.video_packets->stop();
ctx.audio_packets->stop();
video_packets->stop();
audio_packets->stop();
ctx.message_queue_queue->stop();
ctx.io.stop();
@@ -785,8 +794,8 @@ void end_broadcast(broadcast_ctx_t &ctx) {
ctx.video_sock.close();
ctx.audio_sock.close();
ctx.video_packets.reset();
ctx.audio_packets.reset();
video_packets.reset();
audio_packets.reset();
BOOST_LOG(debug) << "Waiting for main listening thread to end..."sv;
ctx.recv_thread.join();
@@ -798,7 +807,7 @@ void end_broadcast(broadcast_ctx_t &ctx) {
ctx.control_thread.join();
BOOST_LOG(debug) << "All broadcasting threads ended"sv;
ctx.broadcast_shutdown_event->reset();
broadcast_shutdown_event->reset();
}
int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) {
@@ -850,7 +859,7 @@ void videoThread(session_t *session, std::string addr_str) {
session->video.peer.port(port);
BOOST_LOG(debug) << "Start capturing Video"sv;
video::capture(&session->shutdown_event, ref->video_packets, session->video.idr_events, session->config.monitor, session);
video::capture(session->mail, session->config.monitor, session);
}
void audioThread(session_t *session, std::string addr_str) {
@@ -872,7 +881,7 @@ void audioThread(session_t *session, std::string addr_str) {
session->audio.peer.port(port);
BOOST_LOG(debug) << "Start capturing Audio"sv;
audio::capture(&session->shutdown_event, ref->audio_packets, session->config.audio, session);
audio::capture(session->mail, session->config.audio, session);
}
namespace session {
@@ -888,7 +897,7 @@ void stop(session_t &session) {
return;
}
session.shutdown_event.raise(true);
session.shutdown_event->raise(true);
}
void join(session_t &session) {
@@ -905,7 +914,7 @@ void join(session_t &session) {
}
int start(session_t &session, const std::string &addr_string) {
session.input = input::alloc();
session.input = input::alloc(session.mail);
session.broadcast_ref = broadcast.ref();
if(!session.broadcast_ref) {
@@ -927,11 +936,15 @@ int start(session_t &session, const std::string &addr_string) {
std::shared_ptr<session_t> alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) {
auto session = std::make_shared<session_t>();
auto mail = std::make_shared<safe::mail_raw_t>();
session->shutdown_event = mail->event<bool>(mail::shutdown);
session->config = config;
session->gcm_key = gcm_key;
session->iv = iv;
session->video.idr_events = std::make_shared<video::idr_event_t::element_type>();
session->video.idr_events = mail->event<video::idr_t>(mail::idr);
session->video.lowseq = 0;
session->audio.frame = 1;
@@ -939,6 +952,8 @@ std::shared_ptr<session_t> alloc(config_t &config, crypto::aes_t &gcm_key, crypt
session->control.peer = nullptr;
session->state.store(state_e::STOPPED, std::memory_order_relaxed);
session->mail = std::move(mail);
return session;
}
} // namespace session