package api

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/go-gst/go-glib/glib"
	"github.com/go-gst/go-gst/gst"
	"github.com/go-gst/go-gst/gst/gstsdp"
	"github.com/go-gst/go-gst/gst/gstwebrtc"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog/log"
)

const (
	previewWidth   = 640
	previewHeight  = 360
	previewBitrate = 600 // in kbps
	previewFPS     = 25

	// negotiationTimeout bounds every wait on a webrtcbin promise. The waits run
	// on the WebSocket read loop, so an unanswered promise would otherwise stall
	// the connection indefinitely.
	negotiationTimeout = 10 * time.Second
)

// upgrader upgrades preview requests to WebSocket connections used for WebRTC
// signaling.
//
// NOTE(review): CheckOrigin accepts every origin, which permits cross-site
// WebSocket connections. Restrict it if the API is reachable from untrusted
// origins.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// PreviewPipeline is a GStreamer pipeline that encodes the video and audio of a
// device output and exposes them through a webrtcbin element.
type PreviewPipeline struct {
	outputID  string // identifier returned by Device.GetOutput
	pipeline  *gst.Pipeline
	webrtcbin *gst.Element
}

// SignalMessage is a WebRTC signaling message exchanged over the WebSocket.
// Type is "offer", "answer" or "ice"; SDP carries the session description for
// offers/answers and ICE carries a candidate for "ice" messages.
type SignalMessage struct {
	Type string        `json:"type"`
	SDP  string        `json:"sdp,omitempty"`
	ICE  *IceCandidate `json:"ice,omitempty"`
}

// IceCandidate is a single ICE candidate as exchanged with the browser.
type IceCandidate struct {
	Candidate     string `json:"candidate"`
	SDPMid        string `json:"sdpMid"`
	SDPMLineIndex uint16 `json:"sdpMLineIndex"`
}

// previewHandler serves a WebRTC preview of one source of a device monitor.
//
// It looks up (or creates and caches) the PreviewPipeline for the
// "devicemonitor-id"/"source" path parameters, starts it, upgrades the request
// to a WebSocket and then acts as the WebRTC answerer: it answers SDP offers
// from the browser, forwards locally gathered ICE candidates and adds remote
// ICE candidates until the WebSocket is closed.
//
// NOTE(review): the cached pipeline, and therefore its single webrtcbin, is
// shared by every connection for the same stream, while a webrtcbin negotiates
// exactly one peer connection. The pipeline is also never stopped when the
// viewer disconnects.
func (api *API) previewHandler(c echo.Context) error {
	devicemonitorID := c.Param("devicemonitor-id")
	source := c.Param("source")
	streamID := fmt.Sprintf("%s_%s", devicemonitorID, source)

	log.Debug().Str("streamID", streamID).Msg("Preview stream requested")

	deviceMonitor, ok := api.deviceMonitors[devicemonitorID]
	if !ok {
		return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorID))
	}

	sourceDevice := deviceMonitor.GetDevice(source)
	if sourceDevice == nil {
		return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
	}

	previewPipeline, ok := api.previewPipelines[streamID]
	if !ok {
		created, err := api.createPreviewPipeline(sourceDevice)
		if err != nil {
			log.Error().Err(err).Msg("Failed to create preview pipeline")
			return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
		}
		// TODO lock map while writing
		api.previewPipelines[streamID] = created
		previewPipeline = created
	}

	if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
		log.Error().Err(err).Msg("Failed to set pipeline to playing")
		return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
	}

	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		log.Error().Err(err).Msg("Upgrade error")
		return err
	}
	defer conn.Close()

	// gorilla/websocket supports only one concurrent writer, but ICE candidates
	// are delivered on GStreamer threads while answers are sent from this
	// goroutine, so every write goes through this mutex.
	var writeMu sync.Mutex
	send := func(msg SignalMessage) error {
		payload, err := json.Marshal(msg)
		if err != nil {
			return fmt.Errorf("marshalling %s message: %w", msg.Type, err)
		}
		writeMu.Lock()
		defer writeMu.Unlock()
		return conn.WriteMessage(websocket.TextMessage, payload)
	}

	// on-ice-candidate is emitted for every locally gathered ICE candidate. The
	// callback takes exactly the signal's arguments: element, m-line index and
	// candidate string.
	iceHandle, err := previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string) {
		log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate")
		// The signal only provides the m-line index, so sdpMid is left empty.
		ice := SignalMessage{
			Type: "ice",
			ICE: &IceCandidate{
				Candidate:     candidate,
				SDPMLineIndex: uint16(mlineindex),
			},
		}
		if err := send(ice); err != nil {
			log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket")
		}
	})
	if err != nil {
		log.Error().Err(err).Msg("Failed to connect on-ice-candidate signal")
	} else {
		// The webrtcbin is cached and outlives this connection; disconnect the
		// handler so it does not keep writing to a closed WebSocket.
		defer previewPipeline.webrtcbin.HandlerDisconnect(iceHandle)
	}

	// --- WebSocket Message Loop ---
	ctx := c.Request().Context()
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				log.Debug().Err(err).Msg("WebSocket closed")
			} else {
				log.Error().Err(err).Msg("WebSocket read error")
			}
			return nil
		}

		var signal SignalMessage
		if err := json.Unmarshal(message, &signal); err != nil {
			log.Warn().Err(err).Str("message", string(message)).Msg("Failed to unmarshal signal message")
			continue
		}

		switch signal.Type {
		case "offer":
			log.Info().Msg("Received SDP offer from browser")
			if err := answerOffer(ctx, previewPipeline.webrtcbin, signal.SDP, send); err != nil {
				log.Error().Err(err).Msg("Failed to answer SDP offer")
			}
		case "ice":
			if signal.ICE == nil {
				log.Warn().Msg("Received ICE signal with nil ICE field")
				continue
			}
			log.Debug().Str("candidate", signal.ICE.Candidate).Uint16("mlineindex", signal.ICE.SDPMLineIndex).Str("sdpMid", signal.ICE.SDPMid).Msg("Received ICE candidate from browser")
			// add-ice-candidate takes the m-line index and the candidate string;
			// sdpMid is not passed.
			if _, err := previewPipeline.webrtcbin.Emit("add-ice-candidate", uint(signal.ICE.SDPMLineIndex), signal.ICE.Candidate); err != nil {
				log.Error().Err(err).Msg("Failed to emit add-ice-candidate")
				continue
			}
			log.Debug().Msg("Added ICE candidate")
		default:
			log.Warn().Str("type", signal.Type).Msg("Received unknown signal type")
		}
	}
}

