diff --git a/sunshine/audio.cpp b/sunshine/audio.cpp index e577a01f..f1bfefd6 100644 --- a/sunshine/audio.cpp +++ b/sunshine/audio.cpp @@ -80,12 +80,11 @@ void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t confi } } -void capture(packet_queue_t packets, config_t config, void *channel_data) { +void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t config, void *channel_data) { auto samples = std::make_shared(); std::thread thread { encodeThread, packets, samples, config, channel_data }; auto fg = util::fail_guard([&]() { - packets->stop(); samples->stop(); thread.join(); }); @@ -103,7 +102,7 @@ void capture(packet_queue_t packets, config_t config, void *channel_data) { auto frame_size = config.packetDuration * stream->sampleRate / 1000; int samples_per_frame = frame_size * stream->channelCount; - while(packets->running()) { + while(!shutdown_event->peek()) { std::vector sample_buffer; sample_buffer.resize(samples_per_frame); diff --git a/sunshine/audio.h b/sunshine/audio.h index 1628d029..4e27c8c0 100644 --- a/sunshine/audio.h +++ b/sunshine/audio.h @@ -12,7 +12,7 @@ struct config_t { using packet_t = util::buffer_t; using packet_queue_t = std::shared_ptr>>; -void capture(packet_queue_t packets, config_t config, void *channel_data); +void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t config, void *channel_data); } #endif diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index 4e03ad44..53100688 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -686,7 +686,7 @@ int create_creds(const std::string &pkey, const std::string &cert) { return 0; } -void start(std::shared_ptr> shutdown_event) { +void start(std::shared_ptr shutdown_event) { if(!fs::exists(config::nvhttp.pkey) || !fs::exists(config::nvhttp.cert)) { if(create_creds(config::nvhttp.pkey, config::nvhttp.cert)) { shutdown_event->raise(true); diff --git a/sunshine/nvhttp.h b/sunshine/nvhttp.h index 2e19f70d..eb90540a 100644 --- a/sunshine/nvhttp.h +++ b/sunshine/nvhttp.h @@ -15,7 +15,7 @@ #define CERTIFICATE_FILE CA_DIR "/cacert.pem" namespace nvhttp { -void start(std::shared_ptr> shutdown_event); +void start(std::shared_ptr shutdown_event); } #endif //SUNSHINE_NVHTTP_H diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp index fa10729d..549e342b 100644 --- a/sunshine/rtsp.cpp +++ b/sunshine/rtsp.cpp @@ -46,6 +46,10 @@ public: explicit rtsp_server_t(std::uint16_t port) : _session_slots (config::stream.channels), _host {net::host_create(_addr, 1, port) } {} + ~rtsp_server_t() { + stop(); + } + template void iterate(std::chrono::duration timeout) { ENetEvent event; @@ -114,22 +118,26 @@ public: _map_cmd_cb.emplace(type, std::move(cb)); } - void stop() { + void stop(bool all = true) { for(auto &slot : _session_slots) { - auto session = slot.lock(); + if (slot && (all || session::state(*slot) == session::state_e::STOPPING)) { + session::stop(*slot); + session::join(*slot); - if (!session) { - continue; + slot.reset(); } + } - ::stream::stop(*session); - ::stream::join(*session); + if(all) { + std::for_each(_host->peers, _host->peers + _host->peerCount, [](auto &peer) { + enet_peer_disconnect_now(&peer, 0); + }); } } bool accept(const std::shared_ptr &session) { for(auto &slot : _session_slots) { - if(slot.expired()) { + if(!slot) { slot = session; return true; @@ -150,7 +158,7 @@ private: std::unordered_map _map_cmd_cb; - std::vector> _session_slots; + std::vector> _session_slots; ENetAddress _addr; net::host_t _host; @@ -367,7 +375,7 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { return; } - auto session = alloc_session(config, launch_session->gcm_key, launch_session->iv); + auto session = session::alloc(config, launch_session->gcm_key, launch_session->iv); if(!server->accept(session)) { BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']'; @@ -375,7 +383,7 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { return; } - start_session(session, platf::from_sockaddr((sockaddr*)&peer->address.address)); + session::start(*session, platf::from_sockaddr((sockaddr*)&peer->address.address)); respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); } @@ -392,7 +400,7 @@ void cmd_play(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); } -void rtpThread(std::shared_ptr> shutdown_event) { +void rtpThread(std::shared_ptr shutdown_event) { input = std::make_shared(); auto fg = util::fail_guard([&]() { input.reset(); @@ -409,9 +417,15 @@ void rtpThread(std::shared_ptr> shutdown_event) { while(!shutdown_event->peek()) { server.iterate(std::min(500ms, config::stream.ping_timeout)); - } - server.stop(); + if(broadcast_shutdown_event.peek()) { + server.stop(); + } + else { + // cleanup all stopped sessions + server.stop(false); + } + } } void print_msg(PRTSP_MESSAGE msg) { diff --git a/sunshine/rtsp.h b/sunshine/rtsp.h index f94ed36b..b75d3b80 100644 --- a/sunshine/rtsp.h +++ b/sunshine/rtsp.h @@ -18,7 +18,7 @@ struct launch_session_t { extern safe::event_t launch_event; -void rtpThread(std::shared_ptr> shutdown_event); +void rtpThread(std::shared_ptr shutdown_event); } diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 12c7970f..f52659db 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -62,13 +62,6 @@ enum class socket_e : int { audio }; -enum class state_e : int { - STOPPED, - STOPPING, - STARTING, - RUNNING, -}; - #pragma pack(push, 1) struct video_packet_raw_t { @@ -96,23 +89,20 @@ using audio_packet_t = util::c_ptr; using message_queue_t = std::shared_ptr>>; using message_queue_queue_t = std::shared_ptr>>; -using session_queue_t = std::shared_ptr>>>; +using session_queue_t = std::shared_ptr>>; struct broadcast_ctx_t { - safe::event_t shutdown_event; - video::packet_queue_t video_packets; audio::packet_queue_t audio_packets; message_queue_queue_t message_queue_queue; session_queue_t session_queue; + std::thread recv_thread; std::thread video_thread; std::thread audio_thread; std::thread control_thread; - std::thread recv_thread; - asio::io_service io; udp::socket video_sock { io, udp::endpoint(udp::v4(), VIDEO_STREAM_PORT) }; @@ -136,17 +126,17 @@ struct session_t { crypto::aes_t gcm_key; crypto::aes_t iv; - std::atomic state; + safe::signal_t shutdown_event; + std::atomic state; }; -void videoThread(std::shared_ptr session, std::string addr_str); -void audioThread(std::shared_ptr session, std::string addr_str); - int start_broadcast(broadcast_ctx_t &ctx); void end_broadcast(broadcast_ctx_t &ctx); std::shared_ptr input; + static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); +safe::signal_t broadcast_shutdown_event; class control_server_t { public: @@ -155,27 +145,30 @@ public: explicit control_server_t(session_queue_t session_queue, std::uint16_t port) : session_queue { session_queue }, _host { net::host_create(_addr, config::stream.channels, port) } {} + void populate_addr_to_session() { + while(session_queue->peek()) { + auto session_opt = session_queue->pop(); + if(!session_opt) { + break; + } + TUPLE_2D_REF(addr_string, session, *session_opt); + + if(session) { + _map_addr_session.try_emplace(addr_string, session).second; + } + else { + _map_addr_session.erase(addr_string); + } + } + } + template void iterate(std::chrono::duration timeout) { ENetEvent event; auto res = enet_host_service(_host.get(), &event, std::chrono::floor(timeout).count()); + populate_addr_to_session(); if(res > 0) { - while(session_queue->peek()) { - auto session_opt = session_queue->pop(); - if(!session_opt) { - return; - } - - TUPLE_2D_REF(addr_string, session, *session_opt); - - if(session) { - _map_addr_session.try_emplace(addr_string, session); - } - else { - _map_addr_session.erase(addr_string); - } - } auto addr_string = platf::from_sockaddr((sockaddr*)&event.peer->address.address); auto it = _map_addr_session.find(addr_string); @@ -206,7 +199,7 @@ public: } else { - cb->second(session.get(), payload); + cb->second(session, payload); } } break; @@ -216,8 +209,8 @@ public: case ENET_EVENT_TYPE_DISCONNECT: BOOST_LOG(info) << "CLIENT DISCONNECTED"sv; // No more clients to send video data to ^_^ - if(session->state == state_e::RUNNING) { - stop(*session); + if(session->state == session::state_e::RUNNING) { + session::stop(*session); } break; case ENET_EVENT_TYPE_NONE: @@ -233,7 +226,7 @@ public: void send(const std::string_view &payload); std::unordered_map> _map_type_cb; - std::unordered_map> _map_addr_session; + std::unordered_map _map_addr_session; session_queue_t session_queue; @@ -356,7 +349,7 @@ void control_server_t::send(const std::string_view & payload) { enet_host_flush(_host.get()); } -void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t session_queue) { +void controlBroadcastThread(safe::signal_t *shutdown_event, session_queue_t session_queue) { control_server_t server { session_queue, CONTROL_PORT }; server.map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) { @@ -411,7 +404,7 @@ void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t BOOST_LOG(error) << "Failed to verify tag"sv; - stop(*session); + session::stop(*session); } if(tagged_cipher_length >= 16 + session->iv.size()) { @@ -427,7 +420,7 @@ void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t for(auto &[addr,session] : server._map_addr_session) { if(now > session->pingTimeout) { BOOST_LOG(info) << addr << ": Ping Timeout"sv; - stop(*session); + session::stop(*session); } } @@ -442,7 +435,8 @@ void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t server.send(std::string_view {(char*)payload.data(), payload.size()}); - //TODO: Terminate session + shutdown_event->raise(true); + continue; } server.iterate(500ms); @@ -519,12 +513,12 @@ void recvThread(broadcast_ctx_t &ctx) { video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]); audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]); - while(!ctx.shutdown_event.peek()) { + while(!broadcast_shutdown_event.peek()) { io.run(); } } -void videoBroadcastThread(safe::event_t *shutdown_event, udp::socket &sock, video::packet_queue_t packets) { +void videoBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, video::packet_queue_t packets) { int lowseq = 0; while(auto packet = packets->pop()) { if(shutdown_event->peek()) { @@ -626,7 +620,7 @@ void videoBroadcastThread(safe::event_t *shutdown_event, udp::socket &sock shutdown_event->raise(true); } -void audioBroadcastThread(safe::event_t *shutdown_event, udp::socket &sock, audio::packet_queue_t packets) { +void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, audio::packet_queue_t packets) { uint16_t frame{1}; while (auto packet = packets->pop()) { @@ -658,9 +652,9 @@ int start_broadcast(broadcast_ctx_t &ctx) { ctx.message_queue_queue = std::make_shared(); ctx.session_queue = std::make_shared(); - ctx.video_thread = std::thread { videoBroadcastThread, &ctx.shutdown_event, std::ref(ctx.video_sock), ctx.video_packets }; - ctx.audio_thread = std::thread { audioBroadcastThread, &ctx.shutdown_event, std::ref(ctx.audio_sock), ctx.audio_packets }; - ctx.control_thread = std::thread { controlBroadcastThread, &ctx.shutdown_event, ctx.session_queue }; + ctx.video_thread = std::thread { videoBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.video_sock), ctx.video_packets }; + ctx.audio_thread = std::thread { audioBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.audio_sock), ctx.audio_packets }; + ctx.control_thread = std::thread { controlBroadcastThread, &broadcast_shutdown_event, ctx.session_queue }; ctx.recv_thread = std::thread { recvThread, std::ref(ctx) }; @@ -668,25 +662,32 @@ int start_broadcast(broadcast_ctx_t &ctx) { } void end_broadcast(broadcast_ctx_t &ctx) { - ctx.shutdown_event.raise(true); + broadcast_shutdown_event.raise(true); + + // Minimize delay stopping video/audio threads ctx.video_packets->stop(); ctx.audio_packets->stop(); + ctx.message_queue_queue->stop(); ctx.io.stop(); - ctx.video_sock.cancel(); - ctx.audio_sock.cancel(); - - BOOST_LOG(debug) << "Waiting for video thread to end..."sv; - ctx.video_thread.join(); - BOOST_LOG(debug) << "Waiting for audio thread to end..."sv; - ctx.audio_thread.join(); - BOOST_LOG(debug) << "Waiting for control thread to end..."sv; - ctx.control_thread.join(); - BOOST_LOG(debug) << "All broadcasting threads ended"sv; + ctx.video_sock.close(); + ctx.audio_sock.close(); ctx.video_packets.reset(); ctx.audio_packets.reset(); + + BOOST_LOG(debug) << "Waiting for main listening thread to end..."sv; + ctx.recv_thread.join(); + BOOST_LOG(debug) << "Waiting for main video thread to end..."sv; + ctx.video_thread.join(); + BOOST_LOG(debug) << "Waiting for main audio thread to end..."sv; + ctx.audio_thread.join(); + BOOST_LOG(debug) << "Waiting for main control thread to end..."sv; + ctx.control_thread.join(); + BOOST_LOG(debug) << "All broadcasting threads ended"sv; + + broadcast_shutdown_event.reset(); } int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) { @@ -721,12 +722,12 @@ int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address & return port; } -void videoThread(std::shared_ptr session, std::string addr_str) { +void videoThread(session_t *session, std::string addr_str) { auto fg = util::fail_guard([&]() { - stop(*session); + session::stop(*session); }); - while(session->state == state_e::STARTING) { + while(session->state == session::state_e::STARTING) { std::this_thread::sleep_for(1ms); } @@ -742,15 +743,15 @@ void videoThread(std::shared_ptr session, std::string addr_str) { session->video_peer.port(port); BOOST_LOG(debug) << "Start capturing Video"sv; - video::capture(ref->video_packets, session->idr_events, session->config.monitor, session.get()); + video::capture(&session->shutdown_event, ref->video_packets, session->idr_events, session->config.monitor, session); } -void audioThread(std::shared_ptr session, std::string addr_str) { +void audioThread(session_t *session, std::string addr_str) { auto fg = util::fail_guard([&]() { - stop(*session); + session::stop(*session); }); - while(session->state == state_e::STARTING) { + while(session->state == session::state_e::STARTING) { std::this_thread::sleep_for(1ms); } @@ -766,11 +767,17 @@ void audioThread(std::shared_ptr session, std::string addr_str) { session->audio_peer.port(port); BOOST_LOG(debug) << "Start capturing Audio"sv; - audio::capture(ref->audio_packets, session->config.audio, session.get()); + audio::capture(&session->shutdown_event, ref->audio_packets, session->config.audio, session); +} + +namespace session { +state_e state(session_t &session) { + return session.state.load(std::memory_order_relaxed); } void stop(session_t &session) { - session.idr_events->stop(); + session.broadcast_ref->session_queue->raise(session.video_peer.address().to_string(), nullptr); + session.shutdown_event.raise(true); auto expected = state_e::RUNNING; session.state.compare_exchange_strong(expected, state_e::STOPPING); @@ -783,19 +790,19 @@ void join(session_t &session) { session.audioThread.join(); } -void start_session(std::shared_ptr session, const std::string &addr_string) { - session->broadcast_ref = broadcast.ref(); - session->broadcast_ref->session_queue->raise(addr_string, session); +void start(session_t &session, const std::string &addr_string) { + session.broadcast_ref = broadcast.ref(); + session.broadcast_ref->session_queue->raise(addr_string, &session); - session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; + session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - session->audioThread = std::thread {audioThread, session, addr_string}; - session->videoThread = std::thread {videoThread, session, addr_string}; + session.audioThread = std::thread {audioThread, &session, addr_string}; + session.videoThread = std::thread {videoThread, &session, addr_string}; - session->state.store(state_e::RUNNING, std::memory_order_relaxed); + session.state.store(state_e::RUNNING, std::memory_order_relaxed); } -std::shared_ptr alloc_session(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) { +std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) { auto session = std::make_shared(); session->config = config; @@ -808,3 +815,4 @@ std::shared_ptr alloc_session(config_t &config, crypto::aes_t &gcm_ke return session; } } +} diff --git a/sunshine/stream.h b/sunshine/stream.h index 29fbdd82..a92eb33b 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -26,13 +26,23 @@ struct config_t { std::optional gcmap; }; -std::shared_ptr alloc_session(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv); -void start_session(std::shared_ptr session, const std::string &addr_string); +namespace session { +enum class state_e : int { + STOPPED, + STOPPING, + STARTING, + RUNNING, +}; +std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv); +void start(session_t &session, const std::string &addr_string); void stop(session_t &session); void join(session_t &session); +state_e state(session_t &session); +} extern std::shared_ptr input; +extern safe::signal_t broadcast_shutdown_event; } #endif //SUNSHINE_STREAM_H diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index 21eeba5f..b10089e3 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -93,7 +93,7 @@ public: bool peek() { std::lock_guard lg { _lock }; - return (bool)_status; + return _continue && (bool)_status; } void stop() { @@ -104,7 +104,15 @@ public: _cv.notify_all(); } - bool running() const { + void reset() { + std::lock_guard lg{_lock}; + + _continue = true; + + _status = util::false_v; + } + + [[nodiscard]] bool running() const { return _continue; } private: @@ -137,7 +145,7 @@ public: bool peek() { std::lock_guard lg { _lock }; - return !_queue.empty(); + return _continue && !_queue.empty(); } template @@ -315,6 +323,8 @@ auto make_shared(F_Construct &&fc, F_Destruct &&fd) { std::forward(fc), std::forward(fd) }; } + +using signal_t = event_t; } #endif //SUNSHINE_THREAD_SAFE_H diff --git a/sunshine/video.cpp b/sunshine/video.cpp index c305b058..862ea000 100644 --- a/sunshine/video.cpp +++ b/sunshine/video.cpp @@ -193,6 +193,7 @@ void end_capture(capture_thread_ctx_t &capture_thread_ctx) { } void capture( + safe::signal_t *shutdown_event, packet_queue_t packets, idr_event_t idr_events, config_t config, @@ -344,7 +345,7 @@ void capture( // Initiate scaling context with correct height and width sws_t sws; while(auto img = images->pop()) { - if(!idr_events->running()) { + if(shutdown_event->peek()) { break; } diff --git a/sunshine/video.h b/sunshine/video.h index 873b76bc..70f0f9e3 100644 --- a/sunshine/video.h +++ b/sunshine/video.h @@ -51,6 +51,7 @@ struct config_t { }; void capture( + safe::signal_t *shutdown_event, packet_queue_t packets, idr_event_t idr_events, config_t config,