Refactored to allow for multiple targets with different bitrates, gradual backoff of retrying failed ffmpeg runs, and a config.json that works better.

main
Zed A. Shaw 3 weeks ago
parent b0274b8254
commit 984d757914
  1. 27
      config.cpp
  2. 22
      config.hpp
  3. 17
      config.json
  4. 117
      main.cpp
  5. 2
      meson.build
  6. 83
      server.cpp
  7. 39
      server.hpp

@ -0,0 +1,27 @@
#include "config.hpp"
#include <fstream>
using namespace nlohmann;
Config::Config(const string json_file) {
load(json_file);
}
void Config::load(const string file_name) {
std::ifstream infile(file_name);
json_config = json::parse(infile);
listen_at = json_config["listen_at"].template get<string>();
fail_max = json_config["fail_max"].template get<int>();
json send_list = json_config["send_to"];
for(auto &el : send_list.items()) {
json spec = el.value();
SendTo send_spec{
.url = spec["url"].template get<string>(),
.bitrate = spec["bitrate"].template get<string>()
};
send_to[el.key()] = send_spec;
}
}

@ -0,0 +1,22 @@
#pragma once
#include <nlohmann/json.hpp>
#include <map>
using std::string;
struct SendTo {
string url = "";
string bitrate = "";
};
struct Config {
nlohmann::json json_config;
string listen_at = "";
string bitrate = "";
int fail_max = 6;
std::map<string, SendTo> send_to;
Config(const string json_file);
void load(const string file_name);
};

@ -1,5 +1,18 @@
{
"listen_at": "rtmp://192.168.254.147/",
"bitrate": "7M",
"send_to": "https://127.0.0.1:5001/"
"fail_max": 10,
"send_to": {
"Youtube": {
"url": "https://127.0.0.1:5001/",
"bitrate": "9M"
},
"Twitch": {
"url": "https://127.0.0.1:5001/",
"bitrate": "7M"
},
"Twitter": {
"url": "https://127.0.0.1:5001/",
"bitrate": "9M"
}
}
}

