Don't quit application when a session still running

This commit is contained in:
loki
2020-03-18 21:12:05 +01:00
parent e571b29868
commit c0116e0cdd
3 changed files with 73 additions and 23 deletions
+30 -7
View File
@@ -527,6 +527,15 @@ void launch(resp_https_t response, req_https_t request) {
response->write(data.str()); response->write(data.str());
}); });
BOOST_LOG(fatal) << stream::session_count();
if(stream::session_count() == config::stream.channels) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
return;
}
auto args = request->parse_query_string(); auto args = request->parse_query_string();
auto appid = util::from_view(args.at("appid")) -1; auto appid = util::from_view(args.at("appid")) -1;
@@ -558,7 +567,7 @@ void launch(resp_https_t response, req_https_t request) {
auto next = std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv)); auto next = std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv));
std::fill(next, std::end(launch_session.iv), 0); std::fill(next, std::end(launch_session.iv), 0);
stream::launch_event.raise(launch_session); stream::launch_session_raise(launch_session);
tree.put("root.<xmlattr>.status_code", 200); tree.put("root.<xmlattr>.status_code", 200);
tree.put("root.gamesession", 1); tree.put("root.gamesession", 1);
@@ -575,6 +584,16 @@ void resume(resp_https_t response, req_https_t request) {
response->write(data.str()); response->write(data.str());
}); });
// It is possible that due a race condition that this if-statement gives a false negative,
// that is automatically resolved in rtsp_server_t
if(stream::session_count() == config::stream.channels) {
BOOST_LOG(fatal) << stream::session_count();
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
return;
}
auto current_appid = proc::proc.running(); auto current_appid = proc::proc.running();
if(current_appid == -1) { if(current_appid == -1) {
tree.put("root.resume", 0); tree.put("root.resume", 0);
@@ -594,7 +613,7 @@ void resume(resp_https_t response, req_https_t request) {
auto next = std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv)); auto next = std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv));
std::fill(next, std::end(launch_session.iv), 0); std::fill(next, std::end(launch_session.iv), 0);
stream::launch_event.raise(launch_session); stream::launch_session_raise(launch_session);
tree.put("root.<xmlattr>.status_code", 200); tree.put("root.<xmlattr>.status_code", 200);
tree.put("root.resume", 1); tree.put("root.resume", 1);
@@ -611,17 +630,21 @@ void cancel(resp_https_t response, req_https_t request) {
response->write(data.str()); response->write(data.str());
}); });
if(proc::proc.running() == -1) { // It is possible that due a race condition that this if-statement gives a false positive,
tree.put("root.cancel", 1); // the client should try again
tree.put("root.<xmlattr>.status_code", 200); if(stream::session_count() != 0) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
return; return;
} }
proc::proc.terminate();
tree.put("root.cancel", 1); tree.put("root.cancel", 1);
tree.put("root.<xmlattr>.status_code", 200); tree.put("root.<xmlattr>.status_code", 200);
if(proc::proc.running() != -1) {
proc::proc.terminate();
}
} }
void appasset(resp_https_t response, req_https_t request) { void appasset(resp_https_t response, req_https_t request) {
+41 -15
View File
@@ -43,20 +43,31 @@ class rtsp_server_t;
using msg_t = util::safe_ptr<RTSP_MESSAGE, free_msg>; using msg_t = util::safe_ptr<RTSP_MESSAGE, free_msg>;
using cmd_func_t = std::function<void(rtsp_server_t*, net::peer_t, msg_t&&)>; using cmd_func_t = std::function<void(rtsp_server_t*, net::peer_t, msg_t&&)>;
safe::event_t<launch_session_t> launch_event;
void print_msg(PRTSP_MESSAGE msg); void print_msg(PRTSP_MESSAGE msg);
void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req); void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req);
class rtsp_server_t { class rtsp_server_t {
public: public:
rtsp_server_t(rtsp_server_t &&) noexcept = default;
rtsp_server_t &operator=(rtsp_server_t &&) noexcept = default;
explicit rtsp_server_t(std::uint16_t port) : _session_slots (config::stream.channels), _host {net::host_create(_addr, 1, port) } {}
~rtsp_server_t() { ~rtsp_server_t() {
stop(); if(_host) {
clear();
}
}
void bind(std::uint16_t port) {
_session_slots.resize(config::stream.channels);
_slot_count = config::stream.channels;
_host = net::host_create(_addr, 1, port);
}
void session_raise(launch_session_t launch_session) {
--_slot_count;
launch_event.raise(launch_session);
}
int session_count() const {
return config::stream.channels - _slot_count;
} }
template<class T, class X> template<class T, class X>
@@ -127,13 +138,15 @@ public:
_map_cmd_cb.emplace(type, std::move(cb)); _map_cmd_cb.emplace(type, std::move(cb));
} }
void stop(bool all = true) { void clear(bool all = true) {
for(auto &slot : _session_slots) { for(auto &slot : _session_slots) {
if (slot && (all || session::state(*slot) == session::state_e::STOPPING)) { if (slot && (all || session::state(*slot) == session::state_e::STOPPING)) {
session::stop(*slot); session::stop(*slot);
session::join(*slot); session::join(*slot);
slot.reset(); slot.reset();
++_slot_count;
} }
} }
@@ -159,6 +172,9 @@ public:
net::host_t::pointer host() const { net::host_t::pointer host() const {
return _host.get(); return _host.get();
} }
safe::event_t<launch_session_t> launch_event;
private: private:
// named _queue_packet because I want to make it an actual queue // named _queue_packet because I want to make it an actual queue
@@ -169,10 +185,21 @@ private:
std::vector<std::shared_ptr<session_t>> _session_slots; std::vector<std::shared_ptr<session_t>> _session_slots;
int _slot_count;
ENetAddress _addr; ENetAddress _addr;
net::host_t _host; net::host_t _host;
}; };
rtsp_server_t server;
void launch_session_raise(launch_session_t launch_session) {
server.session_raise(launch_session);
}
int session_count() {
return server.session_count();
}
void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) { void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) {
auto payload = std::make_pair(resp->payload, resp->payloadLength); auto payload = std::make_pair(resp->payload, resp->payloadLength);
@@ -297,13 +324,13 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) {
auto seqn_str = to_string(req->sequenceNumber); auto seqn_str = to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str()); option.content = const_cast<char*>(seqn_str.c_str());
if(!launch_event.peek()) { if(!server->launch_event.peek()) {
// /launch has not been used // /launch has not been used
respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return; return;
} }
auto launch_session { launch_event.pop() }; auto launch_session { server->launch_event.pop() };
std::string_view payload { req->payload, (size_t)req->payloadLength }; std::string_view payload { req->payload, (size_t)req->payloadLength };
@@ -410,8 +437,6 @@ void cmd_play(rtsp_server_t *server, net::peer_t peer, msg_t &&req) {
} }
void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event) { void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event) {
rtsp_server_t server(RTSP_SETUP_PORT);
server.map("OPTIONS"sv, &cmd_option); server.map("OPTIONS"sv, &cmd_option);
server.map("DESCRIBE"sv, &cmd_describe); server.map("DESCRIBE"sv, &cmd_describe);
server.map("SETUP"sv, &cmd_setup); server.map("SETUP"sv, &cmd_setup);
@@ -419,15 +444,16 @@ void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event) {
server.map("PLAY"sv, &cmd_play); server.map("PLAY"sv, &cmd_play);
server.bind(RTSP_SETUP_PORT);
while(!shutdown_event->peek()) { while(!shutdown_event->peek()) {
server.iterate(std::min(500ms, config::stream.ping_timeout)); server.iterate(std::min(500ms, config::stream.ping_timeout));
if(broadcast_shutdown_event.peek()) { if(broadcast_shutdown_event.peek()) {
server.stop(); server.clear();
} }
else { else {
// cleanup all stopped sessions // cleanup all stopped sessions
server.stop(false); server.clear(false);
} }
} }
} }
+2 -1
View File
@@ -16,7 +16,8 @@ struct launch_session_t {
crypto::aes_t iv; crypto::aes_t iv;
}; };
extern safe::event_t<launch_session_t> launch_event; void launch_session_raise(launch_session_t launch_session);
int session_count();
void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event); void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event);