add logging, some bugfixing

This commit is contained in:
Garionion 2022-02-07 11:20:50 +01:00
parent 239869bfd7
commit bcdf42b277
6 changed files with 82 additions and 47 deletions

View file

@ -1,6 +1,7 @@
package gstreamer package gstreamer
import ( import (
"context"
"errors" "errors"
"github.com/tinyzimmer/go-glib/glib" "github.com/tinyzimmer/go-glib/glib"
"github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst"
@ -10,22 +11,37 @@ var gstreamer *Gstreamer
func Init() *Gstreamer { func Init() *Gstreamer {
gst.Init(nil) gst.Init(nil)
glib.NewMainLoop(glib.MainContextDefault(), true) mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)
gstreamer = &Gstreamer{} go mainLoop.Run()
decklink := &Decklink{
Slots: map[string]string{"one": ""},
}
gstreamer = &Gstreamer{
ctx: context.Background(),
Pipelines: map[string]*Pipeline{},
Decklink: decklink,
}
return gstreamer return gstreamer
} }
func (g *Gstreamer) DeletePipeline(id string) { func (g *Gstreamer) DeletePipeline(id string) {
g.Lock() g.Lock()
if pipeline, err := g.GetPipeline(id); err == nil { defer g.Unlock()
if pipeline, err := g.getPipeline(id); err == nil {
delete(g.Pipelines, id) delete(g.Pipelines, id)
pipeline.ctxCancel() pipeline.ctxCancel()
} }
g.Unlock() return
} }
func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) { func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) {
g.RLock() g.RLock()
defer g.RUnlock()
return g.getPipeline(id)
}
//only use this if ressource is locked
func (g *Gstreamer) getPipeline(id string) (*Pipeline, error) {
if pipeline, ok := g.Pipelines[id]; ok { if pipeline, ok := g.Pipelines[id]; ok {
return pipeline, nil return pipeline, nil
} }

View file

@ -7,6 +7,7 @@ import (
) )
func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error { func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error {
elements := map[string]element{} elements := map[string]element{}
e, err := gst.NewElementWithName(i.Type, i.Name) e, err := gst.NewElementWithName(i.Type, i.Name)
if err != nil { if err != nil {
@ -26,23 +27,28 @@ func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error
// ==================== // ====================
//==================== //====================
// DECODEBIN // RAWVIDEOPARSE
e, err = gst.NewElementWithName("decodebin", fmt.Sprintf("%s-decodebin", i.Name)) e, err = gst.NewElementWithName("rawvideoparse", fmt.Sprintf("%s-rawvideoparse", i.Name))
if err != nil { if err != nil {
log.Error().Msgf("could not create decodebin for %s: %v\n", i.Name, err) log.Error().Msgf("could not create rawvideoparse for %s: %v\n", i.Name, err)
return fmt.Errorf("could not create decodebin for %s", i.Name) return fmt.Errorf("could not create rawvideoparse for %s", i.Name)
} }
setPropertyWrapper(e, "format", "bgra")
setPropertyWrapper(e, "width", 1280)
setPropertyWrapper(e, "height", 720)
setPropertyWrapper(e, "framerate", "50")
err = pipeline.Add(e) err = pipeline.Add(e)
if err != nil { if err != nil {
log.Error().Msgf("could not add decodebin for %s to pipeline: %v\n", i.Name, err) log.Error().Msgf("could not add rawvideoparse for %s to pipeline: %v\n", i.Name, err)
return fmt.Errorf("could not add decodebin for %s to pipeline", i.Name) return fmt.Errorf("could not add rawvideoparse for %s to pipeline", i.Name)
} }
decodebin := element{element: e, name: fmt.Sprintf("%s-decodebin", i.Name), prev: i.Name, next: fmt.Sprintf("%s-videoconvert", i.Name)} rawvideoparse := element{element: e, name: fmt.Sprintf("%s-rawvideoparse", i.Name), prev: i.Name, next: fmt.Sprintf("%s-videoconvert", i.Name)}
elements[fmt.Sprintf("%s-decodebin", i.Name)] = decodebin elements[fmt.Sprintf("%s-rawvideoparse", i.Name)] = rawvideoparse
err = input.linkElementWrapper(&decodebin, log)
err = input.linkElementWrapper(&rawvideoparse, log)
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", i.Name, fmt.Sprintf("%s-decodebin", i.Name), err) log.Error().Msgf("could not link %s to %s: %v", fmt.Sprintf("%s-rawvideoparse", i.Name), i.Name, err)
return fmt.Errorf("could not link %s to %s", i.Name, fmt.Sprintf("%s-decodebin", i.Name)) return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-rawvideoparse", i.Name), i.Name)
} }
//==================== //====================
@ -58,12 +64,12 @@ func (i *InputElement) create(pipeline *gst.Pipeline, log zerolog.Logger) error
log.Error().Msgf("could not add videoconvert for %s to pipeline: %v\n", i.Name, err) log.Error().Msgf("could not add videoconvert for %s to pipeline: %v\n", i.Name, err)
return fmt.Errorf("could not add videoconvert for %s to pipeline", i.Name) return fmt.Errorf("could not add videoconvert for %s to pipeline", i.Name)
} }
videoconvert := element{element: e, name: fmt.Sprintf("%s-videoconvert", i.Name), prev: fmt.Sprintf("%s-decodebin", i.Name), next: fmt.Sprintf("%s-videoscale", i.Name)} videoconvert := element{element: e, name: fmt.Sprintf("%s-videoconvert", i.Name), prev: fmt.Sprintf("%s-rawvideoparse", i.Name), next: fmt.Sprintf("%s-videoscale", i.Name)}
elements[fmt.Sprintf("%s-videoconvert", i.Name)] = videoconvert elements[fmt.Sprintf("%s-videoconvert", i.Name)] = videoconvert
err = input.linkElementWrapper(&videoconvert, log) err = rawvideoparse.linkElementWrapper(&videoconvert, log)
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", i.Name, fmt.Sprintf("%s-videoconvert", i.Name), err) log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", i.Name), fmt.Sprintf("%s-decodebin", i.Name), err)
return fmt.Errorf("could not link %s to %s", i.Name, fmt.Sprintf("%s-videoconvert", i.Name)) return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", i.Name), fmt.Sprintf("%s-decodebin", i.Name))
} }
//===================== //=====================

