package gstreamer import ( "fmt" "git.entr0py.de/garionion/catie/internal/config" "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) type OutputType string const ( OutputTypeNdi OutputType = "ndi" OutputTypeWayland OutputType = "waylandsink" ) type Gstreamer struct { logger zerolog.Logger debug bool pipelineCfg config.Pipeline outputCfg []config.Output pipeline *gst.Pipeline mainLoop *glib.MainLoop } func New(pipelineCfg config.Pipeline, outputCfg []config.Output, debug bool) (*Gstreamer, error) { logger := log.Logger.With().Str("module", "gstreamer").Logger() gst.Init(nil) mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) go mainLoop.Run() pipeline, err := gst.NewPipeline("graphix") if err != nil { return nil, fmt.Errorf("creating pipeline: %w", err) } g := &Gstreamer{ logger: logger, debug: debug, pipelineCfg: pipelineCfg, outputCfg: outputCfg, pipeline: pipeline, mainLoop: mainLoop, } go g.pipelineWatcher() logger.Debug().Msg("create initial video source") plainVideoSrc, err := gst.NewElementWithProperties("videotestsrc", map[string]interface{}{ "name": "initSrc", "pattern": 2, // black }) if err != nil { return nil, fmt.Errorf("creating initial videotestsrc: %w", err) } logger.Debug().Msg("adding initial video source to pipeline") if err := pipeline.Add(plainVideoSrc); err != nil { return nil, fmt.Errorf("adding initial video source to pipeline: %w", err) } logger.Debug().Msg("create compositor") compositor, err := gst.NewElementWithProperties("compositor", map[string]interface{}{ "name": "compositor", "ignore-inactive-pads": true, "background": 3, // transparent }) if err != nil { return nil, fmt.Errorf("creating compositor: %w", err) } logger.Debug().Msg("adding compositor to pipeline") if err := pipeline.Add(compositor); err != nil { return nil, fmt.Errorf("adding compositor to pipeline: %w", err) } logger.Debug().Msg("linking initial video source to compositor") caps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=BGRA,width=%d,height=%d", pipelineCfg.Width, pipelineCfg.Height)) if err := plainVideoSrc.LinkFiltered(compositor, caps); err != nil { return nil, fmt.Errorf("linking initial video source to compositor: %w", err) } logger.Debug().Msg("setting initial video source to transparent") pads, err := compositor.GetSinkPads() if err != nil { return nil, fmt.Errorf("getting sink pads from compositor: %w", err) } if len(pads) == 0 { return nil, fmt.Errorf("no compositor sink pads found") } if err := pads[0].SetProperty("alpha", 0.0); err != nil { return nil, fmt.Errorf("setting compositor sink pad alpha: %w", err) } logger.Debug().Msg("creating tee") tee, err := gst.NewElement("tee") if err != nil { return nil, fmt.Errorf("creating tee element: %w", err) } logger.Debug().Msg("adding tee to pipeline") if err := pipeline.Add(tee); err != nil { return nil, fmt.Errorf("adding tee to pipeline: %w", err) } if debug { logger.Debug().Msg("create timeoverlay") timeoverlay, err := gst.NewElement("timeoverlay") if err != nil { return nil, fmt.Errorf("creating timeoverlay element: %w", err) } logger.Debug().Msg("add timeoverlay to pipeline") if err := pipeline.Add(timeoverlay); err != nil { return nil, fmt.Errorf("adding timeoverlay to pipeline: %w", err) } logger.Debug().Msg("link timeoverlay to compositor") if err := compositor.Link(timeoverlay); err != nil { return nil, fmt.Errorf("linking compositor to timeoverlay: %w", err) } logger.Debug().Msg("link tee to timeoverlay") if err := timeoverlay.Link(tee); err != nil { return nil, fmt.Errorf("linking tee to timeoverlay: %w", err) } } else { logger.Debug().Msg("link tee to compositor") if err := compositor.Link(tee); err != nil { return nil, fmt.Errorf("linking compositor to tee: %w", err) } } logger.Debug().Msg("creating outputs") for _, output := range outputCfg { logger.Debug().Str("type", output.Type).Str("target", output.Target).Msg("creating output") switch output.Type { case string(OutputTypeNdi): logger.Debug().Msg("create ndisink") ndiSink, err := gst.NewElementWithProperties("ndisink", map[string]interface{}{ "ndi-name": output.Target, }) if err != nil { return nil, fmt.Errorf("creating ndisink %q element: %w", output.Target, err) } logger.Debug().Msg("adding ndisink to pipeline") if err := pipeline.Add(ndiSink); err != nil { return nil, fmt.Errorf("adding ndisink %q to pipeline: %w", output.Target, err) } logger.Debug().Msg("linking ndisink to tee") teePad := tee.GetRequestPad("src_%u") ndiPad := ndiSink.GetStaticPad("sink") if plr := teePad.Link(ndiPad); plr != gst.PadLinkOK { return nil, fmt.Errorf("linking ndisink %q to tee: PadLinkReturn: %v", output.Target, plr) } case string(OutputTypeWayland): logger.Debug().Msg("create waylandsink") waylandSink, err := gst.NewElement("waylandsink") if err != nil { return nil, fmt.Errorf("creating waylandsink %q element: %w", output.Target, err) } logger.Debug().Msg("adding waylandsink to pipeline") if err := pipeline.Add(waylandSink); err != nil { return nil, fmt.Errorf("adding waylandsink %q to pipeline: %w", output.Target, err) } logger.Debug().Msg("linking waylandsink to tee") teePad := tee.GetRequestPad("src_%u") waylandPad := waylandSink.GetStaticPad("sink") if plr := teePad.Link(waylandPad); plr != gst.PadLinkOK { return nil, fmt.Errorf("linking waylandsink to tee: PadLinkReturn: %v", plr) } } } if debug { logger.Debug().Msg("create png of pipeline") if err := gstreamerBinToPNG(pipeline.Bin, "pipeline.png"); err != nil { logger.Error().Err(err).Msg("creating png of pipeline") } } return g, nil } func (g *Gstreamer) Run() error { g.logger.Info().Msg("starting gstreamer") if err := g.pipeline.SetState(gst.StatePlaying); err != nil { return fmt.Errorf("error setting pipeline to playing: %w", err) } return nil } func (g *Gstreamer) pipelineWatcher() { g.pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop err := g.pipeline.BlockSetState(gst.StateNull) if err != nil { return false } g.mainLoop.Quit() case gst.MessageError: // Error messages are always fatal err := msg.ParseError() g.logger.Error().Err(err) if debug := err.DebugString(); debug != "" { g.logger.Debug().Msg(debug) } g.mainLoop.Quit() default: if g.debug { g.logger.Debug().Msg(msg.String()) } } return true }) } func (g *Gstreamer) setElementProperties(element *gst.Element, properties map[string]interface{}) error { for key, value := range properties { if err := g.setElementProperty(element, key, value); err != nil { return err } } return nil } func (g *Gstreamer) setElementProperty(element *gst.Element, key string, value interface{}) error { if err := element.SetProperty(key, value); err != nil { return fmt.Errorf("error setting property %q: %w", key, err) } return nil }