diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index 5e09cb76..6ce58a7e 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -832,8 +832,18 @@ void start(std::shared_ptr shutdown_event) { http_server.config.address = "0.0.0.0"s; http_server.config.port = PORT_HTTP; - std::thread ssl { &https_server_t::start, &https_server }; - std::thread tcp { &http_server_t::start, &http_server }; + try { + https_server.bind(); + http_server.bind(); + } catch(boost::system::system_error &err) { + BOOST_LOG(fatal) << "Couldn't bind http server to ports ["sv << PORT_HTTPS << ", "sv << PORT_HTTP << "]: "sv << err.what(); + + shutdown_event->raise(true); + return; + } + + std::thread ssl { &https_server_t::accept_and_run, &https_server }; + std::thread tcp { &http_server_t::accept_and_run, &http_server }; // Wait for any event shutdown_event->view(); diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp index 70aac786..cf9a973a 100644 --- a/sunshine/rtsp.cpp +++ b/sunshine/rtsp.cpp @@ -54,11 +54,13 @@ public: } } - void bind(std::uint16_t port) { + int bind(std::uint16_t port) { _session_slots.resize(config::stream.channels); _slot_count = config::stream.channels; _host = net::host_create(_addr, 1, port); + + return !(bool)_host; } void session_raise(launch_session_t launch_session) { @@ -157,16 +159,21 @@ public: } } - bool accept(const std::shared_ptr &session) { + void clear(std::shared_ptr *session_p) { + session_p->reset(); + + ++_slot_count; + } + + std::shared_ptr *accept(std::shared_ptr &session) { for(auto &slot : _session_slots) { if(!slot) { slot = session; - - return true; + return &slot; } } - return false; + return nullptr; } net::host_t::pointer host() const { @@ -412,14 +419,22 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { } auto session = session::alloc(config, launch_session->gcm_key, launch_session->iv); - if(!server->accept(session)) { + + auto slot = server->accept(session); + if(!slot) { BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']'; respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); return; } - session::start(*session, platf::from_sockaddr((sockaddr*)&peer->address.address)); + if(session::start(*session, platf::from_sockaddr((sockaddr*)&peer->address.address))) { + BOOST_LOG(error) << "Failed to start a streaming session"sv; + + server->clear(slot); + respond(server->host(), peer, &option, 500, "Internal Server Error", req->sequenceNumber, {}); + return; + } respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); } @@ -444,7 +459,13 @@ void rtpThread(std::shared_ptr shutdown_event) { server.map("PLAY"sv, &cmd_play); - server.bind(RTSP_SETUP_PORT); + if(server.bind(RTSP_SETUP_PORT)) { + BOOST_LOG(fatal) << "Couldn't bind RTSP server to port ["sv << RTSP_SETUP_PORT << "], likely another process already bound to the port"sv; + shutdown_event->raise(true); + + return; + } + while(!shutdown_event->peek()) { server.iterate(std::min(500ms, config::stream.ping_timeout)); diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 64524109..b9be9f8f 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -99,10 +99,11 @@ static inline void while_starting_do_nothing(std::atomic &stat class control_server_t { public: - control_server_t(control_server_t &&) noexcept = default; - control_server_t &operator=(control_server_t &&) noexcept = default; + int bind(std::uint16_t port) { + _host = net::host_create(_addr, config::stream.channels, port); - explicit control_server_t(std::uint16_t port) : _host { net::host_create(_addr, config::stream.channels, port) } {} + return !(bool)_host; + } void emplace_addr_to_session(const std::string &addr, session_t &session) { auto lg = _map_addr_session.lock(); @@ -160,9 +161,9 @@ struct broadcast_ctx_t { asio::io_service io; - udp::socket video_sock { io, udp::endpoint(udp::v4(), VIDEO_STREAM_PORT) }; - udp::socket audio_sock { io, udp::endpoint(udp::v4(), AUDIO_STREAM_PORT) }; - control_server_t control_server { CONTROL_PORT }; + udp::socket video_sock { io }; + udp::socket audio_sock { io }; + control_server_t control_server; }; struct session_t { @@ -718,6 +719,41 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud } int start_broadcast(broadcast_ctx_t &ctx) { + if(ctx.control_server.bind(CONTROL_PORT)) { + BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << CONTROL_PORT << "], likely another process already bound to the port"sv; + + return -1; + } + + boost::system::error_code ec; + ctx.video_sock.open(udp::v4(), ec); + if(ec) { + BOOST_LOG(fatal) << "Couldn't open socket for Video server: "sv << ec.message(); + + return -1; + } + + ctx.video_sock.bind(udp::endpoint(udp::v4(), VIDEO_STREAM_PORT), ec); + if(ec) { + BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << VIDEO_STREAM_PORT << "]: "sv << ec.message(); + + return -1; + } + + ctx.audio_sock.open(udp::v4(), ec); + if(ec) { + BOOST_LOG(fatal) << "Couldn't open socket for Audio server: "sv << ec.message(); + + return -1; + } + + ctx.audio_sock.bind(udp::endpoint(udp::v4(), AUDIO_STREAM_PORT), ec); + if(ec) { + BOOST_LOG(fatal) << "Couldn't bind Audio server to port ["sv << AUDIO_STREAM_PORT << "]: "sv << ec.message(); + + return -1; + } + ctx.video_packets = std::make_shared(30); ctx.audio_packets = std::make_shared(30); ctx.message_queue_queue = std::make_shared(30); @@ -861,10 +897,14 @@ void join(session_t &session) { BOOST_LOG(debug) << "Session ended"sv; } -void start(session_t &session, const std::string &addr_string) { +int start(session_t &session, const std::string &addr_string) { session.input = input::alloc(); session.broadcast_ref = broadcast.ref(); + if(!session.broadcast_ref) { + return -1; + } + session.broadcast_ref->control_server.emplace_addr_to_session(addr_string, session); session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; @@ -873,6 +913,8 @@ void start(session_t &session, const std::string &addr_string) { session.videoThread = std::thread {videoThread, &session, addr_string}; session.state.store(state_e::RUNNING, std::memory_order_relaxed); + + return 0; } std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) { diff --git a/sunshine/stream.h b/sunshine/stream.h index 8e2e183a..cd63fcb2 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -31,7 +31,7 @@ enum class state_e : int { }; std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv); -void start(session_t &session, const std::string &addr_string); +int start(session_t &session, const std::string &addr_string); void stop(session_t &session); void join(session_t &session); state_e state(session_t &session); diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index b2741c3b..d166cc0f 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -311,13 +311,15 @@ public: [[nodiscard]] ptr_t ref() { std::lock_guard lg { _lock }; - if(!_count++) { + if(!_count) { new(_object_buf.data()) element_type; if(_construct(*reinterpret_cast(_object_buf.data()))) { return ptr_t { nullptr }; } } + ++_count; + return ptr_t { this }; } private: