Correctly identify size of rtsp packet
This commit is contained in:
+114
-64
@@ -51,94 +51,145 @@ public:
|
|||||||
: handle_data_fn { std::move(handle_data_fn) }, sock { ios } {}
|
: handle_data_fn { std::move(handle_data_fn) }, sock { ios } {}
|
||||||
|
|
||||||
void read() {
|
void read() {
|
||||||
|
if(begin == std::end(msg_buf)) {
|
||||||
|
BOOST_LOG(error) << "RTSP: read(): Exceeded maximum rtsp packet size: "sv << msg_buf.size();
|
||||||
|
|
||||||
|
respond(sock, nullptr, 400, "BAD REQUEST", 0, {});
|
||||||
|
|
||||||
|
sock.close();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
sock.async_read_some(
|
sock.async_read_some(
|
||||||
boost::asio::buffer(msg_buf.data(), msg_buf.size()),
|
boost::asio::buffer(begin, (std::size_t)(std::end(msg_buf) - begin)),
|
||||||
boost::bind(
|
boost::bind(
|
||||||
&socket_t::handle_read, shared_from_this(),
|
&socket_t::handle_read, shared_from_this(),
|
||||||
boost::asio::placeholders::error,
|
boost::asio::placeholders::error,
|
||||||
boost::asio::placeholders::bytes_transferred));
|
boost::asio::placeholders::bytes_transferred));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void handle_read(std::shared_ptr<socket_t> &socket, const boost::system::error_code &ec, std::size_t bytes) {
|
void read_payload() {
|
||||||
BOOST_LOG(debug) << "Handle read of size: "sv << bytes << " bytes"sv;
|
if(begin == std::end(msg_buf)) {
|
||||||
|
BOOST_LOG(error) << "RTSP: read_payload(): Exceeded maximum rtsp packet size: "sv << msg_buf.size();
|
||||||
|
|
||||||
|
respond(sock, nullptr, 400, "BAD REQUEST", 0, {});
|
||||||
|
|
||||||
|
sock.close();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sock.async_read_some(
|
||||||
|
boost::asio::buffer(begin, (std::size_t)(std::end(msg_buf) - begin)),
|
||||||
|
boost::bind(
|
||||||
|
&socket_t::handle_payload, shared_from_this(),
|
||||||
|
boost::asio::placeholders::error,
|
||||||
|
boost::asio::placeholders::bytes_transferred));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void handle_payload(std::shared_ptr<socket_t> &socket, const boost::system::error_code &ec, std::size_t bytes) {
|
||||||
|
BOOST_LOG(debug) << "handle_payload(): Handle read of size: "sv << bytes << " bytes"sv;
|
||||||
|
|
||||||
|
auto sock_close = util::fail_guard([&socket]() {
|
||||||
|
boost::system::error_code ec;
|
||||||
|
socket->sock.close(ec);
|
||||||
|
|
||||||
|
if(ec) {
|
||||||
|
BOOST_LOG(error) << "RTSP: handle_payload(): Couldn't close tcp socket: "sv << ec.message();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if(ec) {
|
if(ec) {
|
||||||
BOOST_LOG(error) << "RTSP: Couldn't read from tcp socket: "sv << ec.message();
|
BOOST_LOG(error) << "RTSP: handle_payload(): Couldn't read from tcp socket: "sv << ec.message();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto end = socket->begin + bytes;
|
||||||
|
msg_t req { new msg_t::element_type {} };
|
||||||
|
if(auto status = parseRtspMessage(req.get(), socket->msg_buf.data(), (std::size_t)(end - socket->msg_buf.data()))) {
|
||||||
|
BOOST_LOG(error) << "Malformed RTSP message: ["sv << status << ']';
|
||||||
|
|
||||||
|
respond(socket->sock, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sock_close.disable();
|
||||||
|
|
||||||
|
auto fg = util::fail_guard([&socket]() {
|
||||||
|
socket->read_payload();
|
||||||
|
});
|
||||||
|
|
||||||
|
auto content_lenght = 0;
|
||||||
|
for(auto option = req->options; option != nullptr; option = option->next) {
|
||||||
|
if("Content-length"sv == option->option) {
|
||||||
|
BOOST_LOG(debug) << "Found Content-Length: "sv << option->content << " bytes"sv;
|
||||||
|
|
||||||
|
// If content_length > bytes read, then we need to store current data read,
|
||||||
|
// to be appended by the next read.
|
||||||
|
std::string_view content { option->content };
|
||||||
|
auto begin = std::find_if(std::begin(content), std::end(content), [](auto ch) { return (bool)std::isdigit(ch); });
|
||||||
|
|
||||||
|
content_lenght = util::from_chars(begin, std::end(content));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(end - socket->crlf >= content_lenght) {
|
||||||
|
if(end - socket->crlf > content_lenght) {
|
||||||
|
BOOST_LOG(warning) << "(end - socket->crlf) > content_lenght -- "sv << (std::size_t)(end - socket->crlf) << " > "sv << content_lenght;
|
||||||
|
}
|
||||||
|
|
||||||
|
fg.disable();
|
||||||
|
print_msg(req.get());
|
||||||
|
|
||||||
|
socket->handle_data(std::move(req));
|
||||||
|
}
|
||||||
|
|
||||||
|
socket->begin = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void handle_read(std::shared_ptr<socket_t> &socket, const boost::system::error_code &ec, std::size_t bytes) {
|
||||||
|
BOOST_LOG(debug) << "handle_read(): Handle read of size: "sv << bytes << " bytes"sv;
|
||||||
|
|
||||||
|
if(ec) {
|
||||||
|
BOOST_LOG(error) << "RTSP: handle_read(): Couldn't read from tcp socket: "sv << ec.message();
|
||||||
|
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
socket->sock.close(ec);
|
socket->sock.close(ec);
|
||||||
|
|
||||||
if(ec) {
|
if(ec) {
|
||||||
BOOST_LOG(error) << "RTSP: Couldn't close tcp socket: "sv << ec.message();
|
BOOST_LOG(error) << "RTSP: handle_read(): Couldn't close tcp socket: "sv << ec.message();
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto fg = util::fail_guard([&socket]() {
|
auto fg = util::fail_guard([&socket]() {
|
||||||
socket->sock.close();
|
socket->read();
|
||||||
});
|
});
|
||||||
|
|
||||||
msg_t req { new msg_t::element_type {} };
|
auto begin = std::max(socket->begin - 4, socket->begin);
|
||||||
|
auto buf_size = bytes + (begin - socket->begin);
|
||||||
|
auto end = begin + buf_size;
|
||||||
|
|
||||||
auto &incomplete = socket->incomplete;
|
constexpr auto needle = "\r\n\r\n"sv;
|
||||||
|
|
||||||
if(incomplete.empty()) {
|
auto it = std::search(begin, begin + buf_size, std::begin(needle), std::end(needle));
|
||||||
if(auto status = parseRtspMessage(req.get(), socket->msg_buf.data(), bytes)) {
|
if(it == end) {
|
||||||
BOOST_LOG(error) << "Malformed RTSP message: ["sv << status << ']';
|
socket->begin = end;
|
||||||
|
|
||||||
respond(socket->sock, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {});
|
return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(auto option = req->options; option != nullptr; option = option->next) {
|
|
||||||
if("Content-length"sv == option->option) {
|
|
||||||
BOOST_LOG(debug) << "Found Content-Length: "sv << option->content << " bytes"sv;
|
|
||||||
|
|
||||||
// If content_length > bytes read, then we need to store current data read,
|
|
||||||
// to be appended by the next read.
|
|
||||||
std::string_view content { option->content };
|
|
||||||
auto begin = std::find_if(std::begin(content), std::end(content), [](auto ch) { return (bool)std::isdigit(ch); });
|
|
||||||
|
|
||||||
socket->content_length = util::from_chars(begin, std::end(content));
|
|
||||||
if(socket->content_length <= bytes + incomplete.size()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto incomplete_size = incomplete.size();
|
|
||||||
incomplete.resize(incomplete.size() + bytes);
|
|
||||||
|
|
||||||
std::copy_n(socket->msg_buf.data(), bytes, std::begin(incomplete) + incomplete_size);
|
|
||||||
|
|
||||||
socket->read();
|
|
||||||
|
|
||||||
fg.disable();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
auto incomplete_size = incomplete.size();
|
|
||||||
incomplete.resize(incomplete.size() + bytes);
|
|
||||||
|
|
||||||
std::copy_n(socket->msg_buf.data(), bytes, std::begin(incomplete) + incomplete_size);
|
|
||||||
|
|
||||||
if(incomplete.size() < socket->content_length) {
|
|
||||||
fg.disable();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(auto status = parseRtspMessage(req.get(), incomplete.data(), incomplete.size())) {
|
|
||||||
BOOST_LOG(error) << "Malformed RTSP message: ["sv << status << ']';
|
|
||||||
|
|
||||||
respond(socket->sock, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
print_msg(req.get());
|
// Emulate read completion for payload data
|
||||||
|
socket->begin = it + needle.size();
|
||||||
|
socket->crlf = socket->begin;
|
||||||
|
buf_size = end - socket->begin;
|
||||||
|
|
||||||
socket->handle_data(std::move(req));
|
fg.disable();
|
||||||
|
handle_payload(socket, ec, buf_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handle_data(msg_t &&req) {
|
void handle_data(msg_t &&req) {
|
||||||
@@ -149,11 +200,10 @@ public:
|
|||||||
|
|
||||||
tcp::socket sock;
|
tcp::socket sock;
|
||||||
|
|
||||||
|
std::array<char, 2048> msg_buf;
|
||||||
|
|
||||||
std::vector<char> incomplete;
|
char *crlf;
|
||||||
|
char *begin = msg_buf.data();
|
||||||
std::size_t content_length = std::numeric_limits<std::size_t>::max();
|
|
||||||
std::array<char, 1024> msg_buf;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class rtsp_server_t {
|
class rtsp_server_t {
|
||||||
|
|||||||
Reference in New Issue
Block a user