Merge remote-tracking branch 'origin/dependabot/npm_and_yarn/dev-dependencies-cddfeb1403'
This commit is contained in:
84
src/rtsp.cpp
84
src/rtsp.cpp
@@ -26,6 +26,7 @@ extern "C" {
|
||||
#include "sync.h"
|
||||
#include "video.h"
|
||||
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace asio = boost::asio;
|
||||
@@ -417,13 +418,6 @@ namespace rtsp_stream {
|
||||
|
||||
int
|
||||
bind(net::af_e af, std::uint16_t port, boost::system::error_code &ec) {
|
||||
{
|
||||
auto lg = _session_slots.lock();
|
||||
|
||||
_session_slots->resize(config::stream.channels);
|
||||
_slot_count = config::stream.channels;
|
||||
}
|
||||
|
||||
acceptor.open(af == net::IPV4 ? tcp::v4() : tcp::v6(), ec);
|
||||
if (ec) {
|
||||
return -1;
|
||||
@@ -529,7 +523,6 @@ namespace rtsp_stream {
|
||||
}
|
||||
raised_timeout = now + config::stream.ping_timeout;
|
||||
|
||||
--_slot_count;
|
||||
launch_event.raise(std::move(launch_session));
|
||||
}
|
||||
|
||||
@@ -552,9 +545,14 @@ namespace rtsp_stream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get the number of active sessions.
|
||||
* @return Count of active sessions.
|
||||
*/
|
||||
int
|
||||
session_count() const {
|
||||
return config::stream.channels - _slot_count;
|
||||
session_count() {
|
||||
auto lg = _session_slots.lock();
|
||||
return _session_slots->size();
|
||||
}
|
||||
|
||||
safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
|
||||
@@ -573,20 +571,21 @@ namespace rtsp_stream {
|
||||
auto discarded = launch_event.pop(0s);
|
||||
if (discarded) {
|
||||
BOOST_LOG(debug) << "Event timeout: "sv << discarded->unique_id;
|
||||
++_slot_count;
|
||||
}
|
||||
}
|
||||
|
||||
auto lg = _session_slots.lock();
|
||||
|
||||
for (auto &slot : *_session_slots) {
|
||||
if (slot && (all || stream::session::state(*slot) == stream::session::state_e::STOPPING)) {
|
||||
stream::session::stop(*slot);
|
||||
stream::session::join(*slot);
|
||||
for (auto i = _session_slots->begin(); i != _session_slots->end();) {
|
||||
auto &slot = *(*i);
|
||||
if (all || stream::session::state(slot) == stream::session::state_e::STOPPING) {
|
||||
stream::session::stop(slot);
|
||||
stream::session::join(slot);
|
||||
|
||||
slot.reset();
|
||||
|
||||
++_slot_count;
|
||||
i = _session_slots->erase(i);
|
||||
}
|
||||
else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -595,27 +594,25 @@ namespace rtsp_stream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Removes the provided session from the set of sessions.
|
||||
* @param session The session to remove.
|
||||
*/
|
||||
void
|
||||
clear(std::shared_ptr<stream::session_t> *session_p) {
|
||||
remove(const std::shared_ptr<stream::session_t> &session) {
|
||||
auto lg = _session_slots.lock();
|
||||
|
||||
session_p->reset();
|
||||
|
||||
++_slot_count;
|
||||
_session_slots->erase(session);
|
||||
}
|
||||
|
||||
std::shared_ptr<stream::session_t> *
|
||||
accept(std::shared_ptr<stream::session_t> &session) {
|
||||
/**
|
||||
* @brief Inserts the provided session into the set of sessions.
|
||||
* @param session The session to insert.
|
||||
*/
|
||||
void
|
||||
insert(const std::shared_ptr<stream::session_t> &session) {
|
||||
auto lg = _session_slots.lock();
|
||||
|
||||
for (auto &slot : *_session_slots) {
|
||||
if (!slot) {
|
||||
slot = session;
|
||||
return &slot;
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
_session_slots->emplace(session);
|
||||
BOOST_LOG(info) << "New streaming session started [active sessions: "sv << _session_slots->size() << ']';
|
||||
}
|
||||
|
||||
std::shared_ptr<stream::session_t>
|
||||
@@ -646,10 +643,9 @@ namespace rtsp_stream {
|
||||
private:
|
||||
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;
|
||||
|
||||
sync_util::sync_t<std::vector<std::shared_ptr<stream::session_t>>> _session_slots;
|
||||
sync_util::sync_t<std::set<std::shared_ptr<stream::session_t>>> _session_slots;
|
||||
|
||||
std::chrono::steady_clock::time_point raised_timeout;
|
||||
int _slot_count;
|
||||
|
||||
boost::asio::io_service ios;
|
||||
tcp::acceptor acceptor { ios };
|
||||
@@ -687,6 +683,11 @@ namespace rtsp_stream {
|
||||
return server.get_all_session_uuids();
|
||||
}
|
||||
|
||||
void
|
||||
terminate_sessions() {
|
||||
server.clear(true);
|
||||
}
|
||||
|
||||
int
|
||||
send(tcp::socket &sock, const std::string_view &sv) {
|
||||
std::size_t bytes_send = 0;
|
||||
@@ -1145,19 +1146,12 @@ namespace rtsp_stream {
|
||||
}
|
||||
|
||||
auto stream_session = stream::session::alloc(config, session);
|
||||
|
||||
auto slot = server->accept(stream_session);
|
||||
if (!slot) {
|
||||
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';
|
||||
|
||||
respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
server->insert(stream_session);
|
||||
|
||||
if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
|
||||
BOOST_LOG(error) << "Failed to start a streaming session"sv;
|
||||
|
||||
server->clear(slot);
|
||||
server->remove(stream_session);
|
||||
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user