diff --git a/config.example.yml b/config.example.yml index 54c7bf3..56cb745 100644 --- a/config.example.yml +++ b/config.example.yml @@ -8,4 +8,5 @@ address: ":3000" default_duration: "15m" playoutscript: "decklink_playout.sh" playoutscriptpath: "/path/to/ffmpeg-scripts/playout" -tmpdir: "/tmp" \ No newline at end of file +tmpdir: "/tmp" +prometheusPushGateway: "http://localhost:9091" \ No newline at end of file diff --git a/main.go b/main.go index 84f5154..187c6f3 100644 --- a/main.go +++ b/main.go @@ -16,12 +16,13 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type Config struct { - Outputs []string `yaml:"outputs"` - Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` - DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` - PlayoutScript string `yaml:"playoutscript"` - PlayoutScriptPath string `yaml:"playoutscriptpath" env:""` - ProgressDir string `yaml:"tmpdir"` + Outputs []string `yaml:"outputs"` + Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` + DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` + PlayoutScript string `yaml:"playoutscript"` + PlayoutScriptPath string `yaml:"playoutscriptpath" env:""` + ProgressDir string `yaml:"tmpdir"` + PrometheusPushGateway string `yaml:"prometheusPushGateway"` } type Job struct { @@ -109,7 +110,7 @@ func main() { log.Fatal("No configfile: ", err) } - s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir) + s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway) if err != nil { log.Fatal("Failed to init Store: ", err.Error()) } diff --git a/playout/playout.go b/playout/playout.go index 042d6a5..9c799e0 100644 --- a/playout/playout.go +++ b/playout/playout.go @@ -1,10 +1,16 @@ package playout import ( + "bytes" + "ffmpeg-playout/status" "fmt" "log" + "net/http" + "net/url" + "os" "os/exec" "path" + "strconv" "time" ) @@ -19,23 +25,23 @@ type Job struct { } type Config struct { - PlayoutScriptPath string - PlayoutScript string - ProgressDir string + PlayoutScriptPath string + PlayoutScript string + ProgressDir string + PrometheusPushGateway string } +//nolint:funlen func (p *Job) Playout(cfg *Config) { // TODO delete playout Job from store after finishing/aborting playout for { - log.Println(p.StartAt) - log.Println(time.Until(p.StartAt)) select { case ctrlMsg := <-p.ControlChannel: fmt.Println(ctrlMsg) continue case <-time.After(time.Until(p.StartAt)): - log.Println("Start Playout") - progressPath := path.Join(cfg.ProgressDir, string(p.ID)) + log.Printf("Start Playout for %v", p.ID) + progressPath := path.Join(cfg.ProgressDir, strconv.Itoa(p.ID)) playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript) cmd := exec.Command(playoutScript, //nolint:gosec fmt.Sprintf("-i %v", p.Source), @@ -50,6 +56,46 @@ func (p *Job) Playout(cfg *Config) { } pid := cmd.Process.Pid log.Printf("PID for %v: %v", p.ID, pid) + log.Println(cmd.Args) + go func() { + stats := make(chan status.Status) + c := 0 + for { + if c > 15 { + return + } + if _, err := os.Stat("/path/to/whatever"); os.IsNotExist(err) { + time.Sleep(1 * time.Second) + c++ + continue + } + err := status.FFmpegStatusWatcher(stats, progressPath) + if err != nil { + log.Println(err) + continue + } else { + break + } + } + u, _ := url.Parse(cfg.PrometheusPushGateway) + u.Path = path.Join(u.Path, "metrics", "job", "Decklink_Outputs", p.Output) + c = 0 + for c < 4 { + select { + case stat := <-stats: + metric := fmt.Sprintf("frame %v\n dup_frames %v\n bitrate %v\n fps %v\n speed %v\n", stat.Frame, stat.Dupframes, stat.Bitrate, stat.FPS, stat.Speed) + resp, err := http.Post(u.Path, "text/plain", bytes.NewBuffer([]byte(metric))) + if err != nil { + log.Println(err) + } + log.Println(metric) + defer resp.Body.Close() + case <-time.After(4 * time.Second): + c++ + continue + } + } + }() ProcessManagement: for { select { @@ -57,7 +103,11 @@ func (p *Job) Playout(cfg *Config) { log.Println(ctrlMsg) continue case <-time.After(time.Until(p.StopAt)): - cmd.Process.Kill() + log.Printf("Kill %v", cmd.Process.Pid) + err := cmd.Process.Kill() + if err != nil { + log.Printf("Failed to kill %v: %v", cmd.Process.Pid, err) + } break ProcessManagement } } @@ -65,9 +115,11 @@ func (p *Job) Playout(cfg *Config) { if err != nil { log.Println(err) } + out, _ := cmd.CombinedOutput() + log.Println(string(out)) log.Println(cmd.ProcessState.ExitCode()) log.Printf("Finish %v", p.ID) - break + return } } } diff --git a/status/status.go b/status/status.go new file mode 100644 index 0000000..454a628 --- /dev/null +++ b/status/status.go @@ -0,0 +1,90 @@ +package status + +import ( + "bufio" + "fmt" + "github.com/fsnotify/fsnotify" + "io" + "log" + "os" + "strconv" + "strings" +) + +type Status struct { + Frame int + FPS float64 + Bitrate float64 + Dupframes int + Speed float64 +} + +//nolint:funlen +func FFmpegStatusWatcher(c chan<- Status, path string) error { + var ffmpegStatus Status + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + reader := bufio.NewReader(file) + + defer watcher.Close() + defer close(c) + + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if event.Op&fsnotify.Write == fsnotify.Write { + for { + line, err := reader.ReadString('\n') + if err == io.EOF { + break + } else if err != nil { + log.Println(err) + } + splits := strings.Split(line, "=") + key := strings.TrimSpace(splits[0]) + value := strings.TrimSpace(splits[1]) + switch key { + case "frame": + ffmpegStatus.Frame, err = strconv.Atoi(value) + case "fps": + ffmpegStatus.FPS, err = strconv.ParseFloat(value, 64) + case "bitrate": + ffmpegStatus.Bitrate, err = strconv.ParseFloat(strings.TrimRight(value, "kbits/s"), 64) + case "out_time_ms": + // test["Progress.Out_time_ms"], err = strconv.Atoi(value) + continue + case "dup_frames": + ffmpegStatus.Dupframes, err = strconv.Atoi(value) + case "speed": + ffmpegStatus.Speed, err = strconv.ParseFloat(strings.TrimRight(value, "x"), 64) + } + if err != nil { + fmt.Println(err) + } + } + c <- ffmpegStatus + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("error:", err) + } + } + }() + + err = watcher.Add(path) + return err +} diff --git a/store/store.go b/store/store.go index 6391d68..3bb4076 100644 --- a/store/store.go +++ b/store/store.go @@ -16,7 +16,7 @@ type Store struct { sync.RWMutex } -func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string) (*Store, error) { +func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string, prometheus string) (*Store, error) { playouts := make(map[int]*playout.Job) var d time.Duration @@ -25,9 +25,10 @@ func NewStore(o []string, defaultDuration string, playoutScriptPath string, play log.Fatal("Failed to set Default Duration: ", err) } pcfg := playout.Config{ - PlayoutScriptPath: playoutScriptPath, - PlayoutScript: playoutScript, - ProgressDir: tmpDir, + PlayoutScriptPath: playoutScriptPath, + PlayoutScript: playoutScript, + ProgressDir: tmpDir, + PrometheusPushGateway: prometheus, } store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o, Config: &pcfg}