// answerOffer performs the answerer side of an SDP exchange on webrtcbin.
//
// It applies offerSDP as the remote description, waits for it to be set,
// creates the answer, sends it to the browser with send and finally applies it
// as the local description. The answer is sent before set-local-description is
// emitted so that the browser receives it before any ICE candidate, which are
// only gathered once the local description is applied.
func answerOffer(ctx context.Context, webrtcbin *gst.Element, offerSDP string, send func(SignalMessage) error) error {
	offerMsg, err := gstsdp.ParseSDPMessage(offerSDP)
	if err != nil {
		return fmt.Errorf("parsing offer SDP: %w", err)
	}
	offer := gstwebrtc.NewSessionDescription(gstwebrtc.SDP_TYPE_OFFER, offerMsg)

	remotePromise := gst.NewPromise()
	if _, err := webrtcbin.Emit("set-remote-description", offer, remotePromise); err != nil {
		return fmt.Errorf("emitting set-remote-description: %w", err)
	}
	// NOTE(review): webrtcbin is assumed to reply to set-*-description with an
	// empty structure on success, which Promise.Await may report as an error.
	// Only a timeout or cancellation is therefore treated as failure here.
	if _, err := awaitPromise(ctx, remotePromise); err != nil &&
		(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
		return fmt.Errorf("waiting for set-remote-description: %w", err)
	}
	log.Info().Msg("Set remote description (offer)")

	answerPromise := gst.NewPromise()
	if _, err := webrtcbin.Emit("create-answer", nil, answerPromise); err != nil {
		return fmt.Errorf("emitting create-answer: %w", err)
	}
	reply, err := awaitPromise(ctx, answerPromise)
	if err != nil {
		return fmt.Errorf("waiting for create-answer: %w", err)
	}
	if reply == nil {
		return errors.New("create-answer promise returned a nil reply")
	}
	answerValue, err := reply.GetValue("answer")
	if err != nil {
		return fmt.Errorf("reading answer from create-answer reply: %w", err)
	}
	answer, ok := answerValue.(*gstwebrtc.SessionDescription)
	if !ok || answer == nil {
		return fmt.Errorf("answer value is %T, not a WebRTCSessionDescription", answerValue)
	}
	answerSDP := answer.SDP().String()
	log.Debug().Str("sdp", answerSDP).Msg("Answer created")

	log.Info().Msg("Sending SDP answer back to browser")
	if err := send(SignalMessage{Type: "answer", SDP: answerSDP}); err != nil {
		return fmt.Errorf("sending answer over WebSocket: %w", err)
	}

	// The outcome of set-local-description is not needed here, so, as in the
	// upstream GStreamer webrtc examples, the promise is interrupted rather than
	// awaited.
	localPromise := gst.NewPromise()
	if _, err := webrtcbin.Emit("set-local-description", answer, localPromise); err != nil {
		return fmt.Errorf("emitting set-local-description: %w", err)
	}
	localPromise.Interrupt()
	log.Info().Msg("Set local description (answer)")
	return nil
}

