ffmpeg-playout/main.go

127 lines
2.9 KiB
Go
Raw Normal View History

2020-11-05 18:34:54 +01:00
package main
import (
2020-12-29 17:34:34 +01:00
"context"
2020-11-05 18:34:54 +01:00
"errors"
"fmt"
2021-05-22 11:37:14 +02:00
"git.entr0py.de/garionion/ffmpeg-playout/api"
"git.entr0py.de/garionion/ffmpeg-playout/playout"
"git.entr0py.de/garionion/ffmpeg-playout/store"
2020-11-05 18:34:54 +01:00
"github.com/ilyakaznacheev/cleanenv"
2020-12-29 17:34:34 +01:00
"google.golang.org/grpc"
2020-11-05 18:34:54 +01:00
"log"
2020-12-28 22:20:35 +01:00
"net"
2020-11-05 18:34:54 +01:00
"time"
)
2020-12-29 17:34:34 +01:00
// server is the receiver for the Playout gRPC handlers in this file; main
// registers an instance of it through api.RegisterPlayoutServer.
type server struct {
// Embedded generated type; presumably supplies default implementations for
// RPCs this file does not define — confirm against the api package.
api.UnimplementedPlayoutServer
// Embedded store. SchedulePlayout uses its lock, its Playouts map, its
// DefaultDuration and Config values and its AddPlayout/DeletePlayout methods.
*store.Store
}
2020-12-26 17:20:18 +01:00
2020-11-05 18:34:54 +01:00
// Config is the application configuration that main reads from config.yml via
// cleanenv.ReadConfig. Its values are handed to store.NewStore and net.Listen.
type Config struct {
	// Outputs is the list of playout outputs passed to the store.
	Outputs []string `yaml:"outputs"`
	// Address is the TCP listen address of the gRPC server.
	Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
	// DefaultDuration is handed to the store as a string; the unit is not
	// visible here (presumably interpreted by store.NewStore — confirm).
	DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
	// PlayoutScript is passed to store.NewStore together with PlayoutScriptPath.
	PlayoutScript string `yaml:"playoutscript"`
	// PlayoutScriptPath is passed to store.NewStore.
	// NOTE(review): the env tag is empty, so no environment variable name is
	// given here — confirm whether one was intended.
	PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
	// ProgressDir is read from the "tmpdir" YAML key.
	ProgressDir string `yaml:"tmpdir"`
	// OutputFormat is passed to store.NewStore.
	OutputFormat string `yaml:"outputFormat"`
	// PrometheusPushGateway is passed to store.NewStore.
	PrometheusPushGateway string `yaml:"prometheusPushGateway"`
}
2020-12-29 22:31:47 +01:00
func (g *server) SchedulePlayout(ctx context.Context, job *api.Job) (*api.ScheduledJob, error) {
2020-12-29 17:34:34 +01:00
var p *playout.Job
2020-11-05 18:34:54 +01:00
2020-12-29 17:34:34 +01:00
if job.Source == "" {
return nil, errors.New("Got Empty Source. I can't play »Nothing«")
}
2020-11-05 18:34:54 +01:00
2020-12-29 17:34:34 +01:00
var output string
newPlayoutJob := new(playout.Job)
g.Store.RLock()
olPlayoutJob, playoutExists := g.Store.Playouts[job.ID]
g.Store.RUnlock()
if playoutExists {
p = olPlayoutJob
} else {
p = newPlayoutJob
p.ControlChannel = make(chan string)
}
2020-11-05 18:34:54 +01:00
2020-12-29 17:34:34 +01:00
p.ID = job.ID
p.Source = job.Source
p.Version = job.Version
2020-12-29 22:31:47 +01:00
p.StartAt = job.StartAt.AsTime()
p.StopAt = job.StopAt.AsTime()
2020-12-29 17:34:34 +01:00
if p.StopAt.IsZero() {
p.StopAt = p.StartAt.Add(g.Store.DefaultDuration)
}
2020-12-29 13:01:30 +01:00
2020-12-29 17:34:34 +01:00
if playoutExists {
p.ControlChannel <- "reschedule"
output = p.Output
} else {
var err error
g.Store.Lock()
output, err = g.Store.AddPlayout(p)
g.Store.Unlock()
if err != nil {
return nil, fmt.Errorf("can not schedule playout: %s", err)
2020-11-05 18:34:54 +01:00
}
2020-12-29 17:34:34 +01:00
}
2020-11-05 18:34:54 +01:00
2020-12-29 17:34:34 +01:00
scheduledJob := &api.ScheduledJob{
ID: p.ID,
Port: output,
Version: p.Version,
}
2020-11-05 18:34:54 +01:00
2020-12-29 17:34:34 +01:00
if !playoutExists {
go func() {
log.Printf("Start Scheduling %v", p.ID)
Waiting:
for {
select {
case ctrlMsg := <-p.ControlChannel:
log.Printf("%d Control Message: %s", p.ID, ctrlMsg)
continue
case <-time.After(time.Until(p.StartAt)):
break Waiting
2020-12-28 22:20:35 +01:00
}
2020-12-29 17:34:34 +01:00
}
p.Playout(g.Store.Config)
g.Store.DeletePlayout(p.ID)
}()
2020-11-05 18:34:54 +01:00
}
2020-12-29 17:34:34 +01:00
return scheduledJob, nil
2020-11-05 18:34:54 +01:00
}
// cfg is the process-wide configuration; main fills it from config.yml and
// reads it when building the store and opening the listener.
var cfg Config
func main() {
// TODO configure config file path via cmd parameter
if err := cleanenv.ReadConfig("config.yml", &cfg); err != nil {
log.Fatal("No configfile: ", err)
}
2020-12-28 22:20:35 +01:00
s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway, cfg.OutputFormat)
2020-11-05 18:34:54 +01:00
if err != nil {
log.Fatal("Failed to init Store: ", err.Error())
}
2020-12-29 17:34:34 +01:00
server := &server{
Store: s,
}
2020-11-05 18:34:54 +01:00
2020-12-28 22:20:35 +01:00
ln, err := net.Listen("tcp", cfg.Address)
if err != nil {
log.Fatal(err)
}
2020-12-29 17:34:34 +01:00
g := grpc.NewServer()
api.RegisterPlayoutServer(g, server)
log.Fatalf("failed to serve: %v", g.Serve(ln))
2020-11-05 18:34:54 +01:00
}