From 984d757914bf477f8425ac14b6152bc003cb2ee5 Mon Sep 17 00:00:00 2001 From: "Zed A. Shaw" Date: Sun, 22 Sep 2024 18:13:25 -0400 Subject: [PATCH] Refactored to allow for multiple targets with different bitrates, gradual backoff of retrying failed ffmpeg runs, and a config.json that works better. --- config.cpp | 27 ++++++++++++ config.hpp | 22 ++++++++++ config.json | 17 +++++++- main.cpp | 117 +--------------------------------------------------- meson.build | 2 + server.cpp | 83 +++++++++++++++++++++++++++++++++++++ server.hpp | 39 ++++++++++++++++++ 7 files changed, 190 insertions(+), 117 deletions(-) create mode 100644 config.cpp create mode 100644 config.hpp create mode 100644 server.cpp create mode 100644 server.hpp diff --git a/config.cpp b/config.cpp new file mode 100644 index 0000000..b812c9d --- /dev/null +++ b/config.cpp @@ -0,0 +1,27 @@ +#include "config.hpp" +#include + +using namespace nlohmann; + +Config::Config(const string json_file) { + load(json_file); +} + +void Config::load(const string file_name) { + std::ifstream infile(file_name); + json_config = json::parse(infile); + + listen_at = json_config["listen_at"].template get(); + fail_max = json_config["fail_max"].template get(); + json send_list = json_config["send_to"]; + + for(auto &el : send_list.items()) { + json spec = el.value(); + SendTo send_spec{ + .url = spec["url"].template get(), + .bitrate = spec["bitrate"].template get() + }; + + send_to[el.key()] = send_spec; + } +} diff --git a/config.hpp b/config.hpp new file mode 100644 index 0000000..fb84465 --- /dev/null +++ b/config.hpp @@ -0,0 +1,22 @@ +#pragma once +#include +#include + +using std::string; + +struct SendTo { + string url = ""; + string bitrate = ""; +}; + +struct Config { + nlohmann::json json_config; + string listen_at = ""; + string bitrate = ""; + int fail_max = 6; + std::map send_to; + + Config(const string json_file); + + void load(const string file_name); +}; diff --git a/config.json b/config.json index 941059f..4b5f681 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,18 @@ { "listen_at": "rtmp://192.168.254.147/", - "bitrate": "7M", - "send_to": "https://127.0.0.1:5001/" + "fail_max": 10, + "send_to": { + "Youtube": { + "url": "https://127.0.0.1:5001/", + "bitrate": "9M" + }, + "Twitch": { + "url": "https://127.0.0.1:5001/", + "bitrate": "7M" + }, + "Twitter": { + "url": "https://127.0.0.1:5001/", + "bitrate": "9M" + } + } } diff --git a/main.cpp b/main.cpp index 5a37742..9daa096 100644 --- a/main.cpp +++ b/main.cpp @@ -1,123 +1,10 @@ #define FSM_DEBUG 1 #include -#include -#include -#include -#include -#include #include "dbc.hpp" -#include "fsm.hpp" +#include "server.hpp" +#include "config.hpp" -#define BUF_MAX 1024 * 10 using namespace fmt; -using namespace nlohmann; -using namespace std::chrono_literals; - -struct Config { - json json_config; - string listen_at = ""; - string bitrate = ""; - string send_to = ""; - - Config(const string json_file) { - load(json_file); - } - - void load(const string file_name) { - std::ifstream infile(file_name); - json_config = json::parse(infile); - - listen_at = json_config["listen_at"].template get(); - bitrate = json_config["bitrate"].template get(); - send_to = json_config["send_to"].template get(); - } -}; - -FILE *run_ffmpeg(Config &config) { - string ffmpeg_cmd = format("ffmpeg -listen 1 -i {} -bufsize 3000k -maxrate {} -flags +global_header -c:v libx264 -preset veryfast -tune zerolatency -g:v 60 -vb {} -c:a copy -f flv {} 2> output.log", - config.listen_at, - config.bitrate, - config.bitrate, - config.send_to); - - println("RUNNING: {}", ffmpeg_cmd); - - return popen(ffmpeg_cmd.c_str(), "re"); -} - - -enum class ServerState { - START, READING, STOP, ERROR -}; - -enum class ServerEvent { - GO -}; - -class Server : DeadSimpleFSM { - Config &config; - FILE *handle = nullptr; - char buffer[BUF_MAX]; - char *res = nullptr; - std::chrono::milliseconds wait_time = 200ms; - -public: - - Server(Config &config) : config(config) {}; - - bool stopped() { - return in_state(ServerState::STOP); - } - - void event(ServerEvent ev) { - switch(_state) { - FSM_STATE(ServerState, START, ev); - FSM_STATE(ServerState, READING, ev); - FSM_STATE(ServerState, STOP, ev); - FSM_STATE(ServerState, ERROR, ev); - } - } - - void START(ServerEvent ev) { - handle = run_ffmpeg(config); - if(handle == nullptr) { - dbc::log("ERROR when launching ffmpeg"); - state(ServerState::ERROR); - } else { - wait_time = 200ms; - state(ServerState::READING); - } - } - - void READING(ServerEvent ev) { - res = fgets(buffer, BUF_MAX, handle); - - if(res == nullptr) { - dbc::log("STOP shutting down..."); - state(ServerState::STOP); - } else { - state(ServerState::READING); - } - } - - void STOP(ServerEvent ev) { - int rc = pclose(handle); - if(rc != 0) { - dbc::log("ERROR when calling pclose on ffmpeg"); - state(ServerState::ERROR); - } else { - state(ServerState::STOP); - } - } - - void ERROR(ServerEvent ev) { - println("Error in server, waiting {}...", wait_time); - wait_time *= 2; - std::this_thread::sleep_for(wait_time); - dbc::log("Attempting to run it again..."); - state(ServerState::START); - } -}; int main(int argc, char *argv[]) { dbc::check(argc == 2, "USAGE: distributary config.json"); diff --git a/meson.build b/meson.build index 9d1e365..073733e 100644 --- a/meson.build +++ b/meson.build @@ -16,6 +16,8 @@ runtests = executable('runtests', [ distributary = executable('distributary', [ 'main.cpp', 'dbc.cpp', + 'config.cpp', + 'server.cpp', ], dependencies: depends) diff --git a/server.cpp b/server.cpp new file mode 100644 index 0000000..3af872b --- /dev/null +++ b/server.cpp @@ -0,0 +1,83 @@ +#include "server.hpp" +#include +#include +#include +#include +#include "dbc.hpp" + +using namespace fmt; + +FILE *run_ffmpeg(Config &config) { + string ffmpeg_cmd = format("ffmpeg -listen 1 -i {} -bufsize 3000k", + config.listen_at); + + for(auto &el : config.send_to) { + SendTo send_spec = el.second; + ffmpeg_cmd += format(" -maxrate {} -flags +global_header -c:v libx264 -preset veryfast -tune zerolatency -g:v 60 -vb {} -c:a copy -f flv {}", + send_spec.bitrate, send_spec.bitrate, send_spec.url); + } + + println("RUNNING: {}", ffmpeg_cmd); + + return popen(ffmpeg_cmd.c_str(), "re"); +} + +bool Server::stopped() { + return in_state(ServerState::STOP); +} + +void Server::event(ServerEvent ev) { + switch(_state) { + FSM_STATE(ServerState, START, ev); + FSM_STATE(ServerState, READING, ev); + FSM_STATE(ServerState, STOP, ev); + FSM_STATE(ServerState, ERROR, ev); + } +} + +void Server::START(ServerEvent ev) { + handle = run_ffmpeg(config); + if(handle == nullptr) { + dbc::log("ERROR when launching ffmpeg"); + state(ServerState::ERROR); + } else { + wait_time = 200ms; + fail_count = 0; + state(ServerState::READING); + } +} + +void Server::READING(ServerEvent ev) { + res = fgets(buffer, BUF_MAX, handle); + + if(res == nullptr) { + dbc::log("STOP shutting down..."); + state(ServerState::STOP); + } else { + state(ServerState::READING); + } +} + +void Server::STOP(ServerEvent ev) { + int rc = pclose(handle); + if(rc != 0) { + dbc::log("ERROR when calling pclose on ffmpeg"); + state(ServerState::ERROR); + } else { + state(ServerState::STOP); + } +} + +void Server::ERROR(ServerEvent ev) { + fail_count++; + if(fail_count < config.fail_max) { + println("Error in server, waiting {}...", wait_time); + wait_time *= 2; + std::this_thread::sleep_for(wait_time); + dbc::log("Attempting to run it again..."); + state(ServerState::START); + } else { + dbc::log("ffmpeg failed to many times, exiting."); + state(ServerState::STOP); + } +} diff --git a/server.hpp b/server.hpp new file mode 100644 index 0000000..87cf273 --- /dev/null +++ b/server.hpp @@ -0,0 +1,39 @@ +#pragma once +#include "fsm.hpp" +#include "config.hpp" +#include +using namespace nlohmann; +using namespace std::chrono_literals; + +#define BUF_MAX 1024 * 10 + +enum class ServerState { + START, READING, STOP, ERROR +}; + +enum class ServerEvent { + GO +}; + +class Server : DeadSimpleFSM { + Config &config; + FILE *handle = nullptr; + char buffer[BUF_MAX]; + char *res = nullptr; + std::chrono::milliseconds wait_time = 200ms; + int fail_count = 0; + +public: + + Server(Config &config) : config(config) {}; + + bool stopped(); + + void event(ServerEvent ev); + + // states + void START(ServerEvent ev); + void READING(ServerEvent ev); + void STOP(ServerEvent ev); + void ERROR(ServerEvent ev); +};