add monitoring via prometheus

This commit is contained in:
garionion 2020-12-27 16:52:31 +01:00
parent 9ed2982253
commit 3c82aa0308
5 changed files with 166 additions and 21 deletions

View file

@ -8,4 +8,5 @@ address: ":3000"
default_duration: "15m"
playoutscript: "decklink_playout.sh"
playoutscriptpath: "/path/to/ffmpeg-scripts/playout"
tmpdir: "/tmp"
tmpdir: "/tmp"
prometheusPushGateway: "http://localhost:9091"

15
main.go
View file

@ -16,12 +16,13 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Config struct {
Outputs []string `yaml:"outputs"`
Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
PlayoutScript string `yaml:"playoutscript"`
PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
ProgressDir string `yaml:"tmpdir"`
Outputs []string `yaml:"outputs"`
Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
PlayoutScript string `yaml:"playoutscript"`
PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
ProgressDir string `yaml:"tmpdir"`
PrometheusPushGateway string `yaml:"prometheusPushGateway"`
}
type Job struct {
@ -109,7 +110,7 @@ func main() {
log.Fatal("No configfile: ", err)
}
s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir)
s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway)
if err != nil {
log.Fatal("Failed to init Store: ", err.Error())
}

View file

@ -1,10 +1,16 @@
package playout
import (
"bytes"
"ffmpeg-playout/status"
"fmt"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strconv"
"time"
)
@ -19,23 +25,23 @@ type Job struct {
}
type Config struct {
PlayoutScriptPath string
PlayoutScript string
ProgressDir string
PlayoutScriptPath string
PlayoutScript string
ProgressDir string
PrometheusPushGateway string
}
//nolint:funlen
func (p *Job) Playout(cfg *Config) {
// TODO delete playout Job from store after finishing/aborting playout
for {
log.Println(p.StartAt)
log.Println(time.Until(p.StartAt))
select {
case ctrlMsg := <-p.ControlChannel:
fmt.Println(ctrlMsg)
continue
case <-time.After(time.Until(p.StartAt)):
log.Println("Start Playout")
progressPath := path.Join(cfg.ProgressDir, string(p.ID))
log.Printf("Start Playout for %v", p.ID)
progressPath := path.Join(cfg.ProgressDir, strconv.Itoa(p.ID))
playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript)
cmd := exec.Command(playoutScript, //nolint:gosec
fmt.Sprintf("-i %v", p.Source),
@ -50,6 +56,46 @@ func (p *Job) Playout(cfg *Config) {
}
pid := cmd.Process.Pid
log.Printf("PID for %v: %v", p.ID, pid)
log.Println(cmd.Args)
go func() {
stats := make(chan status.Status)
c := 0
for {
if c > 15 {
return
}
if _, err := os.Stat("/path/to/whatever"); os.IsNotExist(err) {
time.Sleep(1 * time.Second)
c++
continue
}
err := status.FFmpegStatusWatcher(stats, progressPath)
if err != nil {
log.Println(err)
continue
} else {
break
}
}
u, _ := url.Parse(cfg.PrometheusPushGateway)
u.Path = path.Join(u.Path, "metrics", "job", "Decklink_Outputs", p.Output)
c = 0
for c < 4 {
select {
case stat := <-stats:
metric := fmt.Sprintf("frame %v\n dup_frames %v\n bitrate %v\n fps %v\n speed %v\n", stat.Frame, stat.Dupframes, stat.Bitrate, stat.FPS, stat.Speed)
resp, err := http.Post(u.Path, "text/plain", bytes.NewBuffer([]byte(metric)))
if err != nil {
log.Println(err)
}
log.Println(metric)
defer resp.Body.Close()
case <-time.After(4 * time.Second):
c++
continue
}
}
}()
ProcessManagement:
for {
select {
@ -57,7 +103,11 @@ func (p *Job) Playout(cfg *Config) {
log.Println(ctrlMsg)
continue
case <-time.After(time.Until(p.StopAt)):
cmd.Process.Kill()
log.Printf("Kill %v", cmd.Process.Pid)
err := cmd.Process.Kill()
if err != nil {
log.Printf("Failed to kill %v: %v", cmd.Process.Pid, err)
}
break ProcessManagement
}
}
@ -65,9 +115,11 @@ func (p *Job) Playout(cfg *Config) {
if err != nil {
log.Println(err)
}
out, _ := cmd.CombinedOutput()
log.Println(string(out))
log.Println(cmd.ProcessState.ExitCode())
log.Printf("Finish %v", p.ID)
break
return
}
}
}

90
status/status.go Normal file
View file

@ -0,0 +1,90 @@
package status
import (
"bufio"
"fmt"
"github.com/fsnotify/fsnotify"
"io"
"log"
"os"
"strconv"
"strings"
)
type Status struct {
Frame int
FPS float64
Bitrate float64
Dupframes int
Speed float64
}
//nolint:funlen
func FFmpegStatusWatcher(c chan<- Status, path string) error {
var ffmpegStatus Status
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
defer watcher.Close()
defer close(c)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
} else if err != nil {
log.Println(err)
}
splits := strings.Split(line, "=")
key := strings.TrimSpace(splits[0])
value := strings.TrimSpace(splits[1])
switch key {
case "frame":
ffmpegStatus.Frame, err = strconv.Atoi(value)
case "fps":
ffmpegStatus.FPS, err = strconv.ParseFloat(value, 64)
case "bitrate":
ffmpegStatus.Bitrate, err = strconv.ParseFloat(strings.TrimRight(value, "kbits/s"), 64)
case "out_time_ms":
// test["Progress.Out_time_ms"], err = strconv.Atoi(value)
continue
case "dup_frames":
ffmpegStatus.Dupframes, err = strconv.Atoi(value)
case "speed":
ffmpegStatus.Speed, err = strconv.ParseFloat(strings.TrimRight(value, "x"), 64)
}
if err != nil {
fmt.Println(err)
}
}
c <- ffmpegStatus
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add(path)
return err
}

View file

@ -16,7 +16,7 @@ type Store struct {
sync.RWMutex
}
func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string) (*Store, error) {
func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string, prometheus string) (*Store, error) {
playouts := make(map[int]*playout.Job)
var d time.Duration
@ -25,9 +25,10 @@ func NewStore(o []string, defaultDuration string, playoutScriptPath string, play
log.Fatal("Failed to set Default Duration: ", err)
}
pcfg := playout.Config{
PlayoutScriptPath: playoutScriptPath,
PlayoutScript: playoutScript,
ProgressDir: tmpDir,
PlayoutScriptPath: playoutScriptPath,
PlayoutScript: playoutScript,
ProgressDir: tmpDir,
PrometheusPushGateway: prometheus,
}
store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o, Config: &pcfg}