// awaitPromise waits for promise to be replied, giving up after
// negotiationTimeout or when ctx is done.
func awaitPromise(ctx context.Context, promise *gst.Promise) (*gst.Structure, error) {
	waitCtx, cancel := context.WithTimeout(ctx, negotiationTimeout)
	defer cancel()
	return promise.Await(waitCtx)
}

// createPreviewPipeline builds and starts a preview pipeline for device d:
//
//	proxysrc ! videoconvertscale ! videorate ! I420 caps ! x264enc ! rtph264pay (pt 96) ─┐
//	                                                                                     multiqueue ─► webrtcbin
//	proxysrc ! audioconvert ! audioresample ! audiorate ! S16LE caps ! opusenc ! rtpopuspay (pt 97) ─┘
//
// The two proxysrc elements are fed by the outputs returned by d.GetOutput.
// On success the returned pipeline is in the PLAYING state.
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
	pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
	if err != nil {
		return nil, fmt.Errorf("creating pipeline: %w", err)
	}

	// --- Video branch ---
	videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
	if err != nil {
		return nil, fmt.Errorf("creating video proxy src: %w", err)
	}
	if err := pipeline.Add(videoProxySrc); err != nil {
		return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
	}

	videoConvertScale, err := gst.NewElement("videoconvertscale")
	if err != nil {
		return nil, fmt.Errorf("creating video convert scale: %w", err)
	}
	if err := pipeline.Add(videoConvertScale); err != nil {
		return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
	}
	if err := videoProxySrc.Link(videoConvertScale); err != nil {
		return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
	}

	videoRate, err := gst.NewElement("videorate")
	if err != nil {
		return nil, fmt.Errorf("creating video rate: %w", err)
	}
	if err := pipeline.Add(videoRate); err != nil {
		return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
	}
	if err := videoConvertScale.Link(videoRate); err != nil {
		return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
	}

	videoCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))

	videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]any{
		"bitrate":      previewBitrate,
		"speed-preset": 1,          // ultrafast
		"tune":         0x00000004, // zero-latency
	})
	if err != nil {
		return nil, fmt.Errorf("creating video encoder: %w", err)
	}
	if err := pipeline.Add(videoEncoder); err != nil {
		return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
	}
	if err := videoRate.LinkFiltered(videoEncoder, videoCaps); err != nil {
		return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
	}

	videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]any{
		"aggregate-mode": 1, // "zero-latency"
		"pt":             96,
	})
	if err != nil {
		return nil, fmt.Errorf("creating video payloader: %w", err)
	}
	if err := pipeline.Add(videoPayloader); err != nil {
		return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
	}
	if err := videoEncoder.Link(videoPayloader); err != nil {
		return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
	}

	// --- multiqueue and webrtcbin ---
	multiQueue, err := gst.NewElement("multiqueue")
	if err != nil {
		return nil, fmt.Errorf("creating multiqueue: %w", err)
	}
	if err := pipeline.Add(multiQueue); err != nil {
		return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
	}

	webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]any{
		"stun-server": "stun://stun.l.google.com:19302",
	})
	if err != nil {
		return nil, fmt.Errorf("creating webrtcbin: %w", err)
	}
	if err := pipeline.Add(webRtcBin); err != nil {
		return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
	}

	previewPipeline := &PreviewPipeline{
		pipeline:  pipeline,
		webrtcbin: webRtcBin,
	}

	// multiqueue adds the src pad belonging to a sink pad while that sink pad is
	// being requested, so pad-added must be connected before any request.
	log.Debug().Str("pipeline", pipeline.GetName()).Msg("Connecting signals")
	if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
		return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
	}

	if err := linkToMultiqueue(videoPayloader, multiQueue); err != nil {
		return nil, fmt.Errorf("linking video payloader src pad to multiqueue sink pad: %w", err)
	}

	// --- Audio branch ---
	audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
	if err != nil {
		return nil, fmt.Errorf("creating audio proxy src: %w", err)
	}
	if err := pipeline.Add(audioProxySrc); err != nil {
		return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
	}

	audioConvert, err := gst.NewElement("audioconvert")
	if err != nil {
		return nil, fmt.Errorf("creating audio convert: %w", err)
	}
	if err := pipeline.Add(audioConvert); err != nil {
		return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
	}
	if err := audioProxySrc.Link(audioConvert); err != nil {
		return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
	}

	// audioconvert does not change the sample rate; audioresample is needed so
	// sources that are not already at 48 kHz can satisfy audioCaps below.
	audioResample, err := gst.NewElement("audioresample")
	if err != nil {
		return nil, fmt.Errorf("creating audio resample: %w", err)
	}
	if err := pipeline.Add(audioResample); err != nil {
		return nil, fmt.Errorf("adding audio resample to pipeline: %w", err)
	}
	if err := audioConvert.Link(audioResample); err != nil {
		return nil, fmt.Errorf("linking audio convert to resample: %w", err)
	}

	audioRate, err := gst.NewElement("audiorate")
	if err != nil {
		return nil, fmt.Errorf("creating audio rate: %w", err)
	}
	if err := pipeline.Add(audioRate); err != nil {
		return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
	}
	if err := audioResample.Link(audioRate); err != nil {
		return nil, fmt.Errorf("linking audio resample to rate: %w", err)
	}

	audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")

	audioEncoder, err := gst.NewElement("opusenc")
	if err != nil {
		return nil, fmt.Errorf("creating audio encoder: %w", err)
	}
	if err := pipeline.Add(audioEncoder); err != nil {
		return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
	}
	if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
		return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
	}

	audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]any{
		"pt": 97,
	})
	if err != nil {
		return nil, fmt.Errorf("creating audio payloader: %w", err)
	}
	if err := pipeline.Add(audioPayloader); err != nil {
		return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
	}
	if err := audioEncoder.Link(audioPayloader); err != nil {
		return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
	}

	if err := linkToMultiqueue(audioPayloader, multiQueue); err != nil {
		return nil, fmt.Errorf("linking audio payloader src pad to multiqueue sink pad: %w", err)
	}

	// --- Connect to the device output ---
	outputID, videoSrc, audioSrc, err := d.GetOutput()
	if err != nil {
		return nil, fmt.Errorf("getting output: %w", err)
	}
	// Stored so the source proxies can potentially be cleaned up later.
	previewPipeline.outputID = outputID

	if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
		return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
	}
	if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
		return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
	}

	log.Info().Str("pipeline", pipeline.GetName()).Msg("Setting preview pipeline state to PLAYING")
	if err := pipeline.SetState(gst.StatePlaying); err != nil {
		// Best-effort cleanup; the original error is the one reported.
		pipeline.SetState(gst.StateNull)
		// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
		return nil, fmt.Errorf("setting preview pipeline state to playing: %w", err)
	}
	log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING")

	return previewPipeline, nil
}

