127 lines
2.9 KiB
Go
127 lines
2.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/Garionion/ffmpeg-playout/api"
|
|
"github.com/Garionion/ffmpeg-playout/playout"
|
|
"github.com/Garionion/ffmpeg-playout/store"
|
|
"github.com/ilyakaznacheev/cleanenv"
|
|
"google.golang.org/grpc"
|
|
"log"
|
|
"net"
|
|
"time"
|
|
)
|
|
|
|
type server struct {
|
|
api.UnimplementedPlayoutServer
|
|
*store.Store
|
|
}
|
|
|
|
type Config struct {
|
|
Outputs []string `yaml:"outputs"`
|
|
Address string `yaml:"address" env:"ADDRESS" env-default:":3000"`
|
|
DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"`
|
|
PlayoutScript string `yaml:"playoutscript"`
|
|
PlayoutScriptPath string `yaml:"playoutscriptpath" env:""`
|
|
ProgressDir string `yaml:"tmpdir"`
|
|
OutputFormat string `yaml:"outputFormat"`
|
|
PrometheusPushGateway string `yaml:"prometheusPushGateway"`
|
|
}
|
|
|
|
func (g *server) SchedulePlayout(ctx context.Context, job *api.Job) (*api.ScheduledJob, error) {
|
|
var p *playout.Job
|
|
|
|
if job.Source == "" {
|
|
return nil, errors.New("Got Empty Source. I can't play »Nothing«")
|
|
}
|
|
|
|
var output string
|
|
newPlayoutJob := new(playout.Job)
|
|
g.Store.RLock()
|
|
olPlayoutJob, playoutExists := g.Store.Playouts[job.ID]
|
|
g.Store.RUnlock()
|
|
if playoutExists {
|
|
p = olPlayoutJob
|
|
} else {
|
|
p = newPlayoutJob
|
|
p.ControlChannel = make(chan string)
|
|
}
|
|
|
|
p.ID = job.ID
|
|
p.Source = job.Source
|
|
p.Version = job.Version
|
|
p.StartAt = job.StartAt.AsTime()
|
|
p.StopAt = job.StopAt.AsTime()
|
|
if p.StopAt.IsZero() {
|
|
p.StopAt = p.StartAt.Add(g.Store.DefaultDuration)
|
|
}
|
|
|
|
if playoutExists {
|
|
p.ControlChannel <- "reschedule"
|
|
output = p.Output
|
|
} else {
|
|
var err error
|
|
|
|
g.Store.Lock()
|
|
output, err = g.Store.AddPlayout(p)
|
|
g.Store.Unlock()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can not schedule playout: %s", err)
|
|
}
|
|
}
|
|
|
|
scheduledJob := &api.ScheduledJob{
|
|
ID: p.ID,
|
|
Port: output,
|
|
Version: p.Version,
|
|
}
|
|
|
|
if !playoutExists {
|
|
go func() {
|
|
log.Printf("Start Scheduling %v", p.ID)
|
|
Waiting:
|
|
for {
|
|
select {
|
|
case ctrlMsg := <-p.ControlChannel:
|
|
log.Printf("%d Control Message: %s", p.ID, ctrlMsg)
|
|
continue
|
|
case <-time.After(time.Until(p.StartAt)):
|
|
break Waiting
|
|
}
|
|
}
|
|
p.Playout(g.Store.Config)
|
|
g.Store.DeletePlayout(p.ID)
|
|
}()
|
|
}
|
|
|
|
return scheduledJob, nil
|
|
}
|
|
|
|
var cfg Config
|
|
|
|
func main() {
|
|
// TODO configure config file path via cmd parameter
|
|
if err := cleanenv.ReadConfig("config.yml", &cfg); err != nil {
|
|
log.Fatal("No configfile: ", err)
|
|
}
|
|
|
|
s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway, cfg.OutputFormat)
|
|
if err != nil {
|
|
log.Fatal("Failed to init Store: ", err.Error())
|
|
}
|
|
server := &server{
|
|
Store: s,
|
|
}
|
|
|
|
ln, err := net.Listen("tcp", cfg.Address)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
g := grpc.NewServer()
|
|
api.RegisterPlayoutServer(g, server)
|
|
log.Fatalf("failed to serve: %v", g.Serve(ln))
|
|
}
|