package gstreamer import ( "context" "fmt" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/tinyzimmer/go-gst/gst" "sync" ) func (p *Pipeline) Create() (chan PipelineUpdates, error) { gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger() gstreamerLogger.Info().Msg("Creating pipeline") p.logger = gstreamerLogger pipeline, err := gst.NewPipeline(p.Name) if err != nil { return nil, err } p.pipeline = pipeline elements := map[string]element{} gstreamerLogger.Debug().Msg("Creating input") err = p.InputElement.create(pipeline, gstreamerLogger) if err != nil { return nil, err } for name, e := range p.InputElement.elements { elements[name] = e } lastElement := p.InputElement.getLastElement() for _, overlay := range p.Overlays { gstreamerLogger.Debug().Str("Overlay", overlay.getName()).Msg("Adding overlay") overlayElement, _ := overlay.create(gstreamerLogger) e := &element{ element: overlayElement, name: overlay.getName(), } err = pipeline.Add(overlayElement) if err != nil { log.Error().Msgf("could not add %s to pipeline: %v", overlay.getName(), err) return nil, err } err = lastElement.linkElementWrapper(e, gstreamerLogger) if err != nil { log.Error().Msgf("could not link %s to %s: %v", overlay.getName(), lastElement.name, err) return nil, err } lastElement = e elements[overlay.getName()] = *lastElement } gstreamerLogger.Debug().Msg("Creating output") err = p.OutputElement.create(pipeline, lastElement, gstreamerLogger) if err != nil { return nil, err } for name, e := range p.OutputElement.elements { elements[name] = e } p.elements = elements pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop p.logger.Debug().Msg("EOS received") pipeline.BlockSetState(gst.StateNull) case gst.MessageError: // Error messages are always fatal err := msg.ParseError() p.logger.Error().Msg(err.Error()) if debug := err.DebugString(); debug != "" { p.logger.Debug().Msg(debug) } pipeline.BlockSetState(gst.StateNull) case gst.MessageStreamStart: p.logger.Info().Msgf("STARTING Playout of %s", p.Name) clock := pipeline.GetPipelineClock() now := clock.GetTime() for _, overlay := range p.Overlays { if overlay.getBlendInTime() != -1 { clock.NewSingleShotID(now + overlay.getBlendInTime()).WaitAsync(overlay.show()) } if overlay.getBlendOutTime() != -1 { clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide()) } } } return true }) p.ctx, p.ctxCancel = context.WithCancel(gstreamer.ctx) updateChannel := make(chan PipelineUpdates) go p.overlayUpdater(updateChannel, gstreamerLogger) p.PipelineUpdates = updateChannel gstreamer.AddPipeline(p) return updateChannel, nil } //TODO check if output is decklink func (p *Pipeline) Start() error { p.Lock() defer p.Unlock() //output, err := gstreamer.getDecklinkOutput(p.Name) //setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) err := p.pipeline.SetState(gst.StatePlaying) if err != nil { p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err) return fmt.Errorf("Failed to set pipeline %s to playing: %v", p.Name, err) } return nil } func (p *Pipeline) Stop() error { p.Lock() defer p.Unlock() err := p.pipeline.SetState(gst.StateNull) if err != nil { p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err) return fmt.Errorf("Failed to set pipeline %s to null: %v", p.Name, err) } return nil } func (p *Pipeline) resizeAllTextOverlays() { for _, overlay := range p.Overlays { if x, ok := overlay.(*TextOverlay); ok { go x.resizeTextOverlay(p.logger) } } } func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolog.Logger) { for { select { case <-p.ctx.Done(): return case updates := <-updateChannel: log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays)) var wg sync.WaitGroup p.Lock() for _, overlay := range updates.Overlays { wg.Add(1) go func(overlay Overlay) { defer wg.Done() o, ok := p.Overlays[overlay.getName()] if !ok { log.Error().Msgf("Could not find overlay %s", overlay.getName()) return } overlay.setElement(o.getElement()) overlay.update(log) p.Overlays[overlay.getName()] = overlay }(overlay) } wg.Wait() p.Unlock() p.notifySubscribers() log.Debug().Msg("Overlays updated") } } } func (p *Pipeline) SubscribeToUpdates(updateFunc NotifyOnPipelineUpdate) { p.Lock() p.notifyOnUpdate = append(p.notifyOnUpdate, updateFunc) p.Unlock() } func (p *Pipeline) notifySubscribers() { p.RLock() for _, updateFunc := range p.notifyOnUpdate { go updateFunc(p) } p.RUnlock() }