diff --git a/go.mod b/go.mod index d5cd066..bd584e2 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/andybalholm/brotli v1.0.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/gofiber/fiber/v2 v2.1.3 + github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d github.com/ilyakaznacheev/cleanenv v1.2.5 github.com/json-iterator/go v1.1.10 github.com/klauspost/compress v1.11.2 // indirect diff --git a/go.sum b/go.sum index 0c9917e..0c31e31 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/gofiber/fiber/v2 v2.1.3 h1:d2fkRf6fkLa1uXgzXqN5iqAydjytoocS83hm1o00Ocg= github.com/gofiber/fiber/v2 v2.1.3/go.mod h1:MMiSv1HrDkN8Pv7NeVDYK+T/lwXOEKAvPBbLvJPCEfA= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d h1:Q2+KsA/1GLC9xyLsDun3/EOJ+83rY/IHRsO1DToPrdo= +github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d/go.mod h1:RInr+B3/Tx70hYm0rpNPMTD7vH0pBG5ny/JsHAs2KcQ= github.com/ilyakaznacheev/cleanenv v1.2.5 h1:/SlcF9GaIvefWqFJzsccGG/NJdoaAwb7Mm7ImzhO3DM= github.com/ilyakaznacheev/cleanenv v1.2.5/go.mod h1:/i3yhzwZ3s7hacNERGFwvlhwXMDcaqwIzmayEhbRplk= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= diff --git a/main.go b/main.go index 187c6f3..f31174b 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/ilyakaznacheev/cleanenv" jsoniter "github.com/json-iterator/go" "log" + "net" "time" ) @@ -22,6 +23,7 @@ type Config struct { PlayoutScript string `yaml:"playoutscript"` PlayoutScriptPath string `yaml:"playoutscriptpath" env:""` ProgressDir string `yaml:"tmpdir"` + OutputFormat string `yaml:"outputFormat"` PrometheusPushGateway string `yaml:"prometheusPushGateway"` } @@ -91,10 +93,23 @@ func schedulePlayout(s *store.Store) fiber.Handler { Port: output, Version: p.Version, }) - go func() { - p.Playout(s.Config) - s.DeletePlayout(p.ID) - }() + if !playoutExists { + go func() { + log.Printf("Start Scheduling %v", p.ID) + Waiting: + for { + select { + case ctrlMsg := <-p.ControlChannel: + fmt.Println(ctrlMsg) + continue + case <-time.After(time.Until(p.StartAt)): + break Waiting + } + } + p.Playout(s.Config) + s.DeletePlayout(p.ID) + }() + } // staus 409 when no schedule possible @@ -110,7 +125,7 @@ func main() { log.Fatal("No configfile: ", err) } - s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway) + s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway, cfg.OutputFormat) if err != nil { log.Fatal("Failed to init Store: ", err.Error()) } @@ -123,5 +138,9 @@ func main() { }) app.Post("/schedulePlayout", schedulePlayout(s)) - log.Fatal(app.Listen(cfg.Address)) + ln, err := net.Listen("tcp", cfg.Address) + if err != nil { + log.Fatal(err) + } + log.Fatal(app.Listener(ln)) } diff --git a/playout/playout.go b/playout/playout.go index 9c799e0..ba28b77 100644 --- a/playout/playout.go +++ b/playout/playout.go @@ -4,6 +4,7 @@ import ( "bytes" "ffmpeg-playout/status" "fmt" + "github.com/grafov/bcast" "log" "net/http" "net/url" @@ -11,6 +12,7 @@ import ( "os/exec" "path" "strconv" + "syscall" "time" ) @@ -29,96 +31,118 @@ type Config struct { PlayoutScript string ProgressDir string PrometheusPushGateway string + OutputFormat string +} + +func monitorFFmpeg(cfg *Config, progressPath string, f *bcast.Member, output string) { + stats := make(chan status.Status) + c := 0 + for { + if c > 15 { + return + } + if _, err := os.Stat(progressPath); os.IsNotExist(err) { + time.Sleep(1 * time.Second) + c++ + continue + } + err := status.FFmpegStatusWatcher(stats, progressPath) + if err != nil { + log.Println(err) + continue + } else { + break + } + } + u, _ := url.Parse(cfg.PrometheusPushGateway) + u.Path = path.Join(u.Path, "metrics", "job", "Decklink_Outputs", output) + c = 0 + + for c < 4 { + select { + case <-f.Read: + return + case stat := <-stats: + metric := fmt.Sprintf("frame %v\n dup_frames %v\n bitrate %v\n fps %v\n speed %v\n", stat.Frame, stat.Dupframes, stat.Bitrate, stat.FPS, stat.Speed) + resp, err := http.Post(u.String(), "text/plain", bytes.NewBuffer([]byte(metric))) + if err != nil { + // log.Println(err) + continue + } + resp.Body.Close() + case <-time.After(4 * time.Second): + c++ + break + } + } + f.Send("finished") + f.Close() +} + +func killProcess(cmd *exec.Cmd, id int) { + log.Printf("Kill %v", cmd.Process.Pid) + err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + if err != nil { + log.Printf("Failed to kill %v: %v", cmd.Process.Pid, err) + } + log.Println(cmd.ProcessState.ExitCode()) + log.Printf("Finish %d", id) } //nolint:funlen func (p *Job) Playout(cfg *Config) { - // TODO delete playout Job from store after finishing/aborting playout + progressPath := path.Join(cfg.ProgressDir, strconv.Itoa(p.ID)) + playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript) for { - select { - case ctrlMsg := <-p.ControlChannel: - fmt.Println(ctrlMsg) - continue - case <-time.After(time.Until(p.StartAt)): - log.Printf("Start Playout for %v", p.ID) - progressPath := path.Join(cfg.ProgressDir, strconv.Itoa(p.ID)) - playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript) - cmd := exec.Command(playoutScript, //nolint:gosec - fmt.Sprintf("-i %v", p.Source), - fmt.Sprintf("-o %v", p.Output), - fmt.Sprintf("-p %v", progressPath), - ) - cmd.Dir = cfg.PlayoutScriptPath + log.Printf("Start Playout for %v", p.ID) + cmd := exec.Command(playoutScript, //nolint:gosec + "-i", p.Source, + "-o", p.Output, + "-f", cfg.OutputFormat, + "-p", progressPath, + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Dir = cfg.PlayoutScriptPath + var outbuf, errbuf bytes.Buffer + cmd.Stdout = &outbuf + cmd.Stderr = &errbuf - err := cmd.Start() - if err != nil { - log.Printf("Failed to start Playout %v: %v", p.ID, err) - } - pid := cmd.Process.Pid - log.Printf("PID for %v: %v", p.ID, pid) - log.Println(cmd.Args) - go func() { - stats := make(chan status.Status) - c := 0 - for { - if c > 15 { - return - } - if _, err := os.Stat("/path/to/whatever"); os.IsNotExist(err) { - time.Sleep(1 * time.Second) - c++ - continue - } - err := status.FFmpegStatusWatcher(stats, progressPath) - if err != nil { - log.Println(err) - continue - } else { - break - } - } - u, _ := url.Parse(cfg.PrometheusPushGateway) - u.Path = path.Join(u.Path, "metrics", "job", "Decklink_Outputs", p.Output) - c = 0 - for c < 4 { - select { - case stat := <-stats: - metric := fmt.Sprintf("frame %v\n dup_frames %v\n bitrate %v\n fps %v\n speed %v\n", stat.Frame, stat.Dupframes, stat.Bitrate, stat.FPS, stat.Speed) - resp, err := http.Post(u.Path, "text/plain", bytes.NewBuffer([]byte(metric))) - if err != nil { - log.Println(err) - } - log.Println(metric) - defer resp.Body.Close() - case <-time.After(4 * time.Second): - c++ - continue - } - } - }() - ProcessManagement: - for { - select { - case ctrlMsg := <-p.ControlChannel: - log.Println(ctrlMsg) - continue - case <-time.After(time.Until(p.StopAt)): - log.Printf("Kill %v", cmd.Process.Pid) - err := cmd.Process.Kill() - if err != nil { - log.Printf("Failed to kill %v: %v", cmd.Process.Pid, err) - } - break ProcessManagement - } - } - err = cmd.Wait() + err := cmd.Start() + if err != nil { + log.Printf("Failed to start Playout %v: %v", p.ID, err) + } + defer killProcess(cmd, p.ID) + pid := cmd.Process.Pid + log.Printf("PID for %v: %v", p.ID, pid) + log.Println(cmd.Args) + + finishChannel := bcast.NewGroup() + go finishChannel.Broadcast(0) + go func(cmd *exec.Cmd, f *bcast.Member) { + err := cmd.Wait() if err != nil { log.Println(err) } - out, _ := cmd.CombinedOutput() - log.Println(string(out)) - log.Println(cmd.ProcessState.ExitCode()) - log.Printf("Finish %v", p.ID) + f.Send("finished") + f.Close() + }(cmd, finishChannel.Join()) + + go monitorFFmpeg(cfg, progressPath, finishChannel.Join(), p.Output) + fChannelMember := finishChannel.Join() + ProcessManagement: + for { + select { + case <-fChannelMember.Read: + break ProcessManagement + case ctrlMsg := <-p.ControlChannel: + log.Println(ctrlMsg) + break + case <-time.After(time.Until(p.StopAt)): + break ProcessManagement + } + } + + if time.Now().After(p.StopAt) { return } } diff --git a/status/status.go b/status/status.go index 454a628..8d4e824 100644 --- a/status/status.go +++ b/status/status.go @@ -2,13 +2,13 @@ package status import ( "bufio" - "fmt" "github.com/fsnotify/fsnotify" "io" "log" "os" "strconv" "strings" + "time" ) type Status struct { @@ -61,21 +61,24 @@ func FFmpegStatusWatcher(c chan<- Status, path string) error { case "fps": ffmpegStatus.FPS, err = strconv.ParseFloat(value, 64) case "bitrate": - ffmpegStatus.Bitrate, err = strconv.ParseFloat(strings.TrimRight(value, "kbits/s"), 64) + ffmpegStatus.Bitrate, err = strconv.ParseFloat(strings.TrimSuffix(value, "kbits/s"), 64) case "out_time_ms": // test["Progress.Out_time_ms"], err = strconv.Atoi(value) continue case "dup_frames": ffmpegStatus.Dupframes, err = strconv.Atoi(value) case "speed": - ffmpegStatus.Speed, err = strconv.ParseFloat(strings.TrimRight(value, "x"), 64) + ffmpegStatus.Speed, err = strconv.ParseFloat(strings.TrimSuffix(value, "x"), 64) } if err != nil { - fmt.Println(err) + log.Println(err) } } c <- ffmpegStatus } + case <-time.After(5 * time.Second): + log.Println("Timeout of Status Reading") + return case err, ok := <-watcher.Errors: if !ok { return diff --git a/store/store.go b/store/store.go index 3bb4076..5dcba74 100644 --- a/store/store.go +++ b/store/store.go @@ -16,7 +16,7 @@ type Store struct { sync.RWMutex } -func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string, prometheus string) (*Store, error) { +func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string, prometheus string, format string) (*Store, error) { playouts := make(map[int]*playout.Job) var d time.Duration @@ -28,6 +28,7 @@ func NewStore(o []string, defaultDuration string, playoutScriptPath string, play PlayoutScriptPath: playoutScriptPath, PlayoutScript: playoutScript, ProgressDir: tmpDir, + OutputFormat: format, PrometheusPushGateway: prometheus, } store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o, Config: &pcfg}