more controlling, more config options

This commit is contained in:
garionion 2020-12-26 17:20:18 +01:00
parent 0d9ce5656f
commit 9ed2982253
7 changed files with 181 additions and 68 deletions

View file

@ -6,3 +6,6 @@ outputs:
address: ":3000" address: ":3000"
default_duration: "15m" default_duration: "15m"
playoutscript: "decklink_playout.sh"
playoutscriptpath: "/path/to/ffmpeg-scripts/playout"
tmpdir: "/tmp"

2
go.mod
View file

@ -6,8 +6,8 @@ require (
github.com/andybalholm/brotli v1.0.1 // indirect github.com/andybalholm/brotli v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/gofiber/fiber/v2 v2.1.3 github.com/gofiber/fiber/v2 v2.1.3
github.com/google/uuid v1.1.2
github.com/ilyakaznacheev/cleanenv v1.2.5 github.com/ilyakaznacheev/cleanenv v1.2.5
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.11.2 // indirect github.com/klauspost/compress v1.11.2 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect

17
go.sum
View file

@ -4,20 +4,33 @@ github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDa
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/gofiber/fiber/v2 v2.1.3 h1:d2fkRf6fkLa1uXgzXqN5iqAydjytoocS83hm1o00Ocg= github.com/gofiber/fiber/v2 v2.1.3 h1:d2fkRf6fkLa1uXgzXqN5iqAydjytoocS83hm1o00Ocg=
github.com/gofiber/fiber/v2 v2.1.3/go.mod h1:MMiSv1HrDkN8Pv7NeVDYK+T/lwXOEKAvPBbLvJPCEfA= github.com/gofiber/fiber/v2 v2.1.3/go.mod h1:MMiSv1HrDkN8Pv7NeVDYK+T/lwXOEKAvPBbLvJPCEfA=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ilyakaznacheev/cleanenv v1.2.5 h1:/SlcF9GaIvefWqFJzsccGG/NJdoaAwb7Mm7ImzhO3DM= github.com/ilyakaznacheev/cleanenv v1.2.5 h1:/SlcF9GaIvefWqFJzsccGG/NJdoaAwb7Mm7ImzhO3DM=
github.com/ilyakaznacheev/cleanenv v1.2.5/go.mod h1:/i3yhzwZ3s7hacNERGFwvlhwXMDcaqwIzmayEhbRplk= github.com/ilyakaznacheev/cleanenv v1.2.5/go.mod h1:/i3yhzwZ3s7hacNERGFwvlhwXMDcaqwIzmayEhbRplk=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg= github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg=
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ= github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.16.0 h1:9zAqOYLl8Tuy3E5R6ckzGDJ1g8+pw15oQp2iL9Jl6gQ= github.com/valyala/fasthttp v1.16.0 h1:9zAqOYLl8Tuy3E5R6ckzGDJ1g8+pw15oQp2iL9Jl6gQ=

74
main.go
View file

@ -8,20 +8,35 @@ import (
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/cors"
"github.com/ilyakaznacheev/cleanenv" "github.com/ilyakaznacheev/cleanenv"
jsoniter "github.com/json-iterator/go"
"log" "log"
"time" "time"
) )
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Config struct { type Config struct {
Outputs []string `yaml:"outputs"` Outputs []string `yaml:"outputs"`
Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
PlayoutScript string `yaml:"playoutscript"`
PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
ProgressDir string `yaml:"tmpdir"`
} }
type Job struct { type Job struct {
StartAt string `json:"startAt,omitempty"` StartAt time.Time `json:"startAt,omitempty"`
StopAt string `json:"stopAt,omitempty"` StopAt time.Time `json:"stopAt,omitempty"`
Source string `json:"source"` Source string `json:"source"`
ID int `json:"id"`
Version string `json:"version"`
}
type ScheduledJob struct {
ID int `json:"id"`
Port string `json:"port"`
Room string `json:"room"`
Version string `json:"version"`
} }
func schedulePlayout(s *store.Store) fiber.Handler { func schedulePlayout(s *store.Store) fiber.Handler {
@ -29,45 +44,56 @@ func schedulePlayout(s *store.Store) fiber.Handler {
return func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error {
var p playout.Job var p playout.Job
job := new(Job) job := new(Job)
var err error
if err := c.BodyParser(job); err != nil { jsonErr := json.Unmarshal(c.Body(), job)
log.Println("got defective request: ", err) if jsonErr != nil {
log.Println("got defective request: ", jsonErr)
c.SendStatus(400) c.SendStatus(400)
return err return jsonErr
} }
if job.Source == "" { if job.Source == "" {
c.SendStatus(400) c.SendStatus(400)
return errors.New("Got Empty Source. I can't play »Nothing«") return errors.New("Got Empty Source. I can't play »Nothing«")
} }
p.ID = job.ID
p.Source = job.Source p.Source = job.Source
p.Version = job.Version
if p.StartAt, err = time.Parse(time.RFC3339, job.StartAt); err != nil { p.StartAt = job.StartAt
log.Println("got strange date: ", job.StartAt, "; error:", err) if job.StopAt.IsZero() {
c.SendStatus(400)
return err
}
if job.StopAt != "" {
if p.StopAt, err = time.Parse(time.RFC3339, job.StopAt); err != nil {
log.Println("got strange date: ", job.StopAt, "; error:", err)
c.SendStatus(400)
return err
}
} else {
p.StopAt = p.StartAt.Add(s.DefaultDuration) p.StopAt = p.StartAt.Add(s.DefaultDuration)
} else {
p.StopAt = job.StopAt
} }
var output string
s.RLock()
oldPlayout, playoutExists := s.Playouts[job.ID]
s.RUnlock()
if playoutExists {
oldPlayout.ControlChannel <- "reschedule"
output = oldPlayout.Output
} else {
var err error
s.Lock() s.Lock()
uid, err := s.AddPlayout(&p) output, err = s.AddPlayout(&p)
s.Unlock() s.Unlock()
if err != nil { if err != nil {
c.SendStatus(500) c.SendStatus(500)
return fmt.Errorf("can not schedule playout: %s", err) return fmt.Errorf("can not schedule playout: %s", err)
} }
c.SendString(uid.String()) }
go p.Playout() c.JSON(ScheduledJob{
ID: p.ID,
Port: output,
Version: p.Version,
})
go func() {
p.Playout(s.Config)
s.DeletePlayout(p.ID)
}()
// staus 409 when no schedule possible // staus 409 when no schedule possible
@ -83,7 +109,7 @@ func main() {
log.Fatal("No configfile: ", err) log.Fatal("No configfile: ", err)
} }
s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration) s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir)
if err != nil { if err != nil {
log.Fatal("Failed to init Store: ", err.Error()) log.Fatal("Failed to init Store: ", err.Error())
} }

View file

@ -3,18 +3,14 @@ package playout
import ( import (
"fmt" "fmt"
"log" "log"
"os/exec"
"path"
"time" "time"
) )
type PlayoutType int
const (
rtmpPlayout PlayoutType = iota
icecastPlayout
)
type Job struct { type Job struct {
PlayoutType PlayoutType ID int
Version string
StartAt time.Time StartAt time.Time
StopAt time.Time StopAt time.Time
Source string Source string
@ -22,14 +18,56 @@ type Job struct {
ControlChannel chan string ControlChannel chan string
} }
func (p *Job) Playout() { type Config struct {
PlayoutScriptPath string
PlayoutScript string
ProgressDir string
}
func (p *Job) Playout(cfg *Config) {
// TODO delete playout Job from store after finishing/aborting playout // TODO delete playout Job from store after finishing/aborting playout
for {
log.Println(p.StartAt)
log.Println(time.Until(p.StartAt)) log.Println(time.Until(p.StartAt))
select { select {
case ctrlMsg := <-p.ControlChannel: case ctrlMsg := <-p.ControlChannel:
fmt.Println(ctrlMsg) fmt.Println(ctrlMsg)
return continue
case <-time.After(time.Until(p.StartAt)): case <-time.After(time.Until(p.StartAt)):
log.Println("Start Playout") log.Println("Start Playout")
progressPath := path.Join(cfg.ProgressDir, string(p.ID))
playoutScript := path.Join(cfg.PlayoutScriptPath, cfg.PlayoutScript)
cmd := exec.Command(playoutScript, //nolint:gosec
fmt.Sprintf("-i %v", p.Source),
fmt.Sprintf("-o %v", p.Output),
fmt.Sprintf("-p %v", progressPath),
)
cmd.Dir = cfg.PlayoutScriptPath
err := cmd.Start()
if err != nil {
log.Printf("Failed to start Playout %v: %v", p.ID, err)
}
pid := cmd.Process.Pid
log.Printf("PID for %v: %v", p.ID, pid)
ProcessManagement:
for {
select {
case ctrlMsg := <-p.ControlChannel:
log.Println(ctrlMsg)
continue
case <-time.After(time.Until(p.StopAt)):
cmd.Process.Kill()
break ProcessManagement
}
}
err = cmd.Wait()
if err != nil {
log.Println(err)
}
log.Println(cmd.ProcessState.ExitCode())
log.Printf("Finish %v", p.ID)
break
}
} }
} }

View file

@ -3,35 +3,38 @@ package store
import ( import (
"errors" "errors"
"ffmpeg-playout/playout" "ffmpeg-playout/playout"
"fmt"
"github.com/google/uuid"
"log" "log"
"sync" "sync"
"time" "time"
) )
type Store struct { type Store struct {
Playouts map[uuid.UUID]*playout.Job Playouts map[int]*playout.Job
DefaultDuration time.Duration DefaultDuration time.Duration
Outputs []string Outputs []string
*playout.Config
sync.RWMutex sync.RWMutex
} }
func NewStore(o []string, defaultDuration string) (*Store, error) { func NewStore(o []string, defaultDuration string, playoutScriptPath string, playoutScript string, tmpDir string) (*Store, error) {
playouts := make(map[uuid.UUID]*playout.Job) playouts := make(map[int]*playout.Job)
var d time.Duration var d time.Duration
var err error var err error
if d, err = time.ParseDuration(defaultDuration); err != nil { if d, err = time.ParseDuration(defaultDuration); err != nil {
log.Fatal("Failed to set Default Duration: ", err) log.Fatal("Failed to set Default Duration: ", err)
} }
pcfg := playout.Config{
store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o} PlayoutScriptPath: playoutScriptPath,
PlayoutScript: playoutScript,
ProgressDir: tmpDir,
}
store := &Store{Playouts: playouts, DefaultDuration: d, Outputs: o, Config: &pcfg}
return store, nil return store, nil
} }
func (s *Store) AddPlayout(p *playout.Job) (uuid.UUID, error) { func (s *Store) AddPlayout(p *playout.Job) (string, error) {
outputs := s.Outputs outputs := s.Outputs
for _, value := range s.Playouts { for _, value := range s.Playouts {
if len(outputs) != 0 && if len(outputs) != 0 &&
@ -49,15 +52,17 @@ func (s *Store) AddPlayout(p *playout.Job) (uuid.UUID, error) {
if len(outputs) != 0 { if len(outputs) != 0 {
output = outputs[0] output = outputs[0]
} else { } else {
return uuid.Nil, errors.New("no output available") return "", errors.New("no output available")
}
uid, err := uuid.NewUUID()
if err != nil {
return uuid.Nil, fmt.Errorf("couldn't generate uuid: %s", err.Error())
} }
p.Output = output p.Output = output
s.Playouts[uid] = p s.Playouts[p.ID] = p
return uid, nil return output, nil
}
func (s *Store) DeletePlayout(id int) {
s.Lock()
delete(s.Playouts, id)
s.Unlock()
} }
//https://stackoverflow.com/a/15323988/10997297 //https://stackoverflow.com/a/15323988/10997297

28
types.go Normal file
View file

@ -0,0 +1,28 @@
package main
import (
"fmt"
"strconv"
"time"
)
type Timestamp time.Time
func (t *Timestamp) MarshalJSON() ([]byte, error) {
ts := time.Time(*t).Unix()
stamp := fmt.Sprint(ts)
return []byte(stamp), nil
}
func (t *Timestamp) UnmarshalJSON(b []byte) error {
ts, err := strconv.Atoi(string(b))
if err != nil {
return err
}
*t = Timestamp(time.Unix(int64(ts), 0))
return nil
}
func (t *Timestamp) String() string {
return time.Time(*t).String()
}