View file

@ -23,8 +23,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log
elements[fmt.Sprintf("%s-videoconvert", o.Name)] = videoconvert elements[fmt.Sprintf("%s-videoconvert", o.Name)] = videoconvert
err = lastElement.linkElementWrapper(&videoconvert, log) err = lastElement.linkElementWrapper(&videoconvert, log)
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", o.Name, fmt.Sprintf("%s-videoconvert", o.Name), err) log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", o.Name), lastElement.name, err)
return fmt.Errorf("could not link %s to %s", o.Name, fmt.Sprintf("%s-videoconvert", o.Name)) return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", o.Name), lastElement.name)
} }
e, err = gst.NewElementWithName("videoscale", fmt.Sprintf("%s-videoscale", o.Name)) e, err = gst.NewElementWithName("videoscale", fmt.Sprintf("%s-videoscale", o.Name))
@ -41,8 +41,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log
elements[fmt.Sprintf("%s-videoscale", o.Name)] = videoscale elements[fmt.Sprintf("%s-videoscale", o.Name)] = videoscale
err = videoconvert.linkElementWrapper(&videoscale, log) err = videoconvert.linkElementWrapper(&videoscale, log)
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoconvert", o.Name), fmt.Sprintf("%s-videoscale", o.Name), err) log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videoconvert", o.Name), err)
return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoconvert", o.Name), fmt.Sprintf("%s-videoscale", o.Name)) return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videoconvert", o.Name))
} }
e, err = gst.NewElementWithName("videorate", fmt.Sprintf("%s-videorate", o.Name)) e, err = gst.NewElementWithName("videorate", fmt.Sprintf("%s-videorate", o.Name))
@ -59,8 +59,8 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log
elements[fmt.Sprintf("%s-videorate", o.Name)] = videorate elements[fmt.Sprintf("%s-videorate", o.Name)] = videorate
err = videoscale.linkElementWrapper(&videorate, log) err = videoscale.linkElementWrapper(&videorate, log)
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name), err) log.Error().Msgf("could not link %s to %s: %v\n", fmt.Sprintf("%s-videorate", o.Name), fmt.Sprintf("%s-videoscale", o.Name), err)
return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name)) return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videorate", o.Name), fmt.Sprintf("%s-videoscale", o.Name))
} }
e, err = gst.NewElementWithName(o.Type, o.Name) e, err = gst.NewElementWithName(o.Type, o.Name)
@ -78,6 +78,11 @@ func (o *OutputElement) create(pipeline *gst.Pipeline, lastElement *element, log
} }
output := element{element: e, name: o.Name, prev: fmt.Sprintf("%s-videoscale", o.Name)} output := element{element: e, name: o.Name, prev: fmt.Sprintf("%s-videoscale", o.Name)}
elements[o.Name] = output elements[o.Name] = output
err = videorate.linkElementWrapper(&output, log)
if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", o.Name, fmt.Sprintf("%s-videorate", o.Name), err)
return fmt.Errorf("could not link %s to %s", fmt.Sprintf("%s-videoscale", o.Name), fmt.Sprintf("%s-videorate", o.Name))
}
o.elements = elements o.elements = elements

