157 lines
4.4 KiB
Go
157 lines
4.4 KiB
Go
package gstreamer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/tinyzimmer/go-gst/gst"
|
|
"sync"
|
|
)
|
|
|
|
func (p *Pipeline) Create() (chan PipelineUpdates, error) {
|
|
gstreamerLogger := log.With().Str("Component", "gstreamer").Str("Pipeline", p.Name).Logger()
|
|
p.logger = gstreamerLogger
|
|
pipeline, err := gst.NewPipeline(p.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
elements := map[string]element{}
|
|
|
|
err = p.InputElement.create(pipeline, gstreamerLogger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for name, e := range p.InputElement.elements {
|
|
elements[name] = e
|
|
}
|
|
lastElement := p.InputElement.getLastElement()
|
|
|
|
for _, overlay := range p.Overlays {
|
|
gstreamerLogger.Debug().Str("Overlay", overlay.getName()).Msg("Adding overlay")
|
|
overlayElement, _ := overlay.create(gstreamerLogger)
|
|
e := &element{
|
|
element: overlayElement,
|
|
name: overlay.getName(),
|
|
}
|
|
lastElement.linkElementWrapper(e, gstreamerLogger)
|
|
lastElement = e
|
|
elements[overlay.getName()] = *lastElement
|
|
}
|
|
err = p.OutputElement.create(pipeline, lastElement, gstreamerLogger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for name, e := range p.OutputElement.elements {
|
|
elements[name] = e
|
|
}
|
|
|
|
pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
|
|
switch msg.Type() {
|
|
case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
|
|
pipeline.BlockSetState(gst.StateNull)
|
|
case gst.MessageError: // Error messages are always fatal
|
|
err := msg.ParseError()
|
|
gstreamerLogger.Error().Msg(err.Error())
|
|
if debug := err.DebugString(); debug != "" {
|
|
gstreamerLogger.Debug().Msg(debug)
|
|
}
|
|
case gst.MessageStreamStart:
|
|
gstreamerLogger.Info().Msgf("STARTING Playout of %s", p.Name)
|
|
clock := pipeline.GetPipelineClock()
|
|
now := clock.GetTime()
|
|
for _, overlay := range p.Overlays {
|
|
if overlay.getBlendInTime() != -1 {
|
|
clock.NewSingleShotID(now + overlay.getBlendInTime()).WaitAsync(overlay.show())
|
|
}
|
|
if overlay.getBlendOutTime() != -1 {
|
|
clock.NewSingleShotID(now + overlay.getBlendOutTime()).WaitAsync(overlay.hide())
|
|
}
|
|
}
|
|
|
|
case gst.MessageStateChanged:
|
|
_, newState := msg.ParseStateChanged()
|
|
if newState == gst.StatePlaying && msg.Source() == p.Name {
|
|
gstreamerLogger.Debug().Msgf("Pipeline %s started", p.Name)
|
|
p.resizeAllTextOverlays(gstreamerLogger)
|
|
}
|
|
default:
|
|
// All messages implement a Stringer. However, this is
|
|
// typically an expensive thing to do and should be avoided.
|
|
gstreamerLogger.Debug().Msgf(msg.String())
|
|
}
|
|
return true
|
|
})
|
|
|
|
p.ctx, p.ctxCancel = context.WithCancel(gstreamer.ctx)
|
|
updateChannel := make(chan PipelineUpdates)
|
|
go p.overlayUpdater(updateChannel, gstreamerLogger)
|
|
p.PipelineUpdates = updateChannel
|
|
gstreamer.Lock()
|
|
gstreamer.Pipelines[p.ID] = p
|
|
gstreamer.Unlock()
|
|
|
|
return updateChannel, nil
|
|
}
|
|
|
|
//TODO check if output is decklink
|
|
func (p *Pipeline) Start() error {
|
|
output, err := gstreamer.getDecklinkOutput(p.Name)
|
|
setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output)
|
|
err = p.pipeline.SetState(gst.StatePlaying)
|
|
if err != nil {
|
|
p.logger.Error().Msgf("Failed to set pipeline %s to playing: %v", p.Name, err)
|
|
return fmt.Errorf("Failed to set pipeline %s to playing: %v", p.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Pipeline) Stop() error {
|
|
err := p.pipeline.SetState(gst.StateNull)
|
|
if err != nil {
|
|
p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err)
|
|
return fmt.Errorf("Failed to set pipeline %s to null: %v", p.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Pipeline) resizeAllTextOverlays(log zerolog.Logger) {
|
|
for _, overlay := range p.Overlays {
|
|
if x, ok := overlay.(*TextOverlay); ok {
|
|
go x.resizeTextOverlay(log)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolog.Logger) {
|
|
for {
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return
|
|
case updates := <-updateChannel:
|
|
log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays))
|
|
|
|
var wg sync.WaitGroup
|
|
for _, overlay := range updates.Overlays {
|
|
wg.Add(1)
|
|
go func(overlay Overlay) {
|
|
defer wg.Done()
|
|
o, ok := p.Overlays[overlay.getName()]
|
|
if !ok {
|
|
log.Error().Msgf("Could not find overlay %s", overlay.getName())
|
|
return
|
|
}
|
|
overlay.setElement(o.getElement())
|
|
overlay.update(log)
|
|
|
|
}(overlay)
|
|
|
|
}
|
|
wg.Wait()
|
|
log.Debug().Msg("Overlays updated")
|
|
//TODO channel done
|
|
}
|
|
}
|
|
|
|
}
|