package main import ( "context" "errors" "fmt" "git.entr0py.de/garionion/ffmpeg-playout/api" "git.entr0py.de/garionion/ffmpeg-playout/playout" "git.entr0py.de/garionion/ffmpeg-playout/store" "github.com/ilyakaznacheev/cleanenv" "google.golang.org/grpc" "log" "net" "time" ) type server struct { api.UnimplementedPlayoutServer *store.Store } type Config struct { Outputs []string `yaml:"outputs"` Address string `yaml:"address" env:"ADDRESS" env-default:":3000"` DefaultDuration string `yaml:"default_duration" env:"DEFAULT_DURATION" env-default:"15"` PlayoutScript string `yaml:"playoutscript"` PlayoutScriptPath string `yaml:"playoutscriptpath" env:""` ProgressDir string `yaml:"tmpdir"` OutputFormat string `yaml:"outputFormat"` PrometheusPushGateway string `yaml:"prometheusPushGateway"` } func (g *server) SchedulePlayout(ctx context.Context, job *api.Job) (*api.ScheduledJob, error) { var p *playout.Job if job.Source == "" { return nil, errors.New("Got Empty Source. I can't play »Nothing«") } var output string newPlayoutJob := new(playout.Job) g.Store.RLock() olPlayoutJob, playoutExists := g.Store.Playouts[job.ID] g.Store.RUnlock() if playoutExists { p = olPlayoutJob } else { p = newPlayoutJob p.ControlChannel = make(chan string) } p.ID = job.ID p.Source = job.Source p.Version = job.Version p.StartAt = job.StartAt.AsTime() p.StopAt = job.StopAt.AsTime() if p.StopAt.IsZero() { p.StopAt = p.StartAt.Add(g.Store.DefaultDuration) } if playoutExists { p.ControlChannel <- "reschedule" output = p.Output } else { var err error g.Store.Lock() output, err = g.Store.AddPlayout(p) g.Store.Unlock() if err != nil { return nil, fmt.Errorf("can not schedule playout: %s", err) } } scheduledJob := &api.ScheduledJob{ ID: p.ID, Port: output, Version: p.Version, } if !playoutExists { go func() { log.Printf("Start Scheduling %v", p.ID) Waiting: for { select { case ctrlMsg := <-p.ControlChannel: log.Printf("%d Control Message: %s", p.ID, ctrlMsg) continue case <-time.After(time.Until(p.StartAt)): break Waiting } } p.Playout(g.Store.Config) g.Store.DeletePlayout(p.ID) }() } return scheduledJob, nil } var cfg Config func main() { // TODO configure config file path via cmd parameter if err := cleanenv.ReadConfig("config.yml", &cfg); err != nil { log.Fatal("No configfile: ", err) } s, err := store.NewStore(cfg.Outputs, cfg.DefaultDuration, cfg.PlayoutScriptPath, cfg.PlayoutScript, cfg.ProgressDir, cfg.PrometheusPushGateway, cfg.OutputFormat) if err != nil { log.Fatal("Failed to init Store: ", err.Error()) } server := &server{ Store: s, } ln, err := net.Listen("tcp", cfg.Address) if err != nil { log.Fatal(err) } g := grpc.NewServer() api.RegisterPlayoutServer(g, server) log.Fatalf("failed to serve: %v", g.Serve(ln)) }