refactor: improve pipeline state management and logging for ndi device

This commit is contained in:
gari 2025-04-13 11:55:53 +02:00
parent 35a8e20244
commit 7147f3f13b
2 changed files with 77 additions and 11 deletions

View file

@ -427,19 +427,34 @@ func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
}
if err := pipeline.SetState(gst.StatePlaying); err != nil {
return nil, fmt.Errorf("setting pipeline state: %w", err)
}
// Construct the struct before setting state, so we have it in case of errors during state change
// Store the outputID so we can potentially clean up the source proxy later if needed
previewPipeline := &PreviewPipeline{
outputID: outputID,
pipeline: pipeline,
webrtcbin: webRtcBin,
}
// Connect signals before setting state
log.Debug().Str("pipeline", pipeline.GetName()).Msg("Connecting signals")
if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
// Attempt cleanup if signal connection fails
pipeline.SetState(gst.StateNull)
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
}
// Now set the preview pipeline to playing
log.Info().Str("pipeline", pipeline.GetName()).Msg("Setting preview pipeline state to PLAYING")
if err := pipeline.SetState(gst.StatePlaying); err != nil {
// Attempt cleanup
pipeline.SetState(gst.StateNull)
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
return nil, fmt.Errorf("setting preview pipeline state to playing: %w", err)
}
log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING")
return previewPipeline, nil
}

View file

@ -145,17 +145,36 @@ func (d *Device) createPipeline() error {
}
func (d *Device) onNdiSrcDemuxerPadAdded(element *gst.Element, pad *gst.Pad) {
// This is called when ndisrcdemux adds a source pad (e.g., "video", "audio")
// We need to link this source pad to the sink pad of the corresponding tee.
log.Debug().Str("pad", pad.GetName()).Msg("NDI demuxer pad added, linking to tee sink")
var teeSinkPad *gst.Pad
var teeName string
switch pad.GetName() {
case "video":
if plr := d.videoTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK {
log.Error().Str("pad", pad.GetName()).Msgf("linking video pad to video tee: %s", plr)
}
teeSinkPad = d.videoTee.GetStaticPad("sink")
teeName = d.videoTee.GetName()
case "audio":
if plr := d.audioTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK {
log.Error().Str("pad", pad.GetName()).Msgf("linking audio pad to audio tee: %s", plr)
}
teeSinkPad = d.audioTee.GetStaticPad("sink")
teeName = d.audioTee.GetName()
default:
log.Warn().Str("pad", pad.GetName()).Msg("Ignoring unknown pad from ndisrcdemux")
return
}
if teeSinkPad == nil {
log.Error().Str("pad", pad.GetName()).Str("tee", teeName).Msg("Failed to get static sink pad from tee")
return
}
// Link the demuxer's source pad to the tee's sink pad
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Attempting to link NDI demuxer pad to tee")
if plr := pad.Link(teeSinkPad); plr != gst.PadLinkOK {
log.Error().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msgf("Linking NDI demuxer pad to tee failed: %s", plr)
} else {
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Successfully linked NDI demuxer pad to tee")
}
}
func (d *Device) GetOutput() (string, *gst.Element, *gst.Element, error) {
@ -224,9 +243,41 @@ func (d *Device) setPipelineState(state gst.State) error {
return nil
}
if err := d.pipeline.SetState(state); err != nil {
return fmt.Errorf("setting pipeline state: %w", err)
log.Info().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("Setting pipeline state")
stateChangeReturn := d.pipeline.SetState(state)
if stateChangeReturn == gst.StateChangeFailure {
log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned failure")
return fmt.Errorf("setting pipeline state to %s failed", state.String())
}
if stateChangeReturn == gst.StateChangeNoPreroll {
log.Warn().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned no-preroll")
// No-preroll might be acceptable depending on the pipeline, especially for live sources.
}
// Wait briefly for the state change to complete, especially important for PLAYING.
// Use a reasonable timeout (e.g., 2 seconds)
_, currentState, pendingState := d.pipeline.GetState(2 * gst.Second)
log.Debug().
Str("pipeline", d.pipeline.GetName()).
Str("target_state", state.String()).
Str("current_state", currentState.String()).
Str("pending_state", pendingState.String()).
Msg("Pipeline state after SetState attempt")
if currentState != state && pendingState != state {
// If neither current nor pending matches the target state after timeout, log a warning.
// This might indicate a problem, but we won't return an error yet,
// as some state transitions might take longer or settle asynchronously.
log.Warn().
Str("pipeline", d.pipeline.GetName()).
Str("target_state", state.String()).
Str("actual_state", currentState.String()).
Msg("Pipeline did not reach target state within timeout")
} else {
log.Info().Str("pipeline", d.pipeline.GetName()).Str("reached_state", state.String()).Msg("Pipeline state change successful or pending")
}
return nil
}