From e0c1e4ec55b65b750f267d46784b83fa9a548813 Mon Sep 17 00:00:00 2001 From: loki Date: Wed, 11 Dec 2019 19:06:52 +0100 Subject: [PATCH] Add event_t --- CMakeLists.txt | 3 +- sunshine/audio.cpp | 6 +-- sunshine/audio.h | 2 +- sunshine/stream.cpp | 10 ++-- sunshine/{queue.h => thread_safe.h} | 75 +++++++++++++++++++++++++++-- sunshine/video.cpp | 14 +++--- sunshine/video.h | 6 +-- 7 files changed, 91 insertions(+), 25 deletions(-) rename sunshine/{queue.h => thread_safe.h} (51%) diff --git a/CMakeLists.txt b/CMakeLists.txt index ee68f7d7..71d92ea1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -69,8 +69,7 @@ set(SUNSHINE_TARGET_FILES sunshine/stream.cpp sunshine/stream.h sunshine/video.cpp - sunshine/video.h - sunshine/queue.h + sunshine/video.h sunshine/thread_safe.h sunshine/input.cpp sunshine/input.h sunshine/audio.cpp diff --git a/sunshine/audio.cpp b/sunshine/audio.cpp index b7bb2df7..2bd76b16 100644 --- a/sunshine/audio.cpp +++ b/sunshine/audio.cpp @@ -6,7 +6,7 @@ #include "platform/common.h" #include "utility.h" -#include "queue.h" +#include "thread_safe.h" #include "audio.h" namespace audio { @@ -73,7 +73,7 @@ void encodeThread(std::shared_ptr> packets, std::shared_ } packet.fake_resize(bytes); - packets->push(std::move(packet)); + packets->raise(std::move(packet)); } } @@ -95,7 +95,7 @@ void capture(std::shared_ptr> packets, config_t config) while(packets->running()) { auto sample = platf::audio(mic, bytes_per_frame); - samples->push(std::move(sample)); + samples->raise(std::move(sample)); } samples->stop(); diff --git a/sunshine/audio.h b/sunshine/audio.h index eb64b0ed..9b181fdf 100644 --- a/sunshine/audio.h +++ b/sunshine/audio.h @@ -2,7 +2,7 @@ #define SUNSHINE_AUDIO_H #include "utility.h" -#include "queue.h" +#include "thread_safe.h" namespace audio { struct config_t { int packetDuration; diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index d0006f9e..116c3d80 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -28,7 +28,7 @@ extern "C" { #include "stream.h" #include "audio.h" #include "video.h" -#include "queue.h" +#include "thread_safe.h" #include "crypto.h" #include "input.h" @@ -431,7 +431,7 @@ void control_server_t::map(uint16_t type, std::functionpush(std::make_pair(firstFrame, lastFrame)); + idr_events->raise(std::make_pair(firstFrame, lastFrame)); }); server.map(packetTypes[IDX_INPUT_DATA], [&input](const std::string_view &payload) mutable { @@ -584,7 +584,7 @@ void audioThread() { captureThread.join(); } -void videoThread(video::event_queue_t idr_events) { +void videoThread(video::idr_event_t idr_events) { auto &config = session.config; int lowseq = 0; @@ -893,7 +893,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { session.video_packets = std::make_shared(); session.audio_packets = std::make_shared(); - video::event_queue_t idr_events { new video::event_queue_t::element_type }; + video::idr_event_t idr_events {new video::idr_event_t::element_type }; session.audioThread = std::thread {audioThread}; session.videoThread = std::thread {videoThread, idr_events}; session.controlThread = std::thread {controlThread, idr_events}; diff --git a/sunshine/queue.h b/sunshine/thread_safe.h similarity index 51% rename from sunshine/queue.h rename to sunshine/thread_safe.h index cd76bed1..7112339a 100644 --- a/sunshine/queue.h +++ b/sunshine/thread_safe.h @@ -2,8 +2,8 @@ // Created by loki on 6/10/19. // -#ifndef SUNSHINE_QUEUE_H -#define SUNSHINE_QUEUE_H +#ifndef SUNSHINE_THREAD_SAFE_H +#define SUNSHINE_THREAD_SAFE_H #include #include @@ -12,6 +12,73 @@ #include "utility.h" namespace safe { +template +class event_t { + using status_t = util::either_t< + (std::is_same_v || + util::instantiation_of_v || + util::instantiation_of_v || + std::is_pointer_v), + T, std::optional>; + +public: + template + void raise(Args &&...args) { + std::lock_guard lg { _lock }; + if(!_continue) { + return; + } + + _status = status_t { std::forward(args)... }; + + _cv.notify_all(); + } + + status_t pop() { + std::unique_lock ul{_lock}; + + if (!_continue) { + return util::false_v; + } + + while (!_status) { + _cv.wait(ul); + + if (!_continue) { + return util::false_v; + } + } + + auto val = std::move(_status); + _status = util::false_v; + return val; + } + + bool peek() { + std::lock_guard lg { _lock }; + + return (bool)_status; + } + + void stop() { + std::lock_guard lg{_lock}; + + _continue = false; + + _cv.notify_all(); + } + + bool running() const { + return _continue; + } +private: + + bool _continue{true}; + status_t _status; + + std::condition_variable _cv; + std::mutex _lock; +}; template class queue_t { @@ -24,7 +91,7 @@ class queue_t { public: template - void push(Args &&... args) { + void raise(Args &&... args) { std::lock_guard lg{_lock}; if(!_continue) { @@ -90,4 +157,4 @@ private: } -#endif //SUNSHINE_QUEUE_H +#endif //SUNSHINE_THREAD_SAFE_H diff --git a/sunshine/video.cpp b/sunshine/video.cpp index 4bcff2d5..45e92039 100644 --- a/sunshine/video.cpp +++ b/sunshine/video.cpp @@ -34,7 +34,7 @@ void free_packet(AVPacket *packet) { using ctx_t = util::safe_ptr; using frame_t = util::safe_ptr; using sws_t = util::safe_ptr; -using img_queue_t = std::shared_ptr>; +using img_event_t = std::shared_ptr>; auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) { avcodec_open2(ctx.get(), codec, options); @@ -78,14 +78,14 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im exit(1); } - packets->push(std::move(packet)); + packets->raise(std::move(packet)); } } void encodeThread( - img_queue_t images, + img_event_t images, packet_queue_t packets, - event_queue_t idr_events, + idr_event_t idr_events, config_t config) { int framerate = config.framerate; @@ -158,10 +158,10 @@ void encodeThread( packets->stop(); } -void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t config) { +void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config) { int framerate = config.framerate; - img_queue_t images { new safe::queue_t }; + img_event_t images {new img_event_t::element_type }; std::thread encoderThread { &encodeThread, images, packets, idr_events, config }; @@ -172,7 +172,7 @@ void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t auto next_snapshot = std::chrono::steady_clock::now() + time_span; auto img = platf::snapshot(disp); - images->push(std::move(img)); + images->raise(std::move(img)); img.reset(); auto t = std::chrono::steady_clock::now(); diff --git a/sunshine/video.h b/sunshine/video.h index c07d1d26..f2f793d5 100644 --- a/sunshine/video.h +++ b/sunshine/video.h @@ -5,7 +5,7 @@ #ifndef SUNSHINE_VIDEO_H #define SUNSHINE_VIDEO_H -#include "queue.h" +#include "thread_safe.h" struct AVPacket; namespace video { @@ -13,7 +13,7 @@ void free_packet(AVPacket *packet); using packet_t = util::safe_ptr; using packet_queue_t = std::shared_ptr>; -using event_queue_t = std::shared_ptr>>; +using idr_event_t = std::shared_ptr>>; struct config_t { int width; @@ -23,7 +23,7 @@ struct config_t { int slicesPerFrame; }; -void capture_display(packet_queue_t packets, event_queue_t idr_events, config_t config); +void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config); } #endif //SUNSHINE_VIDEO_H