diff --git a/internal/api/preview.go b/internal/api/preview.go index 1d26aa0..cea93ca 100644 --- a/internal/api/preview.go +++ b/internal/api/preview.go @@ -95,24 +95,24 @@ func (api *API) previewHandler(c echo.Context) error { // on-negotiation-needed: Triggered when webrtcbin needs to create an offer or answer. // In our case (server receiving an offer), this fires after the remote description is set. - previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) { + if _, err := previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) { log.Info().Msg("Negotiation needed, creating answer") // Create Answer - promise, err := self.Emit("create-answer") + promise := gst.NewPromise() + _, err := self.Emit("create-answer", nil, promise) if err != nil { log.Error().Err(err).Msg("Failed to emit create-answer") return } - if promise == nil { - log.Error().Msg("Emit create-answer returned nil promise") - return - } // Handle the promise result (asynchronously) - promise.(*gst.Promise).Interrupt() // Interrupt any previous waits - promise.(*gst.Promise).Await(c.Request().Context()) // Wait for the answer to be created - reply := promise.(*gst.Promise).GetReply() + promise.Interrupt() // Interrupt any previous waits + reply, err := promise.Await(c.Request().Context()) // Wait for the answer to be created + if err != nil { + log.Error().Err(err).Msg("Failed to await create-answer promise") + return + } if reply == nil { log.Error().Msg("Promise reply for create-answer was nil") return @@ -132,16 +132,14 @@ func (api *API) previewHandler(c echo.Context) error { log.Debug().Str("sdp", answer.SDP().String()).Msg("Answer created") // Set Local Description - promise, err = self.Emit("set-local-description", answer) + descPromise := gst.NewPromise() + _, err = self.Emit("set-local-description", answer, descPromise) if err != nil { log.Error().Err(err).Msg("Failed to emit set-local-description") return } - if promise == nil { - log.Error().Msg("Emit set-local-description returned nil promise") - return - } - promise.(*gst.Promise).Interrupt() // Interrupt any previous waits + + descPromise.Interrupt() // Interrupt any previous waits log.Info().Msg("Set local description (answer)") @@ -159,10 +157,12 @@ func (api *API) previewHandler(c echo.Context) error { if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { log.Error().Err(err).Msg("Failed to send answer over WebSocket") } - }) + }); err != nil { + log.Error().Err(err).Msg("Failed to connect on-negotiation-needed signal") + } // on-ice-candidate: Triggered when webrtcbin generates a new ICE candidate. - previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string) { + if _, err := previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string, _ any) { log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate") // Note: The 'sdpMid' is often derived from the mlineindex or context, // but the signal itself only provides mlineindex and candidate string directly. @@ -183,7 +183,9 @@ func (api *API) previewHandler(c echo.Context) error { if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket") } - }) + }); err != nil { + log.Error().Err(err).Msg("Failed to connect on-ice-candidate signal") + } // --- WebSocket Message Loop --- for { @@ -454,7 +456,6 @@ func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) { } log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING") - return previewPipeline, nil } diff --git a/internal/gstreamer/debug.go b/internal/gstreamer/debug.go index 74fc828..8adf583 100644 --- a/internal/gstreamer/debug.go +++ b/internal/gstreamer/debug.go @@ -2,23 +2,8 @@ package gstreamer import ( "github.com/go-gst/go-gst/gst" - "github.com/goccy/go-graphviz/cgraph" ) func gstreamerBinToDot(bin *gst.Bin) string { return bin.DebugBinToDotData(gst.DebugGraphShowAll) } - -func setFontSize(graph *cgraph.Graph, fontSize float64) { - graph.SetFontSize(20) - graph.Set("arrowSize", "3") - for node, err := graph.FirstNode(); err == nil && node != nil; node, err = graph.NextNode(node) { - node.SetFontSize(fontSize) - for edge, err := graph.FirstEdge(node); err == nil && edge != nil; edge, err = graph.NextEdge(edge, node) { - edge.SetFontSize(fontSize) - } - } - //for subGraph, err := graph.FirstSubGraph(); err == nil && subGraph != nil; subGraph, err = graph.NextSubGraph() { - // setFontSize(subGraph, fontSize) - //} -} diff --git a/internal/ndi/device.go b/internal/ndi/device.go index ece1977..08af6fd 100644 --- a/internal/ndi/device.go +++ b/internal/ndi/device.go @@ -5,6 +5,8 @@ import ( "github.com/go-gst/go-gst/gst" "github.com/google/uuid" "github.com/rs/zerolog/log" + "os" + "path/filepath" "sync" ) @@ -92,6 +94,8 @@ func (d *Device) createPipeline() error { d.Lock() defer d.Unlock() + log.Debug().Str("name", d.name).Msg("Creating pipeline for device") + pipeline, err := gst.NewPipeline("ndi-" + d.name) if err != nil { return fmt.Errorf("creating pipeline: %w", err) @@ -99,6 +103,10 @@ func (d *Device) createPipeline() error { pipeline.ForceClock(gst.ObtainSystemClock().Clock) input := d.gstDevice.CreateElement("") + if err := pipeline.Add(input); err != nil { + return fmt.Errorf("adding input to pipeline: %w", err) + } + inputQueue, err := gst.NewElementWithProperties("queue2", map[string]interface{}{ "name": "input-queue", @@ -106,6 +114,13 @@ func (d *Device) createPipeline() error { "max-size-bytes": 0, "max-size-time": 40000000, // 40ms }) + if err != nil { + return fmt.Errorf("creating input-queue: %w", err) + } + if err := pipeline.Add(inputQueue); err != nil { + return fmt.Errorf("adding input-queue to pipeline: %w", err) + } + if err := input.Link(inputQueue); err != nil { return fmt.Errorf("linking input to input input-queue: %w", err) } @@ -114,6 +129,9 @@ func (d *Device) createPipeline() error { if err != nil { return fmt.Errorf("creating ndisrcdemux: %w", err) } + if err := pipeline.Add(ndiDemuxer); err != nil { + return fmt.Errorf("adding ndisrcdemux to pipeline: %w", err) + } if err := inputQueue.Link(ndiDemuxer); err != nil { return fmt.Errorf("linking input-queue to ndisrcdemux: %w", err) @@ -123,14 +141,16 @@ func (d *Device) createPipeline() error { if err != nil { return fmt.Errorf("creating video-tee: %w", err) } + if err := pipeline.Add(videoTee); err != nil { + return fmt.Errorf("adding video-tee to pipeline: %w", err) + } audioTee, err := gst.NewElementWithName("tee", "audio-tee") if err != nil { return fmt.Errorf("creating audio-tee: %w", err) } - - if err := pipeline.AddMany(input, ndiDemuxer, videoTee, audioTee); err != nil { - return fmt.Errorf("adding elements to pipeline: %w", err) + if err := pipeline.Add(audioTee); err != nil { + return fmt.Errorf("adding audio-tee to pipeline: %w", err) } d.pipeline = pipeline @@ -140,6 +160,18 @@ func (d *Device) createPipeline() error { if _, err := ndiDemuxer.Connect("pad-added", d.onNdiSrcDemuxerPadAdded); err != nil { return fmt.Errorf("connecting pad-added signal: %w", err) } + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("getting current working directory: %w", err) + } + log.Debug().Str("cwd", cwd).Msg("dumping pipeline to dot file") + filename := filepath.Join(cwd, "dumpDot", "ndi-pipeline.dot") + dot := pipeline.DebugBinToDotData(gst.DebugGraphShowAll) + if err := os.WriteFile(filename, []byte(dot), 0644); err != nil { + return fmt.Errorf("writing dot file: %w", err) + } + + log.Debug().Str("name", d.name).Msg("Pipeline created") return nil } @@ -244,40 +276,10 @@ func (d *Device) setPipelineState(state gst.State) error { } log.Info().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("Setting pipeline state") - stateChangeReturn := d.pipeline.SetState(state) - if stateChangeReturn == gst.StateChangeFailure { - log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned failure") - return fmt.Errorf("setting pipeline state to %s failed", state.String()) + if err := d.pipeline.SetState(state); err != nil { + log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Err(err).Msg("SetState failed") + return fmt.Errorf("setting pipeline state to %s: %w", state.String(), err) } - if stateChangeReturn == gst.StateChangeNoPreroll { - log.Warn().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("SetState returned no-preroll") - // No-preroll might be acceptable depending on the pipeline, especially for live sources. - } - - - // Wait briefly for the state change to complete, especially important for PLAYING. - // Use a reasonable timeout (e.g., 2 seconds) - _, currentState, pendingState := d.pipeline.GetState(2 * gst.Second) - log.Debug(). - Str("pipeline", d.pipeline.GetName()). - Str("target_state", state.String()). - Str("current_state", currentState.String()). - Str("pending_state", pendingState.String()). - Msg("Pipeline state after SetState attempt") - - if currentState != state && pendingState != state { - // If neither current nor pending matches the target state after timeout, log a warning. - // This might indicate a problem, but we won't return an error yet, - // as some state transitions might take longer or settle asynchronously. - log.Warn(). - Str("pipeline", d.pipeline.GetName()). - Str("target_state", state.String()). - Str("actual_state", currentState.String()). - Msg("Pipeline did not reach target state within timeout") - } else { - log.Info().Str("pipeline", d.pipeline.GetName()).Str("reached_state", state.String()).Msg("Pipeline state change successful or pending") - } - return nil } diff --git a/webRTC-test/index.html b/webRTC-test/index.html new file mode 100644 index 0000000..93b14de --- /dev/null +++ b/webRTC-test/index.html @@ -0,0 +1,15 @@ + + +
+ + +