Fix bad function call

This commit is contained in:
loki
2020-02-08 23:41:27 +01:00
parent 834f7b9063
commit 4b216d6676
6 changed files with 168 additions and 140 deletions

View File

@@ -43,10 +43,32 @@ static const short packetTypes[] = {
0x0100, // Termination
};
constexpr auto VIDEO_STREAM_PORT = 47998;
constexpr auto CONTROL_PORT = 47999;
constexpr auto AUDIO_STREAM_PORT = 48000;
namespace asio = boost::asio;
namespace sys = boost::system;
using asio::ip::tcp;
using asio::ip::udp;
using namespace std::literals;
namespace stream {
enum class socket_e : int {
video,
audio
};
enum class state_e : int {
STOPPED,
STOPPING,
STARTING,
RUNNING,
};
#pragma pack(push, 1)
struct video_packet_raw_t {
@@ -72,19 +94,59 @@ using rh_t = util::safe_ptr<reed_solomon, reed_solomon_release>;
using video_packet_t = util::c_ptr<video_packet_raw_t>;
using audio_packet_t = util::c_ptr<audio_packet_raw_t>;
using message_queue_t = std::shared_ptr<safe::queue_t<std::pair<std::uint16_t, std::string>>>;
using message_queue_queue_t = std::shared_ptr<safe::queue_t<std::tuple<socket_e, asio::ip::address, message_queue_t>>>;
using session_queue_t = std::shared_ptr<safe::queue_t<std::pair<std::string, std::shared_ptr<session_t>>>>;
struct broadcast_ctx_t {
safe::event_t<bool> shutdown_event;
video::packet_queue_t video_packets;
audio::packet_queue_t audio_packets;
message_queue_queue_t message_queue_queue;
session_queue_t session_queue;
std::thread video_thread;
std::thread audio_thread;
std::thread control_thread;
std::thread recv_thread;
asio::io_service io;
udp::socket video_sock { io, udp::endpoint(udp::v4(), VIDEO_STREAM_PORT) };
udp::socket audio_sock { io, udp::endpoint(udp::v4(), AUDIO_STREAM_PORT) };
};
struct session_t {
config_t config;
std::thread audioThread;
std::thread videoThread;
std::chrono::steady_clock::time_point pingTimeout;
safe::shared_t<broadcast_ctx_t>::ptr_t broadcast_ref;
udp::endpoint video_peer;
udp::endpoint audio_peer;
video::idr_event_t idr_events;
crypto::aes_t gcm_key;
crypto::aes_t iv;
std::atomic<state_e> state;
};
void videoThread(std::shared_ptr<session_t> session, std::string addr_str);
void audioThread(std::shared_ptr<session_t> session, std::string addr_str);
int start_broadcast(broadcast_ctx_t &ctx);
void end_broadcast(broadcast_ctx_t &ctx);
std::shared_ptr<input::input_t> input;
static auto broadcast = safe::make_shared<broadcast_ctx_t>(start_broadcast, end_broadcast);
void stop(session_t &session) {
session.idr_events->stop();
auto expected = state_e::RUNNING;
session.state.compare_exchange_strong(expected, state_e::STOPPING);
}
class control_server_t {
public:
control_server_t(control_server_t &&) noexcept = default;
@@ -379,6 +441,8 @@ void controlBroadcastThread(safe::event_t<bool> *shutdown_event, session_queue_t
payload[1] = reason;
server.send(std::string_view {(char*)payload.data(), payload.size()});
//TODO: Terminate session
}
server.iterate(500ms);
@@ -388,44 +452,21 @@ void controlBroadcastThread(safe::event_t<bool> *shutdown_event, session_queue_t
void recvThread(broadcast_ctx_t &ctx) {
std::map<asio::ip::address, message_queue_t> peer_to_video_session;
std::map<asio::ip::address, message_queue_t> peer_to_audio_session;
std::map<asio::ip::address, message_queue_t> peer_to_control_session;
auto &video_sock = ctx.video_sock;
auto &audio_sock = ctx.audio_sock;
auto &session_queue = ctx.message_queue_queue;
auto &message_queue_queue = ctx.message_queue_queue;
auto &io = ctx.io;
udp::endpoint peer;
std::array<char, 2048> buf[2];
std::function<void(const boost::system::error_code, size_t)> recv_func[2];
auto recv_func_factory = [&](udp::socket &sock, int buf_elem, std::map<asio::ip::address, message_queue_t> &peer_to_session) {
std::function<void(const boost::system::error_code, size_t)> recv_func = [&](const boost::system::error_code &ec, size_t bytes) {
if(ec || !bytes) {
BOOST_LOG(fatal) << "Couldn't receive data from udp socket: "sv << ec.message();
log_flush();
std::abort();
}
auto it = peer_to_session.find(peer.address());
if(it != std::end(peer_to_session)) {
it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes });
}
sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func);
};
return recv_func;
};
video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func_factory(video_sock, 0, peer_to_video_session));
audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func_factory(audio_sock, 1, peer_to_audio_session));
while(!ctx.shutdown_event.peek()) {
while(session_queue->peek()) {
auto message_queue_opt = session_queue->pop();
auto populate_peer_to_session = [&]() {
while(message_queue_queue->peek()) {
auto message_queue_opt = message_queue_queue->pop();
TUPLE_3D_REF(socket_type, addr, message_queue, *message_queue_opt);
switch(socket_type) {
@@ -447,6 +488,36 @@ void recvThread(broadcast_ctx_t &ctx) {
break;
}
}
};
auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map<asio::ip::address, message_queue_t> &peer_to_session) {
recv_func[buf_elem] = [&,buf_elem](const boost::system::error_code &ec, size_t bytes) {
populate_peer_to_session();
if(ec || !bytes) {
BOOST_LOG(fatal) << "Couldn't receive data from udp socket: "sv << ec.message();
log_flush();
std::abort();
}
auto it = peer_to_session.find(peer.address());
if(it != std::end(peer_to_session)) {
it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes });
}
sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func[buf_elem]);
};
};
recv_func_init(video_sock, 0, peer_to_video_session);
recv_func_init(audio_sock, 1, peer_to_audio_session);
video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]);
audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]);
while(!ctx.shutdown_event.peek()) {
io.run_one();
}
@@ -696,4 +767,43 @@ void audioThread(std::shared_ptr<session_t> session, std::string addr_str) {
audio::capture(ref->audio_packets, session->config.audio, session.get());
}
void stop(session_t &session) {
session.idr_events->stop();
auto expected = state_e::RUNNING;
session.state.compare_exchange_strong(expected, state_e::STOPPING);
}
void join(session_t &session) {
BOOST_LOG(debug) << "Waiting for video to end..."sv;
session.videoThread.join();
BOOST_LOG(debug) << "Waiting for audio to end..."sv;
session.audioThread.join();
}
void start_session(std::shared_ptr<session_t> session, const std::string &addr_string) {
session->broadcast_ref = broadcast.ref();
session->broadcast_ref->session_queue->raise(addr_string, session);
session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
session->audioThread = std::thread {audioThread, session, addr_string};
session->videoThread = std::thread {videoThread, session, addr_string};
session->state.store(state_e::RUNNING, std::memory_order_relaxed);
}
std::shared_ptr<session_t> alloc_session(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) {
auto session = std::make_shared<session_t>();
session->config = config;
session->gcm_key = gcm_key;
session->iv = iv;
session->idr_events = std::make_shared<video::idr_event_t::element_type>();
session->state.store(state_e::STOPPED, std::memory_order_relaxed);
return session;
}
}