A simple program that accepts RTMP streams and then restreams them directly to other services. I use it to record at a higher rate but stream to target services at the rate they want. It's also useful for streaming to multiple sites.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
distributary/main.cpp

132 lines
3.0 KiB

#define FSM_DEBUG 1
#include <chrono>
#include <fstream>
#include <stdexcept>
#include <thread>
#include <fmt/core.h>
#include <fmt/chrono.h>
#include <nlohmann/json.hpp>
#include "dbc.hpp"
#include "fsm.hpp"
// Size in bytes of the buffer that receives one line of ffmpeg output.
// Parenthesized so the macro expands as a single value inside a larger
// expression (for example `x / BUF_MAX`).
#define BUF_MAX (1024 * 10)
using namespace fmt;
using namespace nlohmann;
using namespace std::chrono_literals;
struct Config {
json json_config;
string listen_at = "";
string bitrate = "";
string send_to = "";
Config(const string json_file) {
load(json_file);
}
void load(const string file_name) {
std::ifstream infile(file_name);
json_config = json::parse(infile);
listen_at = json_config["listen_at"].template get<string>();
bitrate = json_config["bitrate"].template get<string>();
send_to = json_config["send_to"].template get<string>();
}
};
/**
 * Starts ffmpeg through the shell and returns the read end of its pipe.
 *
 * The command line is assembled from config.listen_at (the -i input),
 * config.bitrate (used for both -maxrate and -vb) and config.send_to (the
 * flv output). The command redirects ffmpeg's stderr to output.log.
 * The finished command is echoed before it is launched.
 *
 * Returns whatever popen() returns, so nullptr signals a failed launch.
 */
FILE *run_ffmpeg(Config &config) {
  const string &rate = config.bitrate;

  const string command = format("ffmpeg -listen 1 -i {} -bufsize 3000k -maxrate {} -flags +global_header -c:v libx264 -preset veryfast -tune zerolatency -g:v 60 -vb {} -c:a copy -f flv {} 2> output.log",
      config.listen_at, rate, rate, config.send_to);

  println("RUNNING: {}", command);

  // Mode "re": read from the child; the 'e' flag is a libc extension
  // (close-on-exec on glibc) -- confirm on other platforms.
  FILE *pipe = popen(command.c_str(), "re");
  return pipe;
}
// States of the ffmpeg supervisor; each one has a handler of the same name
// in Server.
enum class ServerState {
  START,    // about to launch ffmpeg
  READING,  // consuming lines from ffmpeg's pipe
  STOP,     // the pipe reached end-of-file
  ERROR     // a step failed; wait, then try again
};
// Events accepted by Server::event().
enum class ServerEvent {
  GO  // the only event: run the handler for the current state
};
/**
 * Supervises a single ffmpeg child process as a small state machine.
 *
 * Transitions, as implemented by the handlers below:
 *   START   -> READING after a successful launch, ERROR otherwise
 *   READING -> READING while lines arrive, STOP once the pipe hits EOF
 *   STOP    -> closes the pipe; ERROR on a non-zero pclose() result,
 *              otherwise remains in STOP
 *   ERROR   -> sleeps (doubling the delay each time), then START
 *
 * Drive it by calling event() until stopped() returns true.
 */
class Server : DeadSimpleFSM<ServerState, ServerEvent> {
  Config &config;
  // Pipe to the running ffmpeg; nullptr whenever no pipe is open.
  FILE *handle = nullptr;
  // Receives one line of ffmpeg output at a time; the text is discarded.
  char buffer[BUF_MAX];
  // Current retry delay: reset by a successful START, doubled by ERROR.
  std::chrono::milliseconds wait_time = 200ms;

public:
  Server(Config &config) : config(config) {}

  /**
   * True once the machine is in STOP and the pipe has been closed.
   *
   * Looking at the state alone is not enough: READING moves to STOP as
   * soon as it sees EOF, so a `while(!stopped())` driver would quit before
   * STOP() ever ran pclose() and looked at ffmpeg's exit status.
   */
  bool stopped() {
    return handle == nullptr && in_state(ServerState::STOP);
  }

  /**
   * Runs the handler that matches the current state.
   * FSM_STATE and _state come from fsm.hpp; presumably each FSM_STATE line
   * expands to a `case` that calls the handler of that name -- confirm there.
   */
  void event(ServerEvent ev) {
    switch(_state) {
      FSM_STATE(ServerState, START, ev);
      FSM_STATE(ServerState, READING, ev);
      FSM_STATE(ServerState, STOP, ev);
      FSM_STATE(ServerState, ERROR, ev);
    }
  }

  /** Launches ffmpeg and resets the retry delay when the launch works. */
  void START([[maybe_unused]] ServerEvent ev) {
    handle = run_ffmpeg(config);

    if(handle == nullptr) {
      dbc::log("ERROR when launching ffmpeg");
      state(ServerState::ERROR);
    } else {
      wait_time = 200ms;
      state(ServerState::READING);
    }
  }

  /** Reads one line from the pipe; EOF (or a read error) moves to STOP. */
  void READING([[maybe_unused]] ServerEvent ev) {
    // sizeof(buffer) keeps the limit tied to the array itself.
    const char *line = fgets(buffer, static_cast<int>(sizeof(buffer)), handle);

    if(line == nullptr) {
      dbc::log("STOP shutting down...");
      state(ServerState::STOP);
    } else {
      state(ServerState::READING);
    }
  }

  /**
   * Closes the pipe and decides between a final stop and a retry.
   * Safe to dispatch again after the pipe is closed: it then does nothing.
   */
  void STOP([[maybe_unused]] ServerEvent ev) {
    if(handle == nullptr) {
      // Already closed; calling pclose() twice on one stream is undefined.
      state(ServerState::STOP);
      return;
    }

    int rc = pclose(handle);
    handle = nullptr;

    if(rc != 0) {
      dbc::log("ERROR when calling pclose on ffmpeg");
      state(ServerState::ERROR);
    } else {
      state(ServerState::STOP);
    }
  }

  /** Backs off, then goes back to START to launch ffmpeg again. */
  void ERROR([[maybe_unused]] ServerEvent ev) {
    // Double first so the logged duration is the one actually slept.
    wait_time *= 2;
    println("Error in server, waiting {}...", wait_time);
    std::this_thread::sleep_for(wait_time);
    dbc::log("Attempting to run it again...");
    state(ServerState::START);
  }
};
/**
 * Entry point: expects exactly one argument, the path of the JSON config.
 * Loads the config, then pumps GO events into the server until it reports
 * that it has stopped.
 */
int main(int argc, char *argv[]) {
  dbc::check(argc == 2, "USAGE: distributary config.json");

  const char *config_path = argv[1];
  println("Using config {}", config_path);

  Config config(config_path);
  Server server(config);

  for(;;) {
    if(server.stopped()) {
      break;
    }
    server.event(ServerEvent::GO);
  }

  return 0;
}