package playout import ( "bytes" "fmt" "git.entr0py.de/garionion/ffmpeg-playout/status" "github.com/grafov/bcast" "log" "net/http" "net/url" "os" "os/exec" "path" "strconv" "syscall" "time" ) type Job struct { ID int64 Version string StartAt time.Time StopAt time.Time Source string Output string ControlChannel chan string } type Config struct { PlayoutScriptPath string PlayoutScript string ProgressDir string PrometheusPushGateway string OutputFormat string } func monitorFFmpeg(cfg *Config, progressPath string, f *bcast.Member, output string) { stats := make(chan status.Status) c := 0 for { if c > 15 { return } if _, err := os.Stat(progressPath); os.IsNotExist(err) { time.Sleep(1 * time.Second) c++ continue } err := status.FFmpegStatusWatcher(stats, progressPath) if err != nil { log.Println(err) continue } else { break } } u, _ := url.Parse(cfg.PrometheusPushGateway) u.Path = path.Join(u.Path, "metrics", "job", "Decklink_Outputs", output) c = 0 for c < 4 { select { case <-f.Read: return case stat := <-stats: metric := fmt.Sprintf("frame %v\n dup_frames %v\n bitrate %v\n fps %v\n speed %v\n", stat.Frame, stat.Dupframes, stat.Bitrate, stat.FPS, stat.Speed) resp, err := http.Post(u.String(), "text/plain", bytes.NewBuffer([]byte(metric))) if err != nil { // log.Println(err) continue } resp.Body.Close() case <-time.After(4 * time.Second): c++ break } } f.Send("finished") f.Close() } func killProcess(cmd *exec.Cmd, id int64) { log.Printf("Kill %v", cmd.Process.Pid) err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) if err != nil { log.Printf("Failed to kill %v: %v", cmd.Process.Pid, err) } log.Println(cmd.ProcessState.ExitCode()) log.Printf("Finish %d", id) } //nolint:funlen func (p *Job) Playout(cfg *Config) { progressPath := path.Join(cfg.ProgressDir, strconv.FormatInt(p.ID, 10)) playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript) for { log.Printf("Start Playout for %v", p.ID) log.Println(p.StartAt) log.Println(p.StopAt) log.Println(time.Until(p.StopAt)) cmd := exec.Command(playoutScript, //nolint:gosec "-i", p.Source, "-o", p.Output, "-f", cfg.OutputFormat, "-p", progressPath, ) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} cmd.Dir = cfg.PlayoutScriptPath var outbuf, errbuf bytes.Buffer cmd.Stdout = &outbuf cmd.Stderr = &errbuf err := cmd.Start() if err != nil { log.Printf("Failed to start Playout %v: %v", p.ID, err) } defer killProcess(cmd, p.ID) pid := cmd.Process.Pid log.Printf("PID for %v: %v", p.ID, pid) log.Println(cmd.Args) finishChannel := bcast.NewGroup() go finishChannel.Broadcast(0) go func(cmd *exec.Cmd, f *bcast.Member) { err := cmd.Wait() if err != nil { log.Println(err) } f.Send("finished") f.Close() }(cmd, finishChannel.Join()) go monitorFFmpeg(cfg, progressPath, finishChannel.Join(), p.Output) fChannelMember := finishChannel.Join() ProcessManagement: for { select { case <-fChannelMember.Read: break ProcessManagement case ctrlMsg := <-p.ControlChannel: log.Printf("%d Control Message: %s", p.ID, ctrlMsg) break case <-time.After(time.Until(p.StopAt)): break ProcessManagement } } if time.Now().After(p.StopAt) { return } } }