From 7147f3f13b92e4d31706c39f97f274f78568301a Mon Sep 17 00:00:00 2001 From: gari Date: Sun, 13 Apr 2025 11:55:53 +0200 Subject: [PATCH] refactor: improve pipeline state management and logging for ndi device --- internal/api/preview.go | 21 +++++++++++-- internal/ndi/device.go | 67 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/internal/api/preview.go b/internal/api/preview.go index e747300..1d26aa0 100644 --- a/internal/api/preview.go +++ b/internal/api/preview.go @@ -427,19 +427,34 @@ func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) { return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err) } - if err := pipeline.SetState(gst.StatePlaying); err != nil { - return nil, fmt.Errorf("setting pipeline state: %w", err) - } + // Construct the struct before setting state, so we have it in case of errors during state change + // Store the outputID so we can potentially clean up the source proxy later if needed previewPipeline := &PreviewPipeline{ outputID: outputID, pipeline: pipeline, webrtcbin: webRtcBin, } + // Connect signals before setting state + log.Debug().Str("pipeline", pipeline.GetName()).Msg("Connecting signals") if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil { + // Attempt cleanup if signal connection fails + pipeline.SetState(gst.StateNull) + // TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err) } + // Now set the preview pipeline to playing + log.Info().Str("pipeline", pipeline.GetName()).Msg("Setting preview pipeline state to PLAYING") + if err := pipeline.SetState(gst.StatePlaying); err != nil { + // Attempt cleanup + pipeline.SetState(gst.StateNull) + // TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID + return nil, fmt.Errorf("setting preview pipeline state to playing: %w", err) + } + log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING") + + return previewPipeline, nil } diff --git a/internal/ndi/device.go b/internal/ndi/device.go index c8132a4..ece1977 100644 --- a/internal/ndi/device.go +++ b/internal/ndi/device.go @@ -145,17 +145,36 @@ func (d *Device) createPipeline() error { } func (d *Device) onNdiSrcDemuxerPadAdded(element *gst.Element, pad *gst.Pad) { + // This is called when ndisrcdemux adds a source pad (e.g., "video", "audio") + // We need to link this source pad to the sink pad of the corresponding tee. + log.Debug().Str("pad", pad.GetName()).Msg("NDI demuxer pad added, linking to tee sink") + + var teeSinkPad *gst.Pad + var teeName string switch pad.GetName() { case "video": - if plr := d.videoTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK { - log.Error().Str("pad", pad.GetName()).Msgf("linking video pad to video tee: %s", plr) - } + teeSinkPad = d.videoTee.GetStaticPad("sink") + teeName = d.videoTee.GetName() case "audio": - if plr := d.audioTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK { - log.Error().Str("pad", pad.GetName()).Msgf("linking audio pad to audio tee: %s", plr) - } + teeSinkPad = d.audioTee.GetStaticPad("sink") + teeName = d.audioTee.GetName() + default: + log.Warn().Str("pad", pad.GetName()).Msg("Ignoring unknown pad from ndisrcdemux") + return } + if teeSinkPad == nil { + log.Error().Str("pad", pad.GetName()).Str("tee", teeName).Msg("Failed to get static sink pad from tee") + return + } + + // Link the demuxer's source pad to the tee's sink pad + log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Attempting to link NDI demuxer pad to tee") + if plr := pad.Link(teeSinkPad); plr != gst.PadLinkOK { + log.Error().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msgf("Linking NDI demuxer pad to tee failed: %s", plr) + } else { + log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Successfully linked NDI demuxer pad to tee") + } } func (d *Device) GetOutput() (string, *gst.Element, *gst.Element, error) { @@ -224,9 +243,41 @@ func (d *Device) setPipelineState(state gst.State) error { return nil } - if err := d.pipeline.SetState(state); err != nil { - return fmt.Errorf("setting pipeline state: %w", err) + log.Info().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("Setting pipeline state") + stateChangeReturn := d.pipeline.SetState(state) + if stateChangeReturn == gst.StateChangeFailure { + log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned failure") + return fmt.Errorf("setting pipeline state to %s failed", state.String()) } + if stateChangeReturn == gst.StateChangeNoPreroll { + log.Warn().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned no-preroll") + // No-preroll might be acceptable depending on the pipeline, especially for live sources. + } + + + // Wait briefly for the state change to complete, especially important for PLAYING. + // Use a reasonable timeout (e.g., 2 seconds) + _, currentState, pendingState := d.pipeline.GetState(2 * gst.Second) + log.Debug(). + Str("pipeline", d.pipeline.GetName()). + Str("target_state", state.String()). + Str("current_state", currentState.String()). + Str("pending_state", pendingState.String()). + Msg("Pipeline state after SetState attempt") + + if currentState != state && pendingState != state { + // If neither current nor pending matches the target state after timeout, log a warning. + // This might indicate a problem, but we won't return an error yet, + // as some state transitions might take longer or settle asynchronously. + log.Warn(). + Str("pipeline", d.pipeline.GetName()). + Str("target_state", state.String()). + Str("actual_state", currentState.String()). + Msg("Pipeline did not reach target state within timeout") + } else { + log.Info().Str("pipeline", d.pipeline.GetName()).Str("reached_state", state.String()).Msg("Pipeline state change successful or pending") + } + return nil }