// linkToMultiqueue links the static "src" pad of upstream to a newly requested
// "sink_%u" pad of multiQueue.
func linkToMultiqueue(upstream, multiQueue *gst.Element) error {
	srcPad := upstream.GetStaticPad("src")
	if srcPad == nil {
		return fmt.Errorf("element %s has no src pad", upstream.GetName())
	}
	sinkPad := multiQueue.GetRequestPad("sink_%u")
	if sinkPad == nil {
		return errors.New("requesting multiqueue sink pad failed")
	}
	if plr := srcPad.Link(sinkPad); plr != gst.PadLinkOK {
		return fmt.Errorf("pad link returned %s", plr.String())
	}
	return nil
}

// onMultiqueuePadAdded links each new multiqueue src pad to a freshly requested
// webrtcbin sink pad. pad-added is also emitted for the multiqueue's own sink
// pads; those are ignored.
func (pp *PreviewPipeline) onMultiqueuePadAdded(_ *gst.Element, pad *gst.Pad) {
	if pad.GetDirection() != gst.PadDirectionSource {
		return
	}
	log.Debug().Str("pad", pad.GetName()).Msg("Multiqueue pad added")

	webrtcSinkPad := pp.webrtcbin.GetRequestPad("sink_%u")
	if webrtcSinkPad == nil {
		log.Error().Msg("Failed to request webrtcbin sink pad")
		return
	}
	if plr := pad.Link(webrtcSinkPad); plr != gst.PadLinkOK {
		log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
	}
}

// linkProxySink sets the "proxysink" property of the proxysrc element src to
// the proxysink element sink.
//
// The GValue is built by hand as a workaround for
// https://github.com/go-gst/go-gst/issues/165.
func linkProxySink(src, sink *gst.Element) error {
	propType, err := src.GetPropertyType("proxysink")
	if err != nil {
		return fmt.Errorf("failed to get proxysink property type: %w", err)
	}
	val, err := glib.ValueInit(propType)
	if err != nil {
		return fmt.Errorf("failed to init proxysink property value: %w", err)
	}
	val.SetInstance(sink.GetPrivate())
	if err := src.SetPropertyValue("proxysink", val); err != nil {
		return fmt.Errorf("setting proxysink property: %w", err)
	}
	return nil
}