Add event_t

This commit is contained in:
loki
2019-12-11 19:06:52 +01:00
parent 390d65a8dc
commit e0c1e4ec55
7 changed files with 91 additions and 25 deletions
+1 -2
View File
@@ -69,8 +69,7 @@ set(SUNSHINE_TARGET_FILES
sunshine/stream.cpp sunshine/stream.cpp
sunshine/stream.h sunshine/stream.h
sunshine/video.cpp sunshine/video.cpp
sunshine/video.h sunshine/video.h sunshine/thread_safe.h
sunshine/queue.h
sunshine/input.cpp sunshine/input.cpp
sunshine/input.h sunshine/input.h
sunshine/audio.cpp sunshine/audio.cpp
+3 -3
View File
@@ -6,7 +6,7 @@
#include "platform/common.h" #include "platform/common.h"
#include "utility.h" #include "utility.h"
#include "queue.h" #include "thread_safe.h"
#include "audio.h" #include "audio.h"
namespace audio { namespace audio {
@@ -73,7 +73,7 @@ void encodeThread(std::shared_ptr<safe::queue_t<packet_t>> packets, std::shared_
} }
packet.fake_resize(bytes); packet.fake_resize(bytes);
packets->push(std::move(packet)); packets->raise(std::move(packet));
} }
} }
@@ -95,7 +95,7 @@ void capture(std::shared_ptr<safe::queue_t<packet_t>> packets, config_t config)
while(packets->running()) { while(packets->running()) {
auto sample = platf::audio(mic, bytes_per_frame); auto sample = platf::audio(mic, bytes_per_frame);
samples->push(std::move(sample)); samples->raise(std::move(sample));
} }
samples->stop(); samples->stop();
+1 -1
View File
@@ -2,7 +2,7 @@
#define SUNSHINE_AUDIO_H #define SUNSHINE_AUDIO_H
#include "utility.h" #include "utility.h"
#include "queue.h" #include "thread_safe.h"
namespace audio { namespace audio {
struct config_t { struct config_t {
int packetDuration; int packetDuration;
+5 -5
View File
@@ -28,7 +28,7 @@ extern "C" {
#include "stream.h" #include "stream.h"
#include "audio.h" #include "audio.h"
#include "video.h" #include "video.h"
#include "queue.h" #include "thread_safe.h"
#include "crypto.h" #include "crypto.h"
#include "input.h" #include "input.h"
@@ -431,7 +431,7 @@ void control_server_t::map(uint16_t type, std::function<void(const std::string_v
_map_type_cb.emplace(type, std::move(cb)); _map_type_cb.emplace(type, std::move(cb));
} }
void controlThread(video::event_queue_t idr_events) { void controlThread(video::idr_event_t idr_events) {
control_server_t server { CONTROL_PORT }; control_server_t server { CONTROL_PORT };
auto input = platf::input(); auto input = platf::input();
@@ -477,7 +477,7 @@ void controlThread(video::event_queue_t idr_events) {
std::cout << "firstFrame [" << firstFrame << ']' << std::endl; std::cout << "firstFrame [" << firstFrame << ']' << std::endl;
std::cout << "lastFrame [" << lastFrame << ']' << std::endl; std::cout << "lastFrame [" << lastFrame << ']' << std::endl;
idr_events->push(std::make_pair(firstFrame, lastFrame)); idr_events->raise(std::make_pair(firstFrame, lastFrame));
}); });
server.map(packetTypes[IDX_INPUT_DATA], [&input](const std::string_view &payload) mutable { server.map(packetTypes[IDX_INPUT_DATA], [&input](const std::string_view &payload) mutable {
@@ -584,7 +584,7 @@ void audioThread() {
captureThread.join(); captureThread.join();
} }
void videoThread(video::event_queue_t idr_events) { void videoThread(video::idr_event_t idr_events) {
auto &config = session.config; auto &config = session.config;
int lowseq = 0; int lowseq = 0;
@@ -893,7 +893,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
session.video_packets = std::make_shared<video::packet_queue_t::element_type>(); session.video_packets = std::make_shared<video::packet_queue_t::element_type>();
session.audio_packets = std::make_shared<audio::packet_queue_t::element_type>(); session.audio_packets = std::make_shared<audio::packet_queue_t::element_type>();
video::event_queue_t idr_events { new video::event_queue_t::element_type }; video::idr_event_t idr_events {new video::idr_event_t::element_type };
session.audioThread = std::thread {audioThread}; session.audioThread = std::thread {audioThread};
session.videoThread = std::thread {videoThread, idr_events}; session.videoThread = std::thread {videoThread, idr_events};
session.controlThread = std::thread {controlThread, idr_events}; session.controlThread = std::thread {controlThread, idr_events};
+71 -4
View File
@@ -2,8 +2,8 @@
// Created by loki on 6/10/19. // Created by loki on 6/10/19.
// //
#ifndef SUNSHINE_QUEUE_H #ifndef SUNSHINE_THREAD_SAFE_H
#define SUNSHINE_QUEUE_H #define SUNSHINE_THREAD_SAFE_H
#include <vector> #include <vector>
#include <mutex> #include <mutex>
@@ -12,6 +12,73 @@
#include "utility.h" #include "utility.h"
namespace safe { namespace safe {
template<class T>
class event_t {
using status_t = util::either_t<
(std::is_same_v<T, bool> ||
util::instantiation_of_v<std::unique_ptr, T> ||
util::instantiation_of_v<std::shared_ptr, T> ||
std::is_pointer_v<T>),
T, std::optional<T>>;
public:
template<class...Args>
void raise(Args &&...args) {
std::lock_guard lg { _lock };
if(!_continue) {
return;
}
_status = status_t { std::forward<Args>(args)... };
_cv.notify_all();
}
status_t pop() {
std::unique_lock ul{_lock};
if (!_continue) {
return util::false_v<status_t>;
}
while (!_status) {
_cv.wait(ul);
if (!_continue) {
return util::false_v<status_t>;
}
}
auto val = std::move(_status);
_status = util::false_v<status_t>;
return val;
}
bool peek() {
std::lock_guard lg { _lock };
return (bool)_status;
}
void stop() {
std::lock_guard lg{_lock};
_continue = false;
_cv.notify_all();
}
bool running() const {
return _continue;
}
private:
bool _continue{true};
status_t _status;
std::condition_variable _cv;
std::mutex _lock;
};
template<class T> template<class T>
class queue_t { class queue_t {
@@ -24,7 +91,7 @@ class queue_t {
public: public:
template<class ...Args> template<class ...Args>
void push(Args &&... args) { void raise(Args &&... args) {
std::lock_guard lg{_lock}; std::lock_guard lg{_lock};
if(!_continue) { if(!_continue) {
@@ -90,4 +157,4 @@ private:
} }
#endif //SUNSHINE_QUEUE_H #endif //SUNSHINE_THREAD_SAFE_H
+7 -7
View File
@@ -34,7 +34,7 @@ void free_packet(AVPacket *packet) {
using ctx_t = util::safe_ptr<AVCodecContext, free_ctx>; using ctx_t = util::safe_ptr<AVCodecContext, free_ctx>;
using frame_t = util::safe_ptr<AVFrame, free_frame>; using frame_t = util::safe_ptr<AVFrame, free_frame>;
using sws_t = util::safe_ptr<SwsContext, sws_freeContext>; using sws_t = util::safe_ptr<SwsContext, sws_freeContext>;
using img_queue_t = std::shared_ptr<safe::queue_t<platf::img_t>>; using img_event_t = std::shared_ptr<safe::event_t<platf::img_t>>;
auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) { auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) {
avcodec_open2(ctx.get(), codec, options); avcodec_open2(ctx.get(), codec, options);
@@ -78,14 +78,14 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im
exit(1); exit(1);
} }
packets->push(std::move(packet)); packets->raise(std::move(packet));
} }
} }
void encodeThread( void encodeThread(
img_queue_t images, img_event_t images,
packet_queue_t packets, packet_queue_t packets,
event_queue_t idr_events, idr_event_t idr_events,
config_t config) { config_t config) {
int framerate = config.framerate; int framerate = config.framerate;
@@ -158,10 +158,10 @@ void encodeThread(
packets->stop(); packets->stop();
} }
void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t config) { void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config) {
int framerate = config.framerate; int framerate = config.framerate;
img_queue_t images { new safe::queue_t<platf::img_t> }; img_event_t images {new img_event_t::element_type };
std::thread encoderThread { &encodeThread, images, packets, idr_events, config }; std::thread encoderThread { &encodeThread, images, packets, idr_events, config };
@@ -172,7 +172,7 @@ void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t
auto next_snapshot = std::chrono::steady_clock::now() + time_span; auto next_snapshot = std::chrono::steady_clock::now() + time_span;
auto img = platf::snapshot(disp); auto img = platf::snapshot(disp);
images->push(std::move(img)); images->raise(std::move(img));
img.reset(); img.reset();
auto t = std::chrono::steady_clock::now(); auto t = std::chrono::steady_clock::now();
+3 -3
View File
@@ -5,7 +5,7 @@
#ifndef SUNSHINE_VIDEO_H #ifndef SUNSHINE_VIDEO_H
#define SUNSHINE_VIDEO_H #define SUNSHINE_VIDEO_H
#include "queue.h" #include "thread_safe.h"
struct AVPacket; struct AVPacket;
namespace video { namespace video {
@@ -13,7 +13,7 @@ void free_packet(AVPacket *packet);
using packet_t = util::safe_ptr<AVPacket, free_packet>; using packet_t = util::safe_ptr<AVPacket, free_packet>;
using packet_queue_t = std::shared_ptr<safe::queue_t<packet_t>>; using packet_queue_t = std::shared_ptr<safe::queue_t<packet_t>>;
using event_queue_t = std::shared_ptr<safe::queue_t<std::pair<int64_t, int64_t>>>; using idr_event_t = std::shared_ptr<safe::event_t<std::pair<int64_t, int64_t>>>;
struct config_t { struct config_t {
int width; int width;
@@ -23,7 +23,7 @@ struct config_t {
int slicesPerFrame; int slicesPerFrame;
}; };
void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t config); void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config);
} }
#endif //SUNSHINE_VIDEO_H #endif //SUNSHINE_VIDEO_H