@ -1,123 +1,10 @@
#define FSM_DEBUG 1
#include <fmt/core.h>
#include <fmt/chrono.h>
#include <nlohmann/json.hpp>
#include <fstream>
#include <chrono>
#include <thread>
#include "dbc.hpp"
#include "fsm.hpp"
#include "server.hpp"
#include "config.hpp"
#define BUF_MAX 1024 * 10
using namespace fmt;
using namespace nlohmann;
using namespace std::chrono_literals;
struct Config {
json json_config;
string listen_at = "";
string bitrate = "";
string send_to = "";
Config(const string json_file) {
load(json_file);
}
void load(const string file_name) {
std::ifstream infile(file_name);
json_config = json::parse(infile);
listen_at = json_config["listen_at"].template get<string>();
bitrate = json_config["bitrate"].template get<string>();
send_to = json_config["send_to"].template get<string>();
}
};
FILE *run_ffmpeg(Config &config) {
string ffmpeg_cmd = format("ffmpeg -listen 1 -i {} -bufsize 3000k -maxrate {} -flags +global_header -c:v libx264 -preset veryfast -tune zerolatency -g:v 60 -vb {} -c:a copy -f flv {} 2> output.log",
config.listen_at,
config.bitrate,
config.bitrate,
config.send_to);
println("RUNNING: {}", ffmpeg_cmd);
return popen(ffmpeg_cmd.c_str(), "re");
}
enum class ServerState {
START, READING, STOP, ERROR
};
enum class ServerEvent {
GO
};
class Server : DeadSimpleFSM<ServerState, ServerEvent> {
Config &config;
FILE *handle = nullptr;
char buffer[BUF_MAX];
char *res = nullptr;
std::chrono::milliseconds wait_time = 200ms;
public:
Server(Config &config) : config(config) {};
bool stopped() {
return in_state(ServerState::STOP);
}
void event(ServerEvent ev) {
switch(_state) {
FSM_STATE(ServerState, START, ev);
FSM_STATE(ServerState, READING, ev);
FSM_STATE(ServerState, STOP, ev);
FSM_STATE(ServerState, ERROR, ev);
}
}
void START(ServerEvent ev) {
handle = run_ffmpeg(config);
if(handle == nullptr) {
dbc::log("ERROR when launching ffmpeg");
state(ServerState::ERROR);
} else {
wait_time = 200ms;
state(ServerState::READING);
}
}
void READING(ServerEvent ev) {
res = fgets(buffer, BUF_MAX, handle);
if(res == nullptr) {
dbc::log("STOP shutting down...");
state(ServerState::STOP);
} else {
state(ServerState::READING);
}
}
void STOP(ServerEvent ev) {
int rc = pclose(handle);
if(rc != 0) {
dbc::log("ERROR when calling pclose on ffmpeg");
state(ServerState::ERROR);
} else {
state(ServerState::STOP);
}
}
void ERROR(ServerEvent ev) {
println("Error in server, waiting {}...", wait_time);
wait_time *= 2;
std::this_thread::sleep_for(wait_time);
dbc::log("Attempting to run it again...");
state(ServerState::START);
}
};
int main(int argc, char *argv[]) {
dbc::check(argc == 2, "USAGE: distributary config.json");

@ -16,6 +16,8 @@ runtests = executable('runtests', [
distributary = executable('distributary', [
'main.cpp',
'dbc.cpp',
'config.cpp',
'server.cpp',
],
dependencies: depends)

@ -0,0 +1,83 @@
#include "server.hpp"
#include <nlohmann/json.hpp>
#include <fmt/core.h>
#include <fmt/chrono.h>
#include <thread>
#include "dbc.hpp"
using namespace fmt;
FILE *run_ffmpeg(Config &config) {
string ffmpeg_cmd = format("ffmpeg -listen 1 -i {} -bufsize 3000k",
config.listen_at);
for(auto &el : config.send_to) {
SendTo send_spec = el.second;
ffmpeg_cmd += format(" -maxrate {} -flags +global_header -c:v libx264 -preset veryfast -tune zerolatency -g:v 60 -vb {} -c:a copy -f flv {}",
send_spec.bitrate, send_spec.bitrate, send_spec.url);
}
println("RUNNING: {}", ffmpeg_cmd);
return popen(ffmpeg_cmd.c_str(), "re");
}
bool Server::stopped() {
return in_state(ServerState::STOP);
}
void Server::event(ServerEvent ev) {
switch(_state) {
FSM_STATE(ServerState, START, ev);
FSM_STATE(ServerState, READING, ev);
FSM_STATE(ServerState, STOP, ev);
FSM_STATE(ServerState, ERROR, ev);
}
}
void Server::START(ServerEvent ev) {
handle = run_ffmpeg(config);
if(handle == nullptr) {
dbc::log("ERROR when launching ffmpeg");
state(ServerState::ERROR);
} else {
wait_time = 200ms;
fail_count = 0;
state(ServerState::READING);
}
}
void Server::READING(ServerEvent ev) {
res = fgets(buffer, BUF_MAX, handle);
if(res == nullptr) {
dbc::log("STOP shutting down...");
state(ServerState::STOP);
} else {
state(ServerState::READING);
}
}
void Server::STOP(ServerEvent ev) {
int rc = pclose(handle);
if(rc != 0) {
dbc::log("ERROR when calling pclose on ffmpeg");
state(ServerState::ERROR);
} else {
state(ServerState::STOP);
}
}
void Server::ERROR(ServerEvent ev) {
fail_count++;
if(fail_count < config.fail_max) {
println("Error in server, waiting {}...", wait_time);
wait_time *= 2;
std::this_thread::sleep_for(wait_time);
dbc::log("Attempting to run it again...");
state(ServerState::START);
} else {
dbc::log("ffmpeg failed to many times, exiting.");
state(ServerState::STOP);
}
}

@ -0,0 +1,39 @@
#pragma once
#include "fsm.hpp"
#include "config.hpp"
#include <chrono>
using namespace nlohmann;
using namespace std::chrono_literals;
#define BUF_MAX 1024 * 10
enum class ServerState {
START, READING, STOP, ERROR
};
enum class ServerEvent {
GO
};
class Server : DeadSimpleFSM<ServerState, ServerEvent> {
Config &config;
FILE *handle = nullptr;
char buffer[BUF_MAX];
char *res = nullptr;
std::chrono::milliseconds wait_time = 200ms;
int fail_count = 0;
public:
Server(Config &config) : config(config) {};
bool stopped();
void event(ServerEvent ev);
// states
void START(ServerEvent ev);
void READING(ServerEvent ev);
void STOP(ServerEvent ev);
void ERROR(ServerEvent ev);
};
Loading…
Cancel
Save