diff --git a/gstreamer/gstreamer.go b/gstreamer/gstreamer.go index 4c16d00..9b633c4 100644 --- a/gstreamer/gstreamer.go +++ b/gstreamer/gstreamer.go @@ -1,6 +1,7 @@ package gstreamer import ( + "context" "errors" "github.com/tinyzimmer/go-glib/glib" "github.com/tinyzimmer/go-gst/gst" @@ -10,22 +11,37 @@ var gstreamer *Gstreamer func Init() *Gstreamer { gst.Init(nil) - glib.NewMainLoop(glib.MainContextDefault(), true) - gstreamer = &Gstreamer{} + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + go mainLoop.Run() + decklink := &Decklink{ + Slots: map[string]string{"one": ""}, + } + gstreamer = &Gstreamer{ + ctx: context.Background(), + Pipelines: map[string]*Pipeline{}, + Decklink: decklink, + } return gstreamer } func (g *Gstreamer) DeletePipeline(id string) { g.Lock() - if pipeline, err := g.GetPipeline(id); err == nil { + defer g.Unlock() + if pipeline, err := g.getPipeline(id); err == nil { delete(g.Pipelines, id) pipeline.ctxCancel() } - g.Unlock() + return } func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) { g.RLock() + defer g.RUnlock() + return g.getPipeline(id) +} + +//only use this if ressource is locked +func (g *Gstreamer) getPipeline(id string) (*Pipeline, error) { if pipeline, ok := g.Pipelines[id]; ok { return pipeline, nil } diff --git a/gstreamer/input.go b/gstreamer/input.go index 95f6e11..21e719a 100644 --- a/gstreamer/input.go +++ b/gstreamer/input.go @@ -7,6 +7,7 @@ import ( ) func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error { + elements := map[string]element{} e, err := gst.NewElementWithName(i.Type, i.Name) if err != nil { @@ -26,23 +27,28 @@ func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error // ==================== //==================== - // DECODEBIN - e, err = gst.NewElementWithName("decodebin", fmt.Sprintf("%s-decodebin", i.Name)) + // RAWVIDEOPARSE + e, err = gst.NewElementWithName("rawvideoparse", fmt.Sprintf("%s-rawvideoparse", i.Name)) if err != nil { - log.Error().Msgf("could not create decodebin for %s: %v\n", i.Name, err) - return fmt.Errorf("could not create decodebin for %s", i.Name) + log.Error().Msgf("could not create rawvideoparse for %s: %v\n", i.Name, err) + return fmt.Errorf("could not create rawvideoparse for %s", i.Name) } + setPropertyWrapper(e, "format", "bgra") + setPropertyWrapper(e, "width", 1280) + setPropertyWrapper(e, "height", 720) + setPropertyWrapper(e, "framerate", "50") err = pipeline.Add(e) if err != nil { - log.Error().Msgf("could not add decodebin for %s to pipeline: %v\n", i.Name, err) - return fmt.Errorf("could not add decodebin for %s to pipeline", i.Name) + log.Error().Msgf("could not add rawvideoparse for %s to pipeline: %v\n", i.Name, err) + return fmt.Errorf("could not add rawvideoparse for %s to pipeline", i.Name) } - decodebin := element{element: e, name: fmt.Sprintf("%s-decodebin", i.Name), prev: i.Name, next: fmt.Sprintf("%s-videoconvert", i.Name)} - elements[fmt.Sprintf("%s-decodebin", i.Name)] = decodebin - err = input.linkElementWrapper(&decodebin, log) + rawvideoparse := element{element: e, name: fmt.Sprintf("%s-rawvideoparse", i.Name), prev: i.Name, next: fmt.Sprintf("%s-videoconvert", i.Name)} + elements[fmt.Sprintf("%s-rawvideoparse", i.Name)] = rawvideoparse + + err = input.linkElementWrapper(&rawvideoparse, log) if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", i.Name, fmt.Sprintf("%s-decodebin", i.Name), err) - return fmt.Errorf("could not link %s to %s", i.Name, fmt.Sprintf("%s-decodebin", i.Name)) + log.Error().Msgf("could not link %s to %s: %v", fmt.Sprintf("%s-rawvideoparse", i.Name), i.Name, err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-rawvideoparse", i.Name), i.Name) } //==================== @@ -58,12 +64,12 @@ func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error log.Error().Msgf("could not add videoconvert for %s to pipeline: %v\n", i.Name, err) return fmt.Errorf("could not add videoconvert for %s to pipeline", i.Name) } - videoconvert := element{element: e, name: fmt.Sprintf("%s-videoconvert", i.Name), prev: fmt.Sprintf("%s-decodebin", i.Name), next: fmt.Sprintf("%s-videoscale", i.Name)} + videoconvert := element{element: e, name: fmt.Sprintf("%s-videoconvert", i.Name), prev: fmt.Sprintf("%s-rawvideoparse", i.Name), next: fmt.Sprintf("%s-videoscale", i.Name)} elements[fmt.Sprintf("%s-videoconvert", i.Name)] = videoconvert - err = input.linkElementWrapper(&videoconvert, log) + err = rawvideoparse.linkElementWrapper(&videoconvert, log) if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", i.Name, fmt.Sprintf("%s-videoconvert", i.Name), err) - return fmt.Errorf("could not link %s to %s", i.Name, fmt.Sprintf("%s-videoconvert", i.Name)) + log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", i.Name), fmt.Sprintf("%s-decodebin", i.Name), err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", i.Name), fmt.Sprintf("%s-decodebin", i.Name)) } //===================== diff --git a/gstreamer/output.go b/gstreamer/output.go index ba3188c..b6a22d9 100644 --- a/gstreamer/output.go +++ b/gstreamer/output.go @@ -23,8 +23,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log elements[fmt.Sprintf("%s-videoconvert", o.Name)] = videoconvert err = lastElement.linkElementWrapper(&videoconvert, log) if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", o.Name, fmt.Sprintf("%s-videoconvert", o.Name), err) - return fmt.Errorf("could not link %s to %s", o.Name, fmt.Sprintf("%s-videoconvert", o.Name)) + log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", o.Name), lastElement.name, err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", o.Name), lastElement.name) } e, err = gst.NewElementWithName("videoscale", fmt.Sprintf("%s-videoscale", o.Name)) @@ -41,8 +41,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log elements[fmt.Sprintf("%s-videoscale", o.Name)] = videoscale err = videoconvert.linkElementWrapper(&videoscale, log) if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", o.Name), fmt.Sprintf("%s-videoscale", o.Name), err) - return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", o.Name), fmt.Sprintf("%s-videoscale", o.Name)) + log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videoconvert", o.Name), err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videoconvert", o.Name)) } e, err = gst.NewElementWithName("videorate", fmt.Sprintf("%s-videorate", o.Name)) @@ -59,8 +59,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log elements[fmt.Sprintf("%s-videorate", o.Name)] = videorate err = videoscale.linkElementWrapper(&videorate, log) if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name), err) - return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name)) + log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videorate", o.Name), fmt.Sprintf("%s-videoscale", o.Name), err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videorate", o.Name), fmt.Sprintf("%s-videoscale", o.Name)) } e, err = gst.NewElementWithName(o.Type, o.Name) @@ -78,6 +78,11 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log } output := element{element: e, name: o.Name, prev: fmt.Sprintf("%s-videoscale", o.Name)} elements[o.Name] = output + err = videorate.linkElementWrapper(&output, log) + if err != nil { + log.Error().Msgf("could not link %s to %s: %v\n", o.Name, fmt.Sprintf("%s-videorate", o.Name), err) + return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name)) + } o.elements = elements diff --git a/gstreamer/pipeline.go b/gstreamer/pipeline.go index b93e285..42dfedd 100644 --- a/gstreamer/pipeline.go +++ b/gstreamer/pipeline.go @@ -11,14 +11,17 @@ import ( func (p *Pipeline) Create() (chan PipelineUpdates, error) { gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger() + gstreamerLogger.Info().Msg("Creating pipeline") p.logger = gstreamerLogger pipeline, err := gst.NewPipeline(p.Name) if err != nil { return nil, err } + p.pipeline = pipeline elements := map[string]element{} + gstreamerLogger.Debug().Msg("Creating input") err = p.InputElement.create(pipeline, gstreamerLogger) if err != nil { return nil, err @@ -35,10 +38,20 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) { element: overlayElement, name: overlay.getName(), } - lastElement.linkElementWrapper(e, gstreamerLogger) + err = pipeline.Add(overlayElement) + if err != nil { + log.Error().Msgf("could not add %s to pipeline: %v", overlay.getName(), err) + return nil, err + } + err = lastElement.linkElementWrapper(e, gstreamerLogger) + if err != nil { + log.Error().Msgf("could not link %s to %s: %v", overlay.getName(), lastElement.name, err) + return nil, err + } lastElement = e elements[overlay.getName()] = *lastElement } + gstreamerLogger.Debug().Msg("Creating output") err = p.OutputElement.create(pipeline, lastElement, gstreamerLogger) if err != nil { return nil, err @@ -46,19 +59,22 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) { for name, e := range p.OutputElement.elements { elements[name] = e } + p.elements = elements pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop + p.logger.Debug().Msg("EOS received") pipeline.BlockSetState(gst.StateNull) case gst.MessageError: // Error messages are always fatal err := msg.ParseError() - gstreamerLogger.Error().Msg(err.Error()) + p.logger.Error().Msg(err.Error()) if debug := err.DebugString(); debug != "" { - gstreamerLogger.Debug().Msg(debug) + p.logger.Debug().Msg(debug) } + pipeline.BlockSetState(gst.StateNull) case gst.MessageStreamStart: - gstreamerLogger.Info().Msgf("STARTING Playout of %s", p.Name) + p.logger.Info().Msgf("STARTING Playout of %s", p.Name) clock := pipeline.GetPipelineClock() now := clock.GetTime() for _, overlay := range p.Overlays { @@ -69,17 +85,6 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) { clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide()) } } - - case gst.MessageStateChanged: - _, newState := msg.ParseStateChanged() - if newState == gst.StatePlaying && msg.Source() == p.Name { - gstreamerLogger.Debug().Msgf("Pipeline %s started", p.Name) - p.resizeAllTextOverlays(gstreamerLogger) - } - default: - // All messages implement a Stringer. However, this is - // typically an expensive thing to do and should be avoided. - gstreamerLogger.Debug().Msgf(msg.String()) } return true }) @@ -97,9 +102,9 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) { //TODO check if output is decklink func (p *Pipeline) Start() error { - output, err := gstreamer.getDecklinkOutput(p.Name) - setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) - err = p.pipeline.SetState(gst.StatePlaying) + //output, err := gstreamer.getDecklinkOutput(p.Name) + //setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) + err := p.pipeline.SetState(gst.StatePlaying) if err != nil { p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err) return fmt.Errorf("Failed to set pipeline %s to playing: %v", p.Name, err) @@ -116,10 +121,10 @@ func (p *Pipeline) Stop() error { return nil } -func (p *Pipeline) resizeAllTextOverlays(log zerolog.Logger) { +func (p *Pipeline) resizeAllTextOverlays() { for _, overlay := range p.Overlays { if x, ok := overlay.(*TextOverlay); ok { - go x.resizeTextOverlay(log) + go x.resizeTextOverlay(p.logger) } } } diff --git a/gstreamer/types.go b/gstreamer/types.go index 4b56405..d4ca7c2 100644 --- a/gstreamer/types.go +++ b/gstreamer/types.go @@ -12,6 +12,7 @@ type Gstreamer struct { Pipelines map[string]*Pipeline *Decklink ctx context.Context + sync.RWMutex } type Pipeline struct { diff --git a/gstreamer/utils.go b/gstreamer/utils.go index 224f77a..2cc9344 100644 --- a/gstreamer/utils.go +++ b/gstreamer/utils.go @@ -32,14 +32,16 @@ func (element1 *element) linkElementWrapper(element2 *element, log zerolog.Logge } } if len(caps) > 0 { + log.Debug().Msgf("Linking %s to %s with caps %s", element1.name, element2.name, caps) capabilities := gst.NewCapsFromString(caps) err = element1.element.LinkFiltered(element2.element, capabilities) } else { err = element1.element.Link(element2.element) + } if err != nil { - log.Error().Msgf("could not link %s to %s: %v\n", element2.name, element1.name, err) - return fmt.Errorf("could not link %s to %s", element2.name, element1.name) + log.Error().Msgf("could not link %s to %s: %v", element1.name, element2.name, err) + return err } element1.next, element2.prev = element2.name, element1.name