View file

@ -11,14 +11,17 @@ import (
func (p *Pipeline) Create() (chan PipelineUpdates, error) { func (p *Pipeline) Create() (chan PipelineUpdates, error) {
gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger() gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger()
gstreamerLogger.Info().Msg("Creating pipeline")
p.logger = gstreamerLogger p.logger = gstreamerLogger
pipeline, err := gst.NewPipeline(p.Name) pipeline, err := gst.NewPipeline(p.Name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.pipeline = pipeline
elements := map[string]element{} elements := map[string]element{}
gstreamerLogger.Debug().Msg("Creating input")
err = p.InputElement.create(pipeline, gstreamerLogger) err = p.InputElement.create(pipeline, gstreamerLogger)
if err != nil { if err != nil {
return nil, err return nil, err
@ -35,10 +38,20 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) {
element: overlayElement, element: overlayElement,
name: overlay.getName(), name: overlay.getName(),
} }
lastElement.linkElementWrapper(e, gstreamerLogger) err = pipeline.Add(overlayElement)
if err != nil {
log.Error().Msgf("could not add %s to pipeline: %v", overlay.getName(), err)
return nil, err
}
err = lastElement.linkElementWrapper(e, gstreamerLogger)
if err != nil {
log.Error().Msgf("could not link %s to %s: %v", overlay.getName(), lastElement.name, err)
return nil, err
}
lastElement = e lastElement = e
elements[overlay.getName()] = *lastElement elements[overlay.getName()] = *lastElement
} }
gstreamerLogger.Debug().Msg("Creating output")
err = p.OutputElement.create(pipeline, lastElement, gstreamerLogger) err = p.OutputElement.create(pipeline, lastElement, gstreamerLogger)
if err != nil { if err != nil {
return nil, err return nil, err
@ -46,19 +59,22 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) {
for name, e := range p.OutputElement.elements { for name, e := range p.OutputElement.elements {
elements[name] = e elements[name] = e
} }
p.elements = elements
pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
switch msg.Type() { switch msg.Type() {
case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
p.logger.Debug().Msg("EOS received")
pipeline.BlockSetState(gst.StateNull) pipeline.BlockSetState(gst.StateNull)
case gst.MessageError: // Error messages are always fatal case gst.MessageError: // Error messages are always fatal
err := msg.ParseError() err := msg.ParseError()
gstreamerLogger.Error().Msg(err.Error()) p.logger.Error().Msg(err.Error())
if debug := err.DebugString(); debug != "" { if debug := err.DebugString(); debug != "" {
gstreamerLogger.Debug().Msg(debug) p.logger.Debug().Msg(debug)
} }
pipeline.BlockSetState(gst.StateNull)
case gst.MessageStreamStart: case gst.MessageStreamStart:
gstreamerLogger.Info().Msgf("STARTING Playout of %s", p.Name) p.logger.Info().Msgf("STARTING Playout of %s", p.Name)
clock := pipeline.GetPipelineClock() clock := pipeline.GetPipelineClock()
now := clock.GetTime() now := clock.GetTime()
for _, overlay := range p.Overlays { for _, overlay := range p.Overlays {
@ -69,17 +85,6 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) {
clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide()) clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide())
} }
} }
case gst.MessageStateChanged:
_, newState := msg.ParseStateChanged()
if newState == gst.StatePlaying && msg.Source() == p.Name {
gstreamerLogger.Debug().Msgf("Pipeline %s started", p.Name)
p.resizeAllTextOverlays(gstreamerLogger)
}
default:
// All messages implement a Stringer. However, this is
// typically an expensive thing to do and should be avoided.
gstreamerLogger.Debug().Msgf(msg.String())
} }
return true return true
}) })
@ -97,9 +102,9 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) {
//TODO check if output is decklink //TODO check if output is decklink
func (p *Pipeline) Start() error { func (p *Pipeline) Start() error {
output, err := gstreamer.getDecklinkOutput(p.Name) //output, err := gstreamer.getDecklinkOutput(p.Name)
setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) //setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output)
err = p.pipeline.SetState(gst.StatePlaying) err := p.pipeline.SetState(gst.StatePlaying)
if err != nil { if err != nil {
p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err) p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err)
return fmt.Errorf("Failed to set pipeline %s to playing: %v", p.Name, err) return fmt.Errorf("Failed to set pipeline %s to playing: %v", p.Name, err)
@ -116,10 +121,10 @@ func (p *Pipeline) Stop() error {
return nil return nil
} }
func (p *Pipeline) resizeAllTextOverlays(log zerolog.Logger) { func (p *Pipeline) resizeAllTextOverlays() {
for _, overlay := range p.Overlays { for _, overlay := range p.Overlays {
if x, ok := overlay.(*TextOverlay); ok { if x, ok := overlay.(*TextOverlay); ok {
go x.resizeTextOverlay(log) go x.resizeTextOverlay(p.logger)
} }
} }
} }

View file

@ -12,6 +12,7 @@ type Gstreamer struct {
Pipelines map[string]*Pipeline Pipelines map[string]*Pipeline
*Decklink *Decklink
ctx context.Context ctx context.Context
sync.RWMutex
} }
type Pipeline struct { type Pipeline struct {

View file

@ -32,14 +32,16 @@ func (element1 *element) linkElementWrapper(element2 *element, log zerolog.Logge
} }
} }
if len(caps) > 0 { if len(caps) > 0 {
log.Debug().Msgf("Linking %s to %s with caps %s", element1.name, element2.name, caps)
capabilities := gst.NewCapsFromString(caps) capabilities := gst.NewCapsFromString(caps)
err = element1.element.LinkFiltered(element2.element, capabilities) err = element1.element.LinkFiltered(element2.element, capabilities)
} else { } else {
err = element1.element.Link(element2.element) err = element1.element.Link(element2.element)
} }
if err != nil { if err != nil {
log.Error().Msgf("could not link %s to %s: %v\n", element2.name, element1.name, err) log.Error().Msgf("could not link %s to %s: %v", element1.name, element2.name, err)
return fmt.Errorf("could not link %s to %s", element2.name, element1.name) return err
} }
element1.next, element2.prev = element2.name, element1.name element1.next, element2.prev = element2.name, element1.name