package gstreamer

import (
	"context"
	"fmt"
	"sync"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"github.com/tinyzimmer/go-gst/gst"
)

// Create builds the GStreamer pipeline described by p: it creates the input
// element(s), chains every overlay behind the input, creates the output
// element(s), installs a bus watch, starts the overlay updater goroutine and
// registers the pipeline in the package-level gstreamer registry under p.ID.
//
// It returns the channel on which overlay updates can be sent to the running
// pipeline (also stored in p.PipelineUpdates), or an error if any element
// could not be created.
func (p *Pipeline) Create() (chan PipelineUpdates, error) {
	gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger()
	p.logger = gstreamerLogger

	pipeline, err := gst.NewPipeline(p.Name)
	if err != nil {
		return nil, err
	}

	// Collects every element created below, keyed by element name.
	elements := map[string]element{}

	if err := p.InputElement.create(pipeline, gstreamerLogger); err != nil {
		return nil, err
	}
	for name, e := range p.InputElement.elements {
		elements[name] = e
	}

	// Overlays are linked one after another, starting at the last input element.
	lastElement := p.InputElement.getLastElement()
	for _, overlay := range p.Overlays {
		gstreamerLogger.Debug().Str("Overlay", overlay.getName()).Msg("Adding overlay")

		// Previously the second return value was discarded; a failed overlay
		// would then be linked as if it had been created successfully.
		overlayElement, err := overlay.create(gstreamerLogger)
		if err != nil {
			return nil, fmt.Errorf("creating overlay %s: %w", overlay.getName(), err)
		}

		e := &element{
			element: overlayElement,
			name:    overlay.getName(),
		}
		lastElement.linkElementWrapper(e, gstreamerLogger)
		lastElement = e
		elements[overlay.getName()] = *lastElement
	}

	if err := p.OutputElement.create(pipeline, lastElement, gstreamerLogger); err != nil {
		return nil, err
	}
	for name, e := range p.OutputElement.elements {
		elements[name] = e
	}

	// Start and Stop operate on p.pipeline and p.elements, so the freshly built
	// pipeline and its element table must be kept on the receiver.
	p.pipeline = pipeline
	p.elements = elements

	pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
		switch msg.Type() {
		case gst.MessageEOS:
			// When end-of-stream is received flush the pipeline and stop the main loop
			pipeline.BlockSetState(gst.StateNull)
		case gst.MessageError:
			// Error messages are always fatal
			gerr := msg.ParseError()
			gstreamerLogger.Error().Msg(gerr.Error())
			if debug := gerr.DebugString(); debug != "" {
				gstreamerLogger.Debug().Msg(debug)
			}
		case gst.MessageStreamStart:
			gstreamerLogger.Info().Msgf("STARTING Playout of %s", p.Name)
			// Schedule overlay show/hide callbacks relative to the current
			// pipeline clock time; -1 means "no blend time configured".
			clock := pipeline.GetPipelineClock()
			now := clock.GetTime()
			for _, overlay := range p.Overlays {
				if overlay.getBlendInTime() != -1 {
					clock.NewSingleShotID(now + overlay.getBlendInTime()).WaitAsync(overlay.show())
				}
				if overlay.getBlendOutTime() != -1 {
					clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide())
				}
			}
		case gst.MessageStateChanged:
			_, newState := msg.ParseStateChanged()
			// Only react to the pipeline itself reaching PLAYING, not to its
			// child elements.
			if newState == gst.StatePlaying && msg.Source() == p.Name {
				gstreamerLogger.Debug().Msgf("Pipeline %s started", p.Name)
				p.resizeAllTextOverlays(gstreamerLogger)
			}
		default:
			// All messages implement a Stringer. However, this is
			// typically an expensive thing to do and should be avoided.
			// Msg (not Msgf) is used so that '%' characters inside the
			// message text are not interpreted as format verbs.
			gstreamerLogger.Debug().Msg(msg.String())
		}
		// Returning true keeps the watch installed.
		return true
	})

	p.ctx, p.ctxCancel = context.WithCancel(gstreamer.ctx)

	updateChannel := make(chan PipelineUpdates)
	go p.overlayUpdater(updateChannel, gstreamerLogger)
	p.PipelineUpdates = updateChannel

	gstreamer.Lock()
	gstreamer.Pipelines[p.ID] = p
	gstreamer.Unlock()

	return updateChannel, nil
}

// Start assigns the Decklink device number to the output element and sets the
// pipeline to the PLAYING state.
//
// If no Decklink output can be resolved for the pipeline, the failure is
// logged and the "device-number" property is left untouched; the pipeline is
// still started.
//
// TODO check if output is decklink
func (p *Pipeline) Start() error {
	output, err := gstreamer.getDecklinkOutput(p.Name)
	if err != nil {
		// The result is meaningless when the lookup failed, so do not apply it.
		p.logger.Warn().Msgf("Could not resolve decklink output for pipeline %s: %v", p.Name, err)
	} else {
		setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output)
	}

	if err := p.pipeline.SetState(gst.StatePlaying); err != nil {
		p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err)
		return fmt.Errorf("Failed to set pipeline %s to playing: %w", p.Name, err)
	}
	return nil
}

// Stop sets the pipeline to the NULL state and reports a failure to do so.
func (p *Pipeline) Stop() error {
	if err := p.pipeline.SetState(gst.StateNull); err != nil {
		p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err)
		return fmt.Errorf("Failed to set pipeline %s to null: %w", p.Name, err)
	}
	return nil
}

// resizeAllTextOverlays starts one goroutine per *TextOverlay in p.Overlays
// that calls resizeTextOverlay on it. Overlays of other types are skipped.
// The goroutines are not waited for.
func (p *Pipeline) resizeAllTextOverlays(log zerolog.Logger) {
	for _, overlay := range p.Overlays {
		textOverlay, ok := overlay.(*TextOverlay)
		if !ok {
			continue
		}
		go textOverlay.resizeTextOverlay(log)
	}
}

// overlayUpdater receives overlay updates from updateChannel until p.ctx is
// cancelled. For every received batch it looks up the existing overlay with
// the same name in p.Overlays, hands that overlay's element to the incoming
// overlay and calls update on it. Overlays of one batch are processed
// concurrently and the batch is awaited before the next one is read.
func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolog.Logger) {
	for {
		select {
		case <-p.ctx.Done():
			return
		case updates := <-updateChannel:
			log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays))

			var wg sync.WaitGroup
			for _, overlay := range updates.Overlays {
				wg.Add(1)
				go func(overlay Overlay) {
					defer wg.Done()

					existing, ok := p.Overlays[overlay.getName()]
					if !ok {
						log.Error().Msgf("Could not find overlay %s", overlay.getName())
						return
					}
					// Reuse the element of the overlay already in the pipeline.
					overlay.setElement(existing.getElement())
					overlay.update(log)
				}(overlay)
			}
			wg.Wait()

			log.Debug().Msg("Overlays updated")
			// TODO channel done
		}
	}
}