Fixed error 400 on rtsp handshake
This commit is contained in:
250
stream.cpp
250
stream.cpp
@@ -123,6 +123,7 @@ void free_msg(PRTSP_MESSAGE msg) {
|
||||
using msg_t = util::safe_ptr<RTSP_MESSAGE, free_msg>;
|
||||
using packet_t = util::safe_ptr<ENetPacket, enet_packet_destroy>;
|
||||
using host_t = util::safe_ptr<ENetHost, enet_host_destroy>;
|
||||
using peer_t = ENetPeer*;
|
||||
using rh_t = util::safe_ptr<reed_solomon, reed_solomon_release>;
|
||||
using video_packet_t = util::safe_ptr<video_packet_raw_t, util::c_free>;
|
||||
using audio_packet_t = util::safe_ptr<audio_packet_raw_t, util::c_free>;
|
||||
@@ -134,6 +135,96 @@ host_t host_create(ENetAddress &addr, std::uint16_t port) {
|
||||
return host_t { enet_host_create(PF_INET, &addr, 1, 1, 0, 0) };
|
||||
}
|
||||
|
||||
void print_msg(PRTSP_MESSAGE msg);
|
||||
void cmd_not_found(host_t &host, peer_t peer, msg_t&& req);
|
||||
|
||||
class rtsp_server_t {
|
||||
public:
|
||||
rtsp_server_t(rtsp_server_t &&) noexcept = default;
|
||||
rtsp_server_t &operator=(rtsp_server_t &&) noexcept = default;
|
||||
|
||||
explicit rtsp_server_t(std::uint16_t port) : _host { host_create(_addr, port) } {}
|
||||
|
||||
template<class T, class X>
|
||||
void iterate(std::chrono::duration<T, X> timeout) {
|
||||
ENetEvent event;
|
||||
auto res = enet_host_service(_host.get(), &event, std::chrono::floor<std::chrono::milliseconds>(timeout).count());
|
||||
|
||||
if(res > 0) {
|
||||
switch(event.type) {
|
||||
case ENET_EVENT_TYPE_RECEIVE:
|
||||
{
|
||||
packet_t packet { event.packet };
|
||||
peer_t peer { event.peer };
|
||||
|
||||
msg_t req { new RTSP_MESSAGE {} };
|
||||
|
||||
//TODO: compare addresses of the peers
|
||||
if(_queue_packet.second == nullptr) {
|
||||
parseRtspMessage(req.get(), (char*)packet->data, packet->dataLength);
|
||||
for(auto option = req->options; option != nullptr; option = option->next) {
|
||||
if("Content-length"sv == option->option) {
|
||||
_queue_packet = std::make_pair(std::move(peer), std::move(packet));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
std::vector<char> full_payload;
|
||||
|
||||
auto old_msg = std::move(_queue_packet);
|
||||
TUPLE_2D_REF(_, old_packet, old_msg);
|
||||
|
||||
|
||||
std::string_view new_payload { (char*)packet->data, packet->dataLength };
|
||||
std::string_view old_payload { (char*)old_packet->data, old_packet->dataLength };
|
||||
full_payload.resize(new_payload.size() + old_payload.size());
|
||||
|
||||
std::copy(std::begin(old_payload), std::end(old_payload), std::begin(full_payload));
|
||||
std::copy(std::begin(new_payload), std::end(new_payload), std::begin(full_payload) + old_payload.size());
|
||||
|
||||
parseRtspMessage(req.get(), full_payload.data(), full_payload.size());
|
||||
}
|
||||
|
||||
print_msg(req.get());
|
||||
|
||||
msg_t resp;
|
||||
auto func = _map_cmd_cb.find(req->message.request.command);
|
||||
if(func != std::end(_map_cmd_cb)) {
|
||||
func->second(_host, peer, std::move(req));
|
||||
}
|
||||
else {
|
||||
cmd_not_found(_host, peer, std::move(req));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
break;
|
||||
case ENET_EVENT_TYPE_CONNECT:
|
||||
std::cout << "CLIENT CONNECTED TO RTSP" << std::endl;
|
||||
break;
|
||||
case ENET_EVENT_TYPE_DISCONNECT:
|
||||
std::cout << "CLIENT DISCONNECTED FROM RTSP" << std::endl;
|
||||
break;
|
||||
case ENET_EVENT_TYPE_NONE:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void map(const std::string_view &type, std::function<void(host_t &, peer_t, msg_t&&)> cb);
|
||||
private:
|
||||
void _respond(peer_t &peer, msg_t &msg);
|
||||
|
||||
// named _queue_packet because I want to make it an actual queue
|
||||
// It's like this for convenience sake
|
||||
std::pair<peer_t, packet_t> _queue_packet;
|
||||
|
||||
std::unordered_map<std::string_view, std::function<void(host_t&, peer_t, msg_t&&)>> _map_cmd_cb;
|
||||
ENetAddress _addr;
|
||||
host_t _host;
|
||||
};
|
||||
|
||||
class server_t {
|
||||
public:
|
||||
server_t(server_t &&) noexcept = default;
|
||||
@@ -383,6 +474,10 @@ std::vector<uint8_t> replace(const std::string_view &original, const std::string
|
||||
return replaced;
|
||||
}
|
||||
|
||||
void rtsp_server_t::map(const std::string_view& cmd, std::function<void(host_t&, peer_t, msg_t&&)> cb) {
|
||||
_map_cmd_cb.emplace(cmd, std::move(cb));
|
||||
}
|
||||
|
||||
void server_t::map(uint16_t type, std::function<void(const std::string_view &)> cb) {
|
||||
_map_type_cb.emplace(type, std::move(cb));
|
||||
}
|
||||
@@ -657,29 +752,60 @@ void videoThread() {
|
||||
captureThread.join();
|
||||
}
|
||||
|
||||
void respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) {
|
||||
RTSP_MESSAGE resp {};
|
||||
void respond(host_t &host, peer_t peer, msg_t &resp) {
|
||||
auto payload = std::make_pair(resp->payload, resp->payloadLength);
|
||||
|
||||
auto g = util::fail_guard([&]() {
|
||||
freeMessage(&resp);
|
||||
auto lg = util::fail_guard([&]() {
|
||||
resp->payload = payload.first;
|
||||
resp->payloadLength = payload.second;
|
||||
});
|
||||
|
||||
createRtspResponse(&resp, nullptr, 0, const_cast<char*>("RTSP/1.0"), statuscode, const_cast<char*>(status_msg), seqn, options, const_cast<char*>(payload.data()), (int)payload.size());
|
||||
resp->payload = nullptr;
|
||||
resp->payloadLength = 0;
|
||||
|
||||
int serialized_len;
|
||||
util::c_ptr<char> raw_resp { serializeRtspMessage(&resp, &serialized_len) };
|
||||
util::c_ptr<char> raw_resp { serializeRtspMessage(resp.get(), &serialized_len) };
|
||||
std::cout << "---Begin Response---"sv << std::endl
|
||||
<< std::string_view { raw_resp.get(), (std::size_t)serialized_len } << std::endl
|
||||
<< std::string_view { payload.first, (std::size_t)payload.second } << std::endl
|
||||
<< "---End Response---"sv << std::endl
|
||||
<< std::endl;
|
||||
|
||||
std::string_view tmp_resp { raw_resp.get(), (size_t)serialized_len };
|
||||
std::cout << "---Begin Response---" << std::endl << tmp_resp << "---End Response---" << std::endl << std::endl;
|
||||
|
||||
asio::write(sock, asio::buffer(tmp_resp));
|
||||
{
|
||||
auto packet = enet_packet_create(tmp_resp.data(), tmp_resp.size(), ENET_PACKET_FLAG_RELIABLE);
|
||||
if(enet_peer_send(peer, 0, packet)) {
|
||||
enet_packet_destroy(packet);
|
||||
return;
|
||||
}
|
||||
|
||||
enet_host_flush(host.get());
|
||||
}
|
||||
|
||||
if(payload.second > 0) {
|
||||
auto packet = enet_packet_create(payload.first, payload.second, ENET_PACKET_FLAG_RELIABLE);;
|
||||
if(enet_peer_send(peer, 0, packet)) {
|
||||
enet_packet_destroy(packet);
|
||||
return;
|
||||
}
|
||||
|
||||
enet_host_flush(host.get());
|
||||
}
|
||||
}
|
||||
|
||||
void cmd_not_found(tcp::socket &&sock, msg_t&& req) {
|
||||
respond(sock, nullptr, 404, "NOT FOUND", req->sequenceNumber, {});
|
||||
void respond(host_t &host, peer_t peer, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) {
|
||||
msg_t resp { new msg_t::element_type };
|
||||
createRtspResponse(resp.get(), nullptr, 0, const_cast<char*>("RTSP/1.0"), statuscode, const_cast<char*>(status_msg), seqn, options, const_cast<char*>(payload.data()), (int)payload.size());
|
||||
|
||||
respond(host, peer, resp);
|
||||
}
|
||||
|
||||
void cmd_option(tcp::socket &&sock, msg_t&& req) {
|
||||
void cmd_not_found(host_t &host, peer_t peer, msg_t&& req) {
|
||||
respond(host, peer, nullptr, 404, "NOT FOUND", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
void cmd_option(host_t &host, peer_t peer, msg_t&& req) {
|
||||
OPTION_ITEM option {};
|
||||
|
||||
// I know these string literals will not be modified
|
||||
@@ -688,10 +814,10 @@ void cmd_option(tcp::socket &&sock, msg_t&& req) {
|
||||
auto seqn_str = std::to_string(req->sequenceNumber);
|
||||
option.content = const_cast<char*>(seqn_str.c_str());
|
||||
|
||||
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
|
||||
respond(host, peer, &option, 200, "OK", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
void cmd_describe(tcp::socket &&sock, msg_t&& req) {
|
||||
void cmd_describe(host_t &host, peer_t peer, msg_t&& req) {
|
||||
OPTION_ITEM option {};
|
||||
|
||||
// I know these string literals will not be modified
|
||||
@@ -701,10 +827,10 @@ void cmd_describe(tcp::socket &&sock, msg_t&& req) {
|
||||
option.content = const_cast<char*>(seqn_str.c_str());
|
||||
|
||||
// FIXME: Moonlight will accept the payload, but the value of the option is not correct
|
||||
respond(sock, &option, 200, "OK", req->sequenceNumber, "surround-params=NONE"sv);
|
||||
respond(host, peer, &option, 200, "OK", req->sequenceNumber, "surround-params=NONE"sv);
|
||||
}
|
||||
|
||||
void cmd_setup(tcp::socket &&sock, msg_t &&req) {
|
||||
void cmd_setup(host_t &host, peer_t peer, msg_t &&req) {
|
||||
OPTION_ITEM options[2] {};
|
||||
|
||||
auto &seqn = options[0];
|
||||
@@ -718,7 +844,7 @@ void cmd_setup(tcp::socket &&sock, msg_t &&req) {
|
||||
if(session.client_state >= 0) {
|
||||
// already streaming
|
||||
|
||||
respond(sock, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
respond(host, peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -734,15 +860,15 @@ void cmd_setup(tcp::socket &&sock, msg_t &&req) {
|
||||
session_option.content = const_cast<char*>("DEADBEEFCAFE;timeout = 90");
|
||||
}
|
||||
else if(type != "video"sv && type != "control"sv) {
|
||||
cmd_not_found(std::move(sock), std::move(req));
|
||||
cmd_not_found(host, peer, std::move(req));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
respond(sock, &seqn, 200, "OK", req->sequenceNumber, {});
|
||||
respond(host, peer, &seqn, 200, "OK", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
void cmd_announce(tcp::socket &&sock, msg_t &&req) {
|
||||
void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
|
||||
OPTION_ITEM option {};
|
||||
|
||||
// I know these string literals will not be modified
|
||||
@@ -754,7 +880,7 @@ void cmd_announce(tcp::socket &&sock, msg_t &&req) {
|
||||
if(session.client_state >= 0) {
|
||||
// already streaming
|
||||
|
||||
respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -802,25 +928,24 @@ void cmd_announce(tcp::socket &&sock, msg_t &&req) {
|
||||
|
||||
try {
|
||||
|
||||
auto &config = session.config;
|
||||
config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv));
|
||||
config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv));
|
||||
config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv));
|
||||
auto &config = session.config;
|
||||
config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv));
|
||||
config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv));
|
||||
config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv));
|
||||
|
||||
config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv));
|
||||
config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv));
|
||||
|
||||
config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv));
|
||||
config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv));
|
||||
config.monitor.framerate = util::from_view(args.at("x-nv-video[0].maxFPS"sv));
|
||||
config.monitor.bitrate = util::from_view(args.at("x-nv-video[0].initialBitrateKbps"sv));
|
||||
config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv));
|
||||
config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv));
|
||||
config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv));
|
||||
config.monitor.framerate = util::from_view(args.at("x-nv-video[0].maxFPS"sv));
|
||||
config.monitor.bitrate = util::from_view(args.at("x-nv-video[0].initialBitrateKbps"sv));
|
||||
config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv));
|
||||
|
||||
} catch(std::out_of_range &) {
|
||||
// This piece of code is reached when for some reason, the payload length received < payload length send
|
||||
// Not sure if this is an issue with Sunshine or Moonlight or the network
|
||||
// TODO: find out
|
||||
respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
|
||||
|
||||
respond(host, peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -834,10 +959,10 @@ void cmd_announce(tcp::socket &&sock, msg_t &&req) {
|
||||
session.videoThread = std::thread {videoThread};
|
||||
session.controlThread = std::thread {controlThread};
|
||||
|
||||
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
|
||||
respond(host, peer, &option, 200, "OK", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
void cmd_play(tcp::socket &&sock, msg_t &&req) {
|
||||
void cmd_play(host_t &host, peer_t peer, msg_t &&req) {
|
||||
OPTION_ITEM option {};
|
||||
|
||||
// I know these string literals will not be modified
|
||||
@@ -846,64 +971,23 @@ void cmd_play(tcp::socket &&sock, msg_t &&req) {
|
||||
auto seqn_str = std::to_string(req->sequenceNumber);
|
||||
option.content = const_cast<char*>(seqn_str.c_str());
|
||||
|
||||
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
|
||||
respond(host, peer, &option, 200, "OK", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
void rtpThread() {
|
||||
session.client_state = -1;
|
||||
|
||||
asio::io_service io;
|
||||
rtsp_server_t server(RTSP_SETUP_PORT);
|
||||
|
||||
tcp::acceptor acceptor { io, tcp::endpoint { tcp::v6(), RTSP_SETUP_PORT } };
|
||||
server.map("OPTIONS"sv, &cmd_option);
|
||||
server.map("DESCRIBE"sv, &cmd_describe);
|
||||
server.map("SETUP"sv, &cmd_setup);
|
||||
server.map("ANNOUNCE"sv, &cmd_announce);
|
||||
|
||||
std::unordered_map<std::string_view, std::function<void(tcp::socket &&, msg_t &&)>> map_cmd_func;
|
||||
map_cmd_func.emplace("OPTIONS"sv, &cmd_option);
|
||||
map_cmd_func.emplace("DESCRIBE"sv, &cmd_describe);
|
||||
map_cmd_func.emplace("SETUP"sv, &cmd_setup);
|
||||
map_cmd_func.emplace("ANNOUNCE"sv, &cmd_announce);
|
||||
|
||||
map_cmd_func.emplace("PLAY"sv, &cmd_play);
|
||||
server.map("PLAY"sv, &cmd_play);
|
||||
|
||||
while(true) {
|
||||
tcp::socket sock { io };
|
||||
|
||||
acceptor.accept(sock);
|
||||
|
||||
std::array<char, 4096> rtsp_raw;
|
||||
|
||||
// As defined in moonlight-common-c/src/PlatformSockets.c
|
||||
constexpr auto TCPv4_MSS = 536;
|
||||
constexpr auto TCPv6_MSS = 1220;
|
||||
|
||||
std::size_t len = 0;
|
||||
|
||||
auto pos = std::begin(rtsp_raw);
|
||||
do {
|
||||
std::array<char, TCPv4_MSS> buf;
|
||||
auto bytes_read = sock.receive(asio::buffer(buf));
|
||||
|
||||
if(bytes_read + len > rtsp_raw.size()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::copy(std::begin(buf), std::end(buf), pos + len);
|
||||
len += bytes_read;
|
||||
} while((len % TCPv4_MSS) == 0);
|
||||
|
||||
rtsp_raw[std::min(rtsp_raw.size() -1, len)] = '\0';
|
||||
|
||||
msg_t req { new RTSP_MESSAGE {} };
|
||||
parseRtspMessage(req.get(), rtsp_raw.data(), len);
|
||||
|
||||
print_msg(req.get());
|
||||
|
||||
auto func = map_cmd_func.find(req->message.request.command);
|
||||
if(func == std::end(map_cmd_func)) {
|
||||
cmd_not_found(std::move(sock), std::move(req));
|
||||
}
|
||||
else {
|
||||
func->second(std::move(sock), std::move(req));
|
||||
}
|
||||
server.iterate(1s);
|
||||
|
||||
if(session.client_state == 0) {
|
||||
session.audioThread.join();
|
||||
|
||||
Reference in New Issue
Block a user