From 9ed29822533b98bd1589496f563361200a3e43be Mon Sep 17 00:00:00 2001 From: garionion Date: Sat, 26 Dec 2020 17:20:18 +0100 Subject: [PATCH] more controlling, more config options --- config.example.yml | 5 ++- go.mod | 2 +- go.sum | 17 ++++++++- main.go | 92 +++++++++++++++++++++++++++++----------------- playout/playout.go | 70 +++++++++++++++++++++++++++-------- store/store.go | 35 ++++++++++-------- types.go | 28 ++++++++++++++ 7 files changed, 181 insertions(+), 68 deletions(-) create mode 100644 types.go diff --git a/config.example.yml b/config.example.yml index 68a5403..54c7bf3 100644 --- a/config.example.yml +++ b/config.example.yml @@ -5,4 +5,7 @@ outputs: - "4" address: ":3000" -default_duration: "15m" \ No newline at end of file +default_duration: "15m" +playoutscript: "decklink_playout.sh" +playoutscriptpath: "/path/to/ffmpeg-scripts/playout" +tmpdir: "/tmp" \ No newline at end of file diff --git a/go.mod b/go.mod index 06dcf4f..d5cd066 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/andybalholm/brotli v1.0.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/gofiber/fiber/v2 v2.1.3 - github.com/google/uuid v1.1.2 github.com/ilyakaznacheev/cleanenv v1.2.5 + github.com/json-iterator/go v1.1.10 github.com/klauspost/compress v1.11.2 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect diff --git a/go.sum b/go.sum index be9ed56..0c9917e 100644 --- a/go.sum +++ b/go.sum @@ -4,20 +4,33 @@ github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDa github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gofiber/fiber/v2 v2.1.3 h1:d2fkRf6fkLa1uXgzXqN5iqAydjytoocS83hm1o00Ocg= github.com/gofiber/fiber/v2 v2.1.3/go.mod h1:MMiSv1HrDkN8Pv7NeVDYK+T/lwXOEKAvPBbLvJPCEfA= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/ilyakaznacheev/cleanenv v1.2.5 h1:/SlcF9GaIvefWqFJzsccGG/NJdoaAwb7Mm7ImzhO3DM= github.com/ilyakaznacheev/cleanenv v1.2.5/go.mod h1:/i3yhzwZ3s7hacNERGFwvlhwXMDcaqwIzmayEhbRplk= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ= github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.16.0 h1:9zAqOYLl8Tuy3E5R6ckzGDJ1g8+pw15oQp2iL9Jl6gQ= diff --git a/main.go b/main.go index 7a6cab4..84f5154 100644 --- a/main.go +++ b/main.go @@ -8,20 +8,35 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/ilyakaznacheev/cleanenv" + jsoniter "github.com/json-iterator/go" "log" "time" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + type Config struct { - Outputs []string `yaml:"outputs"` - Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` - DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` + Outputs []string `yaml:"outputs"` + Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` + DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` + PlayoutScript string `yaml:"playoutscript"` + PlayoutScriptPath string `yaml:"playoutscriptpath" env:""` + ProgressDir string `yaml:"tmpdir"` } type Job struct { - StartAt string `json:"startAt,omitempty"` - StopAt string `json:"stopAt,omitempty"` - Source string `json:"source"` + StartAt time.Time `json:"startAt,omitempty"` + StopAt time.Time `json:"stopAt,omitempty"` + Source string `json:"source"` + ID int `json:"id"` + Version string `json:"version"` +} + +type ScheduledJob struct { + ID int `json:"id"` + Port string `json:"port"` + Room string `json:"room"` + Version string `json:"version"` } func schedulePlayout(s *store.Store) fiber.Handler { @@ -29,45 +44,56 @@ func schedulePlayout(s *store.Store) fiber.Handler { return func(c *fiber.Ctx) error { var p playout.Job job := new(Job) - var err error - if err := c.BodyParser(job); err != nil { - log.Println("got defective request: ", err) + + jsonErr := json.Unmarshal(c.Body(), job) + if jsonErr != nil { + log.Println("got defective request: ", jsonErr) c.SendStatus(400) - return err + return jsonErr } if job.Source == "" { c.SendStatus(400) return errors.New("Got Empty Source. I can't play »Nothing«") } + + p.ID = job.ID p.Source = job.Source - - if p.StartAt, err = time.Parse(time.RFC3339, job.StartAt); err != nil { - log.Println("got strange date: ", job.StartAt, "; error:", err) - c.SendStatus(400) - return err - } - - if job.StopAt != "" { - if p.StopAt, err = time.Parse(time.RFC3339, job.StopAt); err != nil { - log.Println("got strange date: ", job.StopAt, "; error:", err) - c.SendStatus(400) - return err - } - } else { + p.Version = job.Version + p.StartAt = job.StartAt + if job.StopAt.IsZero() { p.StopAt = p.StartAt.Add(s.DefaultDuration) + } else { + p.StopAt = job.StopAt } - s.Lock() - uid, err := s.AddPlayout(&p) - s.Unlock() - if err != nil { - c.SendStatus(500) - return fmt.Errorf("can not schedule playout: %s", err) + var output string + s.RLock() + oldPlayout, playoutExists := s.Playouts[job.ID] + s.RUnlock() + if playoutExists { + oldPlayout.ControlChannel <- "reschedule" + output = oldPlayout.Output + } else { + var err error + s.Lock() + output, err = s.AddPlayout(&p) + s.Unlock() + if err != nil { + c.SendStatus(500) + return fmt.Errorf("can not schedule playout: %s", err) + } } - c.SendString(uid.String()) - go p.Playout() + c.JSON(ScheduledJob{ + ID: p.ID, + Port: output, + Version: p.Version, + }) + go func() { + p.Playout(s.Config) + s.DeletePlayout(p.ID) + }() // staus 409 when no schedule possible @@ -83,7 +109,7 @@ func main() { log.Fatal("No configfile: ", err) } - s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration) + s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir) if err != nil { log.Fatal("Failed to init Store: ", err.Error()) } diff --git a/playout/playout.go b/playout/playout.go index e3a3aba..042d6a5 100644 --- a/playout/playout.go +++ b/playout/playout.go @@ -3,18 +3,14 @@ package playout import ( "fmt" "log" + "os/exec" + "path" "time" ) -type PlayoutType int - -const ( - rtmpPlayout PlayoutType = iota - icecastPlayout -) - type Job struct { - PlayoutType PlayoutType + ID int + Version string StartAt time.Time StopAt time.Time Source string @@ -22,14 +18,56 @@ type Job struct { ControlChannel chan string } -func (p *Job) Playout() { +type Config struct { + PlayoutScriptPath string + PlayoutScript string + ProgressDir string +} + +func (p *Job) Playout(cfg *Config) { // TODO delete playout Job from store after finishing/aborting playout - log.Println(time.Until(p.StartAt)) - select { - case ctrlMsg := <-p.ControlChannel: - fmt.Println(ctrlMsg) - return - case <-time.After(time.Until(p.StartAt)): - log.Println("Start Playout") + for { + log.Println(p.StartAt) + log.Println(time.Until(p.StartAt)) + select { + case ctrlMsg := <-p.ControlChannel: + fmt.Println(ctrlMsg) + continue + case <-time.After(time.Until(p.StartAt)): + log.Println("Start Playout") + progressPath := path.Join(cfg.ProgressDir, string(p.ID)) + playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript) + cmd := exec.Command(playoutScript, //nolint:gosec + fmt.Sprintf("-i %v", p.Source), + fmt.Sprintf("-o %v", p.Output), + fmt.Sprintf("-p %v", progressPath), + ) + cmd.Dir = cfg.PlayoutScriptPath + + err := cmd.Start() + if err != nil { + log.Printf("Failed to start Playout %v: %v", p.ID, err) + } + pid := cmd.Process.Pid + log.Printf("PID for %v: %v", p.ID, pid) + ProcessManagement: + for { + select { + case ctrlMsg := <-p.ControlChannel: + log.Println(ctrlMsg) + continue + case <-time.After(time.Until(p.StopAt)): + cmd.Process.Kill() + break ProcessManagement + } + } + err = cmd.Wait() + if err != nil { + log.Println(err) + } + log.Println(cmd.ProcessState.ExitCode()) + log.Printf("Finish %v", p.ID) + break + } } } diff --git a/store/store.go b/store/store.go index 03ac321..6391d68 100644 --- a/store/store.go +++ b/store/store.go @@ -3,35 +3,38 @@ package store import ( "errors" "ffmpeg-playout/playout" - "fmt" - "github.com/google/uuid" "log" "sync" "time" ) type Store struct { - Playouts map[uuid.UUID]*playout.Job + Playouts map[int]*playout.Job DefaultDuration time.Duration Outputs []string + *playout.Config sync.RWMutex } -func NewStore(o []string, defaultDuration string) (*Store, error) { - playouts := make(map[uuid.UUID]*playout.Job) +func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string) (*Store, error) { + playouts := make(map[int]*playout.Job) var d time.Duration var err error if d, err = time.ParseDuration(defaultDuration); err != nil { log.Fatal("Failed to set Default Duration: ", err) } - - store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o} + pcfg := playout.Config{ + PlayoutScriptPath: playoutScriptPath, + PlayoutScript: playoutScript, + ProgressDir: tmpDir, + } + store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o, Config: &pcfg} return store, nil } -func (s *Store) AddPlayout(p *playout.Job) (uuid.UUID, error) { +func (s *Store) AddPlayout(p *playout.Job) (string, error) { outputs := s.Outputs for _, value := range s.Playouts { if len(outputs) != 0 && @@ -49,15 +52,17 @@ func (s *Store) AddPlayout(p *playout.Job) (uuid.UUID, error) { if len(outputs) != 0 { output = outputs[0] } else { - return uuid.Nil, errors.New("no output available") - } - uid, err := uuid.NewUUID() - if err != nil { - return uuid.Nil, fmt.Errorf("couldn't generate uuid: %s", err.Error()) + return "", errors.New("no output available") } p.Output = output - s.Playouts[uid] = p - return uid, nil + s.Playouts[p.ID] = p + return output, nil +} + +func (s *Store) DeletePlayout(id int) { + s.Lock() + delete(s.Playouts, id) + s.Unlock() } //https://stackoverflow.com/a/15323988/10997297 diff --git a/types.go b/types.go new file mode 100644 index 0000000..1572320 --- /dev/null +++ b/types.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "strconv" + "time" +) + +type Timestamp time.Time + +func (t *Timestamp) MarshalJSON() ([]byte, error) { + ts := time.Time(*t).Unix() + stamp := fmt.Sprint(ts) + return []byte(stamp), nil +} + +func (t *Timestamp) UnmarshalJSON(b []byte) error { + ts, err := strconv.Atoi(string(b)) + if err != nil { + return err + } + *t = Timestamp(time.Unix(int64(ts), 0)) + return nil +} + +func (t *Timestamp) String() string { + return time.Time(*t).String() +}