ffmpeg-playout/main.go

127 lines
2.9 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"git.entr0py.de/garionion/ffmpeg-playout/api"
"git.entr0py.de/garionion/ffmpeg-playout/playout"
"git.entr0py.de/garionion/ffmpeg-playout/store"
"github.com/ilyakaznacheev/cleanenv"
"google.golang.org/grpc"
"log"
"net"
"time"
)
// server is the value registered with the gRPC server as the Playout service
// implementation (see main). Its SchedulePlayout method works on the embedded
// store.
type server struct {
// Embedded generated type from the api package; presumably supplies default
// implementations for service methods this type does not define itself
// (TODO confirm against the generated code).
api.UnimplementedPlayoutServer
// Embedded job store: gives the server direct access to the store's lock
// methods, its Playouts map and its Add/Delete helpers.
*store.Store
}
// Config is the application configuration that main fills via
// cleanenv.ReadConfig from "config.yml"; fields carrying an `env` tag name an
// environment variable as well. Apart from Address, every field is handed
// unchanged to store.NewStore.
type Config struct {
// Outputs is the list of outputs passed to the store.
Outputs []string `yaml:"outputs"`
// Address is the TCP address the gRPC server listens on (default ":3000").
Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
// DefaultDuration is passed to the store as a string (default "15").
// NOTE(review): the unit is not visible here — confirm in store.NewStore.
DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
// PlayoutScript is passed to the store; presumably the name of the script
// that performs the playout — verify against the store package.
PlayoutScript string `yaml:"playoutscript"`
// PlayoutScriptPath is passed to the store.
// NOTE(review): the env tag is empty, so no variable name is given here.
PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
// ProgressDir is read from the YAML key "tmpdir" (note the differing name).
ProgressDir string `yaml:"tmpdir"`
// OutputFormat is passed to the store as its last constructor argument.
OutputFormat string `yaml:"outputFormat"`
// PrometheusPushGateway is passed to the store; presumably the address of
// a Prometheus push gateway — verify against the store package.
PrometheusPushGateway string `yaml:"prometheusPushGateway"`
}
// SchedulePlayout creates a playout job for the request or, when a job with
// the same ID is already in the store, updates that job in place.
//
// New job: the job is added to the store (which yields the output reported
// back as Port) and a goroutine is started that waits for StartAt, runs the
// playout and finally deletes the job from the store.
//
// Existing job: source, version and timing are overwritten and "reschedule"
// is sent on the job's control channel so that the waiting goroutine
// re-evaluates the start time.
//
// An error is returned when the source is empty, when the store rejects the
// new job, or when ctx is done before an existing job accepts the reschedule
// message (the job's fields have already been updated in that case).
func (g *server) SchedulePlayout(ctx context.Context, job *api.Job) (*api.ScheduledJob, error) {
	if job.Source == "" {
		return nil, errors.New("Got Empty Source. I can't play »Nothing«")
	}

	g.Store.RLock()
	p, playoutExists := g.Store.Playouts[job.ID]
	g.Store.RUnlock()
	if !playoutExists {
		p = new(playout.Job)
		p.ControlChannel = make(chan string)
	}

	p.ID = job.ID
	p.Source = job.Source
	p.Version = job.Version
	p.StartAt = job.StartAt.AsTime()
	p.StopAt = job.StopAt.AsTime()
	// Assuming StopAt is a protobuf Timestamp: an unset (or nil) value converts
	// to the Unix epoch rather than to the zero time.Time, so IsZero alone
	// would never trigger the default duration. Treat both as "not given".
	if p.StopAt.IsZero() || p.StopAt.Equal(time.Unix(0, 0)) {
		p.StopAt = p.StartAt.Add(g.Store.DefaultDuration)
	}

	var output string
	if playoutExists {
		// The control channel is unbuffered; if nobody is receiving (any more)
		// a plain send would block this handler forever. Give up when the
		// caller cancels or the deadline expires.
		select {
		case p.ControlChannel <- "reschedule":
		case <-ctx.Done():
			return nil, fmt.Errorf("can not reschedule playout: %w", ctx.Err())
		}
		output = p.Output
	} else {
		var err error
		g.Store.Lock()
		output, err = g.Store.AddPlayout(p)
		g.Store.Unlock()
		if err != nil {
			return nil, fmt.Errorf("can not schedule playout: %w", err)
		}
		go g.awaitAndPlay(p)
	}

	return &api.ScheduledJob{
		ID:      p.ID,
		Port:    output,
		Version: p.Version,
	}, nil
}

// awaitAndPlay blocks until p.StartAt is reached, then runs the playout and
// removes the job from the store. Every control message received while
// waiting re-arms the timer from the (possibly updated) p.StartAt.
func (g *server) awaitAndPlay(p *playout.Job) {
	log.Printf("Start Scheduling %v", p.ID)
	for waiting := true; waiting; {
		timer := time.NewTimer(time.Until(p.StartAt))
		select {
		case ctrlMsg := <-p.ControlChannel:
			// Stop the superseded timer instead of leaving it pending.
			timer.Stop()
			log.Printf("%v Control Message: %s", p.ID, ctrlMsg)
		case <-timer.C:
			waiting = false
		}
	}
	p.Playout(g.Store.Config)
	g.Store.DeletePlayout(p.ID)
}
// cfg is the package-level configuration; main populates it from "config.yml".
var cfg Config
// main loads the configuration, builds the playout store, and serves the
// Playout gRPC service on the configured TCP address. Any failure along the
// way terminates the process via log.Fatal.
func main() {
	// TODO configure config file path via cmd parameter
	const configFile = "config.yml"
	if err := cleanenv.ReadConfig(configFile, &cfg); err != nil {
		log.Fatal("No configfile: ", err)
	}

	playoutStore, storeErr := store.NewStore(
		cfg.Outputs,
		cfg.DefaultDuration,
		cfg.PlayoutScriptPath,
		cfg.PlayoutScript,
		cfg.ProgressDir,
		cfg.PrometheusPushGateway,
		cfg.OutputFormat,
	)
	if storeErr != nil {
		log.Fatal("Failed to init Store: ", storeErr.Error())
	}

	listener, listenErr := net.Listen("tcp", cfg.Address)
	if listenErr != nil {
		log.Fatal(listenErr)
	}

	grpcServer := grpc.NewServer()
	api.RegisterPlayoutServer(grpcServer, &server{Store: playoutStore})

	// Serve only returns when the server stops; treat that as fatal.
	serveErr := grpcServer.Serve(listener)
	log.Fatalf("failed to serve: %v", serveErr)
}