feat(stream)!: remove limit on concurrent sessions and allow quitting apps with active sessions (#3325)
This commit is contained in:
@@ -186,32 +186,6 @@ editing the `conf` file in a text editor. Use the examples as reference.
|
|||||||
</tr>
|
</tr>
|
||||||
</table>
|
</table>
|
||||||
|
|
||||||
### channels
|
|
||||||
|
|
||||||
<table>
|
|
||||||
<tr>
|
|
||||||
<td>Description</td>
|
|
||||||
<td colspan="2">
|
|
||||||
Sunshine can support multiple clients streaming simultaneously,
|
|
||||||
at the cost of higher CPU and GPU usage.
|
|
||||||
@note{All connected clients share control of the same streaming session.}
|
|
||||||
@warning{Some hardware encoders may have limitations that reduce performance with multiple streams.}
|
|
||||||
</td>
|
|
||||||
</tr>
|
|
||||||
<tr>
|
|
||||||
<td>Default</td>
|
|
||||||
<td colspan="2">@code{}
|
|
||||||
1
|
|
||||||
@endcode</td>
|
|
||||||
</tr>
|
|
||||||
<tr>
|
|
||||||
<td>Example</td>
|
|
||||||
<td colspan="2">@code{}
|
|
||||||
channels = 1
|
|
||||||
@endcode</td>
|
|
||||||
</tr>
|
|
||||||
</table>
|
|
||||||
|
|
||||||
### global_prep_cmd
|
### global_prep_cmd
|
||||||
|
|
||||||
<table>
|
<table>
|
||||||
|
|||||||
@@ -395,7 +395,6 @@ namespace config {
|
|||||||
APPS_JSON_PATH,
|
APPS_JSON_PATH,
|
||||||
|
|
||||||
20, // fecPercentage
|
20, // fecPercentage
|
||||||
1, // channels
|
|
||||||
|
|
||||||
ENCRYPTION_MODE_NEVER, // lan_encryption_mode
|
ENCRYPTION_MODE_NEVER, // lan_encryption_mode
|
||||||
ENCRYPTION_MODE_OPPORTUNISTIC, // wan_encryption_mode
|
ENCRYPTION_MODE_OPPORTUNISTIC, // wan_encryption_mode
|
||||||
@@ -1046,8 +1045,6 @@ namespace config {
|
|||||||
stream.ping_timeout = std::chrono::milliseconds(to);
|
stream.ping_timeout = std::chrono::milliseconds(to);
|
||||||
}
|
}
|
||||||
|
|
||||||
int_between_f(vars, "channels", stream.channels, { 1, std::numeric_limits<int>::max() });
|
|
||||||
|
|
||||||
int_between_f(vars, "lan_encryption_mode", stream.lan_encryption_mode, { 0, 2 });
|
int_between_f(vars, "lan_encryption_mode", stream.lan_encryption_mode, { 0, 2 });
|
||||||
int_between_f(vars, "wan_encryption_mode", stream.wan_encryption_mode, { 0, 2 });
|
int_between_f(vars, "wan_encryption_mode", stream.wan_encryption_mode, { 0, 2 });
|
||||||
|
|
||||||
|
|||||||
@@ -94,9 +94,6 @@ namespace config {
|
|||||||
|
|
||||||
int fec_percentage;
|
int fec_percentage;
|
||||||
|
|
||||||
// max unique instances of video and audio streams
|
|
||||||
int channels;
|
|
||||||
|
|
||||||
// Video encryption settings for LAN and WAN streams
|
// Video encryption settings for LAN and WAN streams
|
||||||
int lan_encryption_mode;
|
int lan_encryption_mode;
|
||||||
int wan_encryption_mode;
|
int wan_encryption_mode;
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ namespace net {
|
|||||||
}
|
}
|
||||||
|
|
||||||
host_t
|
host_t
|
||||||
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port) {
|
host_create(af_e af, ENetAddress &addr, std::uint16_t port) {
|
||||||
static std::once_flag enet_init_flag;
|
static std::once_flag enet_init_flag;
|
||||||
std::call_once(enet_init_flag, []() {
|
std::call_once(enet_init_flag, []() {
|
||||||
enet_initialize();
|
enet_initialize();
|
||||||
@@ -173,7 +173,8 @@ namespace net {
|
|||||||
enet_address_set_host(&addr, any_addr.data());
|
enet_address_set_host(&addr, any_addr.data());
|
||||||
enet_address_set_port(&addr, port);
|
enet_address_set_port(&addr, port);
|
||||||
|
|
||||||
auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, peers, 0, 0, 0) };
|
// Maximum of 128 clients, which should be enough for anyone
|
||||||
|
auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, 128, 0, 0, 0) };
|
||||||
|
|
||||||
// Enable opportunistic QoS tagging (automatically disables if the network appears to drop tagged packets)
|
// Enable opportunistic QoS tagging (automatically disables if the network appears to drop tagged packets)
|
||||||
enet_socket_set_option(host->socket, ENET_SOCKOPT_QOS, 1);
|
enet_socket_set_option(host->socket, ENET_SOCKOPT_QOS, 1);
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ namespace net {
|
|||||||
from_address(const std::string_view &view);
|
from_address(const std::string_view &view);
|
||||||
|
|
||||||
host_t
|
host_t
|
||||||
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port);
|
host_create(af_e af, ENetAddress &addr, std::uint16_t port);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the address family enum value from a string.
|
* @brief Get the address family enum value from a string.
|
||||||
|
|||||||
@@ -820,14 +820,6 @@ namespace nvhttp {
|
|||||||
response->close_connection_after_response = true;
|
response->close_connection_after_response = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (rtsp_stream::session_count() == config::stream.channels) {
|
|
||||||
tree.put("root.resume", 0);
|
|
||||||
tree.put("root.<xmlattr>.status_code", 503);
|
|
||||||
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto args = request->parse_query_string();
|
auto args = request->parse_query_string();
|
||||||
if (
|
if (
|
||||||
args.find("rikey"s) == std::end(args) ||
|
args.find("rikey"s) == std::end(args) ||
|
||||||
@@ -913,16 +905,6 @@ namespace nvhttp {
|
|||||||
response->close_connection_after_response = true;
|
response->close_connection_after_response = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
// It is possible that due a race condition that this if-statement gives a false negative,
|
|
||||||
// that is automatically resolved in rtsp_server_t
|
|
||||||
if (rtsp_stream::session_count() == config::stream.channels) {
|
|
||||||
tree.put("root.resume", 0);
|
|
||||||
tree.put("root.<xmlattr>.status_code", 503);
|
|
||||||
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto current_appid = proc::proc.running();
|
auto current_appid = proc::proc.running();
|
||||||
if (current_appid == 0) {
|
if (current_appid == 0) {
|
||||||
tree.put("root.resume", 0);
|
tree.put("root.resume", 0);
|
||||||
@@ -999,19 +981,11 @@ namespace nvhttp {
|
|||||||
response->close_connection_after_response = true;
|
response->close_connection_after_response = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
// It is possible that due a race condition that this if-statement gives a false positive,
|
|
||||||
// the client should try again
|
|
||||||
if (rtsp_stream::session_count() != 0) {
|
|
||||||
tree.put("root.resume", 0);
|
|
||||||
tree.put("root.<xmlattr>.status_code", 503);
|
|
||||||
tree.put("root.<xmlattr>.status_message", "All sessions must be disconnected before quitting");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tree.put("root.cancel", 1);
|
tree.put("root.cancel", 1);
|
||||||
tree.put("root.<xmlattr>.status_code", 200);
|
tree.put("root.<xmlattr>.status_code", 200);
|
||||||
|
|
||||||
|
rtsp_stream::terminate_sessions();
|
||||||
|
|
||||||
if (proc::proc.running() > 0) {
|
if (proc::proc.running() > 0) {
|
||||||
proc::proc.terminate();
|
proc::proc.terminate();
|
||||||
}
|
}
|
||||||
|
|||||||
84
src/rtsp.cpp
84
src/rtsp.cpp
@@ -26,6 +26,7 @@ extern "C" {
|
|||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "video.h"
|
#include "video.h"
|
||||||
|
|
||||||
|
#include <set>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
namespace asio = boost::asio;
|
namespace asio = boost::asio;
|
||||||
@@ -417,13 +418,6 @@ namespace rtsp_stream {
|
|||||||
|
|
||||||
int
|
int
|
||||||
bind(net::af_e af, std::uint16_t port, boost::system::error_code &ec) {
|
bind(net::af_e af, std::uint16_t port, boost::system::error_code &ec) {
|
||||||
{
|
|
||||||
auto lg = _session_slots.lock();
|
|
||||||
|
|
||||||
_session_slots->resize(config::stream.channels);
|
|
||||||
_slot_count = config::stream.channels;
|
|
||||||
}
|
|
||||||
|
|
||||||
acceptor.open(af == net::IPV4 ? tcp::v4() : tcp::v6(), ec);
|
acceptor.open(af == net::IPV4 ? tcp::v4() : tcp::v6(), ec);
|
||||||
if (ec) {
|
if (ec) {
|
||||||
return -1;
|
return -1;
|
||||||
@@ -529,7 +523,6 @@ namespace rtsp_stream {
|
|||||||
}
|
}
|
||||||
raised_timeout = now + config::stream.ping_timeout;
|
raised_timeout = now + config::stream.ping_timeout;
|
||||||
|
|
||||||
--_slot_count;
|
|
||||||
launch_event.raise(std::move(launch_session));
|
launch_event.raise(std::move(launch_session));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -552,9 +545,14 @@ namespace rtsp_stream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the number of active sessions.
|
||||||
|
* @return Count of active sessions.
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
session_count() const {
|
session_count() {
|
||||||
return config::stream.channels - _slot_count;
|
auto lg = _session_slots.lock();
|
||||||
|
return _session_slots->size();
|
||||||
}
|
}
|
||||||
|
|
||||||
safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
|
safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
|
||||||
@@ -573,20 +571,21 @@ namespace rtsp_stream {
|
|||||||
auto discarded = launch_event.pop(0s);
|
auto discarded = launch_event.pop(0s);
|
||||||
if (discarded) {
|
if (discarded) {
|
||||||
BOOST_LOG(debug) << "Event timeout: "sv << discarded->unique_id;
|
BOOST_LOG(debug) << "Event timeout: "sv << discarded->unique_id;
|
||||||
++_slot_count;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto lg = _session_slots.lock();
|
auto lg = _session_slots.lock();
|
||||||
|
|
||||||
for (auto &slot : *_session_slots) {
|
for (auto i = _session_slots->begin(); i != _session_slots->end();) {
|
||||||
if (slot && (all || stream::session::state(*slot) == stream::session::state_e::STOPPING)) {
|
auto &slot = *(*i);
|
||||||
stream::session::stop(*slot);
|
if (all || stream::session::state(slot) == stream::session::state_e::STOPPING) {
|
||||||
stream::session::join(*slot);
|
stream::session::stop(slot);
|
||||||
|
stream::session::join(slot);
|
||||||
|
|
||||||
slot.reset();
|
i = _session_slots->erase(i);
|
||||||
|
}
|
||||||
++_slot_count;
|
else {
|
||||||
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -595,36 +594,33 @@ namespace rtsp_stream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Removes the provided session from the set of sessions.
|
||||||
|
* @param session The session to remove.
|
||||||
|
*/
|
||||||
void
|
void
|
||||||
clear(std::shared_ptr<stream::session_t> *session_p) {
|
remove(const std::shared_ptr<stream::session_t> &session) {
|
||||||
auto lg = _session_slots.lock();
|
auto lg = _session_slots.lock();
|
||||||
|
_session_slots->erase(session);
|
||||||
session_p->reset();
|
|
||||||
|
|
||||||
++_slot_count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<stream::session_t> *
|
/**
|
||||||
accept(std::shared_ptr<stream::session_t> &session) {
|
* @brief Inserts the provided session into the set of sessions.
|
||||||
|
* @param session The session to insert.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
insert(const std::shared_ptr<stream::session_t> &session) {
|
||||||
auto lg = _session_slots.lock();
|
auto lg = _session_slots.lock();
|
||||||
|
_session_slots->emplace(session);
|
||||||
for (auto &slot : *_session_slots) {
|
BOOST_LOG(info) << "New streaming session started [active sessions: "sv << _session_slots->size() << ']';
|
||||||
if (!slot) {
|
|
||||||
slot = session;
|
|
||||||
return &slot;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;
|
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;
|
||||||
|
|
||||||
sync_util::sync_t<std::vector<std::shared_ptr<stream::session_t>>> _session_slots;
|
sync_util::sync_t<std::set<std::shared_ptr<stream::session_t>>> _session_slots;
|
||||||
|
|
||||||
std::chrono::steady_clock::time_point raised_timeout;
|
std::chrono::steady_clock::time_point raised_timeout;
|
||||||
int _slot_count;
|
|
||||||
|
|
||||||
boost::asio::io_service ios;
|
boost::asio::io_service ios;
|
||||||
tcp::acceptor acceptor { ios };
|
tcp::acceptor acceptor { ios };
|
||||||
@@ -652,6 +648,11 @@ namespace rtsp_stream {
|
|||||||
return server.session_count();
|
return server.session_count();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
terminate_sessions() {
|
||||||
|
server.clear(true);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
send(tcp::socket &sock, const std::string_view &sv) {
|
send(tcp::socket &sock, const std::string_view &sv) {
|
||||||
std::size_t bytes_send = 0;
|
std::size_t bytes_send = 0;
|
||||||
@@ -1110,19 +1111,12 @@ namespace rtsp_stream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto stream_session = stream::session::alloc(config, session);
|
auto stream_session = stream::session::alloc(config, session);
|
||||||
|
server->insert(stream_session);
|
||||||
auto slot = server->accept(stream_session);
|
|
||||||
if (!slot) {
|
|
||||||
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';
|
|
||||||
|
|
||||||
respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
|
if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
|
||||||
BOOST_LOG(error) << "Failed to start a streaming session"sv;
|
BOOST_LOG(error) << "Failed to start a streaming session"sv;
|
||||||
|
|
||||||
server->clear(slot);
|
server->remove(stream_session);
|
||||||
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
|
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
10
src/rtsp.h
10
src/rtsp.h
@@ -48,9 +48,19 @@ namespace rtsp_stream {
|
|||||||
void
|
void
|
||||||
launch_session_clear(uint32_t launch_session_id);
|
launch_session_clear(uint32_t launch_session_id);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the number of active sessions.
|
||||||
|
* @return Count of active sessions.
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
session_count();
|
session_count();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Terminates all running streaming sessions.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
terminate_sessions();
|
||||||
|
|
||||||
void
|
void
|
||||||
rtpThread();
|
rtpThread();
|
||||||
|
|
||||||
|
|||||||
@@ -252,7 +252,7 @@ namespace stream {
|
|||||||
public:
|
public:
|
||||||
int
|
int
|
||||||
bind(net::af_e address_family, std::uint16_t port) {
|
bind(net::af_e address_family, std::uint16_t port) {
|
||||||
_host = net::host_create(address_family, _addr, config::stream.channels, port);
|
_host = net::host_create(address_family, _addr, port);
|
||||||
|
|
||||||
return !(bool) _host;
|
return !(bool) _host;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -203,7 +203,6 @@
|
|||||||
id: "advanced",
|
id: "advanced",
|
||||||
name: "Advanced",
|
name: "Advanced",
|
||||||
options: {
|
options: {
|
||||||
"channels": 1,
|
|
||||||
"fec_percentage": 20,
|
"fec_percentage": 20,
|
||||||
"qp": 28,
|
"qp": 28,
|
||||||
"min_threads": 2,
|
"min_threads": 2,
|
||||||
|
|||||||
@@ -73,16 +73,6 @@ function removeCmd(index) {
|
|||||||
<div class="form-text">{{ $t('config.log_level_desc') }}</div>
|
<div class="form-text">{{ $t('config.log_level_desc') }}</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<!-- Maximum Connected Clients -->
|
|
||||||
<div class="mb-3">
|
|
||||||
<label for="channels" class="form-label">{{ $t('config.channels') }}</label>
|
|
||||||
<input type="text" class="form-control" id="channels" placeholder="1" v-model="config.channels" />
|
|
||||||
<div class="form-text">
|
|
||||||
{{ $t('config.channels_desc_1') }}<br>
|
|
||||||
{{ $t('_common.note') }} {{ $t('config.channels_desc_2') }}
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<!-- Global Prep Commands -->
|
<!-- Global Prep Commands -->
|
||||||
<div id="global_prep_cmd" class="mb-3 d-flex flex-column">
|
<div id="global_prep_cmd" class="mb-3 d-flex flex-column">
|
||||||
<label class="form-label">{{ $t('config.global_prep_cmd') }}</label>
|
<label class="form-label">{{ $t('config.global_prep_cmd') }}</label>
|
||||||
|
|||||||
Reference in New Issue
Block a user