catie/internal/api/preview.go

466 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package api
import (
"encoding/json"
"fmt"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog/log"
"net/http"
)
// Encoding parameters shared by every browser preview stream.
const (
	previewWidth  = 640 // scaled frame width in pixels
	previewHeight = 360 // scaled frame height in pixels
	previewFPS    = 25  // frames per second after videorate

	previewBitrate = 600 // x264enc target bitrate, in kbps
)
// upgrader converts preview HTTP requests into WebSocket connections used for
// WebRTC signalling.
//
// SECURITY(review): CheckOrigin accepts every Origin header, so any web page a
// user visits could open this socket (cross-site WebSocket hijacking). Leaving
// CheckOrigin nil makes gorilla/websocket enforce same-origin; if the UI is
// served from a different origin, allow-list that origin explicitly instead.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// PreviewPipeline is a preview encoding pipeline built for one device output.
// Instances are cached per stream ID and reused by later preview requests.
type PreviewPipeline struct {
	// outputID is the identifier returned by the device's GetOutput call when
	// the pipeline's proxysrc elements were attached.
	outputID string
	// pipeline is the GStreamer pipeline that contains every preview element.
	pipeline *gst.Pipeline
	// webrtcbin is the pipeline's webrtcbin element, driven by the signalling
	// WebSocket.
	webrtcbin *gst.Element
}
type (
	// SignalMessage is a single JSON signalling message exchanged with the
	// browser over the preview WebSocket. Type selects the payload: "offer" and
	// "answer" carry SDP, "ice" carries ICE; unused payloads are omitted.
	SignalMessage struct {
		Type string        `json:"type"`
		SDP  string        `json:"sdp,omitempty"`
		ICE  *IceCandidate `json:"ice,omitempty"`
	}

	// IceCandidate mirrors the browser's RTCIceCandidateInit fields.
	//
	// NOTE(review): sdpMid is always serialised, so outgoing candidates carry
	// sdpMid "" (the server never fills it in). Browsers may try to match a
	// non-null sdpMid against m-line mids before falling back to the index —
	// confirm the JS side ignores an empty sdpMid.
	IceCandidate struct {
		Candidate     string `json:"candidate"`
		SDPMid        string `json:"sdpMid"`
		SDPMLineIndex uint16 `json:"sdpMLineIndex"`
	}
)
// previewHandler serves the preview WebSocket used for WebRTC signalling.
//
// Route parameters:
//   - devicemonitor-id: key into api.deviceMonitors
//   - source: device name passed to the monitor's GetDevice
//
// A preview pipeline is created on the first request for the
// "<devicemonitor-id>_<source>" pair and cached in api.previewPipelines; later
// requests reuse it. The pipeline is set to PLAYING, the request is upgraded to
// a WebSocket, and SDP/ICE messages are then relayed between the browser and
// the pipeline's webrtcbin until the socket closes.
//
// Before the upgrade, failures are returned as echo HTTP errors (404 for an
// unknown monitor or device, 500 for pipeline failures). After the upgrade,
// problems are logged and the handler returns nil once the socket is done.
func (api *API) previewHandler(c echo.Context) error {
	devicemonitorID := c.Param("devicemonitor-id")
	source := c.Param("source")
	streamID := fmt.Sprintf("%s_%s", devicemonitorID, source)
	log.Debug().Str("streamID", streamID).Msg("Preview stream requested")

	deviceMonitor, ok := api.deviceMonitors[devicemonitorID]
	if !ok {
		return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorID))
	}
	sourceDevice := deviceMonitor.GetDevice(source)
	if sourceDevice == nil {
		return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
	}

	// NOTE(review): the cached pipeline holds a single webrtcbin (one peer
	// connection), so concurrent viewers of the same stream share it — confirm
	// that is intended.
	previewPipeline, ok := api.previewPipelines[streamID]
	if !ok {
		var err error
		previewPipeline, err = api.createPreviewPipeline(sourceDevice)
		if err != nil {
			log.Error().Err(err).Msg("Failed to create preview pipeline")
			return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
		}
		// TODO lock map while writing: requests are handled concurrently, so
		// both the lookup above and this write can race with other requests.
		api.previewPipelines[streamID] = previewPipeline
	}
	if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
		log.Error().Err(err).Msg("Failed to set pipeline to playing")
		return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
	}

	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		log.Error().Err(err).Msg("Upgrade error")
		return err
	}

	// gorilla/websocket allows only one concurrent writer, but the webrtcbin
	// callbacks below are invoked by GStreamer, independently of this
	// goroutine. writeSlot is a one-token semaphore that serialises every
	// write and guards connClosed.
	writeSlot := make(chan struct{}, 1)
	connClosed := false
	writeSignal := func(msg []byte) error {
		writeSlot <- struct{}{}
		defer func() { <-writeSlot }()
		if connClosed {
			return fmt.Errorf("websocket connection already closed")
		}
		return conn.WriteMessage(websocket.TextMessage, msg)
	}
	defer func() {
		// Refuse writes from callbacks that are still in flight, then close.
		writeSlot <- struct{}{}
		connClosed = true
		<-writeSlot
		_ = conn.Close() // nothing useful to do if closing fails
	}()

	webrtcbin := previewPipeline.webrtcbin

	// --- WebRTC Signal Handling ---
	// on-negotiation-needed: Triggered when webrtcbin needs to create an offer or answer.
	// NOTE(review): webrtcbin emits this for the *offering* side; as the
	// answerer, the usual flow is to create the answer once
	// set-remote-description has completed — confirm this ever fires here.
	negotiationHandler, err := webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) {
		log.Info().Msg("🧠 Negotiation needed, creating answer...")
		// NOTE(review): in GStreamer, "create-answer" and "set-local-description"
		// are action signals taking (options/description, GstPromise) and
		// returning nothing, so Emit may always return nil here — confirm with
		// go-gst and pass an explicit promise instead.
		promise, err := self.Emit("create-answer")
		if err != nil {
			log.Error().Err(err).Msg("Failed to emit create-answer")
			return
		}
		if promise == nil {
			log.Error().Msg("Emit create-answer returned nil promise")
			return
		}
		// NOTE(review): Interrupt() tells the producer the reply is no longer
		// wanted, so Wait() returns without an answer and GetReply() is nil.
		// Just dropping Interrupt() is not safe either: this callback
		// presumably runs on webrtcbin's own task thread, where create-answer
		// is also executed, so a blocking Wait() could deadlock. Handle the
		// reply asynchronously (promise change callback) instead.
		promise.(*gst.Promise).Interrupt()
		promise.(*gst.Promise).Wait()
		reply := promise.(*gst.Promise).GetReply()
		if reply == nil {
			log.Error().Msg("Promise reply for create-answer was nil")
			return
		}
		answerValue, ok := reply.GetValue("answer")
		if !ok || answerValue == nil {
			log.Error().Msg("Failed to get answer from promise reply")
			return
		}
		answer, ok := answerValue.(*gst.WebRTCSessionDescription)
		if !ok || answer == nil {
			log.Error().Msg("Answer value is not a WebRTCSessionDescription")
			return
		}
		log.Debug().Str("sdp", answer.GetSDP().String()).Msg("✅ Answer created")

		promise, err = self.Emit("set-local-description", answer)
		if err != nil {
			log.Error().Err(err).Msg("Failed to emit set-local-description")
			return
		}
		if promise == nil {
			log.Error().Msg("Emit set-local-description returned nil promise")
			return
		}
		// This code does not wait for the set-local-description reply.
		promise.(*gst.Promise).Interrupt()
		log.Info().Msg("➡️ Set local description (answer)")

		log.Info().Msg("📨 Sending SDP answer back to browser")
		msg, err := json.Marshal(SignalMessage{
			Type: "answer",
			SDP:  answer.GetSDP().String(),
		})
		if err != nil {
			log.Error().Err(err).Msg("Failed to marshal answer")
			return
		}
		if err := writeSignal(msg); err != nil {
			log.Error().Err(err).Msg("Failed to send answer over WebSocket")
		}
	})
	if err != nil {
		log.Error().Err(err).Msg("Failed to connect on-negotiation-needed handler")
		return nil
	}
	// The webrtcbin outlives this connection (the pipeline is cached), so the
	// handler must go when the socket closes; otherwise every past connection
	// keeps reacting to signals and writing to a dead socket.
	defer webrtcbin.HandlerDisconnect(negotiationHandler)

	// on-ice-candidate: Triggered when webrtcbin generates a new ICE candidate.
	iceHandler, err := webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string) {
		log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate")
		// The signal only provides the m-line index and candidate string;
		// sdpMid is left empty.
		msg, err := json.Marshal(SignalMessage{
			Type: "ice",
			ICE: &IceCandidate{
				Candidate:     candidate,
				SDPMLineIndex: uint16(mlineindex),
			},
		})
		if err != nil {
			log.Error().Err(err).Msg("Failed to marshal ICE candidate")
			return
		}
		if err := writeSignal(msg); err != nil {
			log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket")
		}
	})
	if err != nil {
		log.Error().Err(err).Msg("Failed to connect on-ice-candidate handler")
		return nil
	}
	defer webrtcbin.HandlerDisconnect(iceHandler)

	// --- WebSocket Message Loop ---
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			// A normal close by the browser is expected, not an error.
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				log.Debug().Err(err).Msg("WebSocket closed by client")
			} else {
				log.Error().Err(err).Msg("WebSocket read error")
			}
			break
		}
		var signal SignalMessage
		if err := json.Unmarshal(message, &signal); err != nil {
			log.Warn().Err(err).Str("message", string(message)).Msg("Failed to unmarshal signal message")
			continue
		}
		switch signal.Type {
		case "offer":
			log.Info().Msg("📥 Received SDP offer from browser")
			log.Debug().Str("sdp", signal.SDP).Msg("Offer SDP")
			offerDesc, err := gst.NewWebRTCSessionDescription(gst.WebRTCSDPTypeOffer, gst.NewSDPMessageFromString(signal.SDP))
			if err != nil {
				log.Error().Err(err).Msg("Failed to create offer description from SDP")
				continue
			}
			promise, err := webrtcbin.Emit("set-remote-description", offerDesc)
			if err != nil {
				log.Error().Err(err).Msg("Failed to emit set-remote-description")
				continue
			}
			if promise == nil {
				log.Error().Msg("Emit set-remote-description returned nil promise")
				continue
			}
			// This code does not wait for the set-remote-description reply.
			promise.(*gst.Promise).Interrupt()
			log.Info().Msg("➡️ Set remote description (offer)")
			// Answer creation is handled in on-negotiation-needed.
		case "ice":
			if signal.ICE == nil {
				log.Warn().Msg("Received ICE signal with nil ICE field")
				continue
			}
			log.Debug().Str("candidate", signal.ICE.Candidate).Uint16("mlineindex", signal.ICE.SDPMLineIndex).Str("sdpMid", signal.ICE.SDPMid).Msg(" Received ICE candidate from browser")
			// add-ice-candidate takes only the m-line index and candidate
			// string; the browser's sdpMid is logged but not used.
			if _, err := webrtcbin.Emit("add-ice-candidate", uint(signal.ICE.SDPMLineIndex), signal.ICE.Candidate); err != nil {
				log.Error().Err(err).Msg("Failed to emit add-ice-candidate")
			} else {
				log.Debug().Msg("Added ICE candidate")
			}
		default:
			log.Warn().Str("type", signal.Type).Msg("Received unknown signal type")
		}
	}
	return nil
}
// createPreviewPipeline builds the preview pipeline for device d (the caller
// caches it):
//
//	proxysrc -> videoconvertscale -> videorate -> [I420 WxH@FPS] -> x264enc -> rtph264pay   --+
//	                                                                                          +--> multiqueue -> webrtcbin
//	proxysrc -> audioconvert -> audiorate -> [S16LE 2ch 48kHz] -> opusenc -> rtpopuspay ------+
//
// Both proxysrc elements are attached to the proxysinks returned by
// d.GetOutput, and the pipeline is set to PLAYING before it is returned. On
// error the returned *PreviewPipeline is nil.
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
	pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
	if err != nil {
		return nil, fmt.Errorf("creating pipeline: %w", err)
	}

	// --- Video branch ---
	videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
	if err != nil {
		return nil, fmt.Errorf("creating video proxy src: %w", err)
	}
	if err := pipeline.Add(videoProxySrc); err != nil {
		return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
	}
	videoConvertScale, err := gst.NewElement("videoconvertscale")
	if err != nil {
		return nil, fmt.Errorf("creating video convert scale: %w", err)
	}
	if err := pipeline.Add(videoConvertScale); err != nil {
		return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
	}
	if err := videoProxySrc.Link(videoConvertScale); err != nil {
		return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
	}
	videoRate, err := gst.NewElement("videorate")
	if err != nil {
		return nil, fmt.Errorf("creating video rate: %w", err)
	}
	if err := pipeline.Add(videoRate); err != nil {
		return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
	}
	if err := videoConvertScale.Link(videoRate); err != nil {
		return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
	}
	convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))
	log.Debug().Str("caps", convertCaps.String()).Msg("Video convert caps")
	videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{
		"bitrate":      previewBitrate,
		"speed-preset": 1,          // ultrafast
		"tune":         0x00000004, // zerolatency
	})
	if err != nil {
		return nil, fmt.Errorf("creating video encoder: %w", err)
	}
	if err := pipeline.Add(videoEncoder); err != nil {
		return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
	}
	if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil {
		return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
	}
	videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{
		"aggregate-mode": 1, // zero-latency
		"pt":             96,
	})
	if err != nil {
		return nil, fmt.Errorf("creating video payloader: %w", err)
	}
	if err := pipeline.Add(videoPayloader); err != nil {
		return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
	}
	if err := videoEncoder.Link(videoPayloader); err != nil {
		return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
	}

	// --- Mux and WebRTC output ---
	multiQueue, err := gst.NewElement("multiqueue")
	if err != nil {
		return nil, fmt.Errorf("creating multiqueue: %w", err)
	}
	if err := pipeline.Add(multiQueue); err != nil {
		return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
	}
	webrtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{
		"stun-server": "stun://stun.l.google.com:19302",
	})
	if err != nil {
		return nil, fmt.Errorf("creating webrtcbin: %w", err)
	}
	if err := pipeline.Add(webrtcBin); err != nil {
		return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
	}
	// Video is linked first, so it gets the first multiqueue lane and the
	// first webrtcbin sink pad.
	if err := linkThroughMultiqueue(videoPayloader, multiQueue, webrtcBin); err != nil {
		return nil, fmt.Errorf("linking video payloader to webrtcbin via multiqueue: %w", err)
	}

	// --- Audio branch ---
	audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
	if err != nil {
		return nil, fmt.Errorf("creating audio proxy src: %w", err)
	}
	if err := pipeline.Add(audioProxySrc); err != nil {
		return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
	}
	audioConvert, err := gst.NewElement("audioconvert")
	if err != nil {
		return nil, fmt.Errorf("creating audio convert: %w", err)
	}
	if err := pipeline.Add(audioConvert); err != nil {
		return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
	}
	if err := audioProxySrc.Link(audioConvert); err != nil {
		return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
	}
	audioRate, err := gst.NewElement("audiorate")
	if err != nil {
		return nil, fmt.Errorf("creating audio rate: %w", err)
	}
	if err := pipeline.Add(audioRate); err != nil {
		return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
	}
	if err := audioConvert.Link(audioRate); err != nil {
		return nil, fmt.Errorf("linking audio convert to rate: %w", err)
	}
	audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")
	audioEncoder, err := gst.NewElement("opusenc")
	if err != nil {
		return nil, fmt.Errorf("creating audio encoder: %w", err)
	}
	if err := pipeline.Add(audioEncoder); err != nil {
		return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
	}
	if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
		return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
	}
	audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{
		"pt": 97,
	})
	if err != nil {
		return nil, fmt.Errorf("creating audio payloader: %w", err)
	}
	if err := pipeline.Add(audioPayloader); err != nil {
		return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
	}
	if err := audioEncoder.Link(audioPayloader); err != nil {
		return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
	}
	if err := linkThroughMultiqueue(audioPayloader, multiQueue, webrtcBin); err != nil {
		return nil, fmt.Errorf("linking audio payloader to webrtcbin via multiqueue: %w", err)
	}

	// --- Attach to the device output ---
	outputID, videoSrc, audioSrc, err := d.GetOutput()
	if err != nil {
		return nil, fmt.Errorf("getting output: %w", err)
	}
	if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
		return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
	}
	if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
		return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
	}
	if err := pipeline.SetState(gst.StatePlaying); err != nil {
		// Best effort: don't leave a partially started pipeline running; the
		// original error is the one worth reporting.
		_ = pipeline.SetState(gst.StateNull)
		return nil, fmt.Errorf("setting pipeline state: %w", err)
	}
	return &PreviewPipeline{
		outputID:  outputID,
		pipeline:  pipeline,
		webrtcbin: webrtcBin,
	}, nil
}

// linkThroughMultiqueue links upstream's "src" pad into a newly requested
// multiqueue lane, and links that lane's output to a newly requested webrtcbin
// sink pad.
//
// Requesting multiqueue's sink_N pad also adds the matching src_N pad at that
// moment, so the output pad is looked up and linked here directly. A
// "pad-added" handler connected after the request would never see it.
func linkThroughMultiqueue(upstream, mq, webrtc *gst.Element) error {
	srcPad := upstream.GetStaticPad("src")
	if srcPad == nil {
		return fmt.Errorf("%s has no src pad", upstream.GetName())
	}
	mqSink := mq.GetRequestPad("sink_%u")
	if mqSink == nil {
		return fmt.Errorf("no multiqueue sink pad available")
	}
	if plr := srcPad.Link(mqSink); plr != gst.PadLinkOK {
		return fmt.Errorf("linking %s src pad to multiqueue %s: %s", upstream.GetName(), mqSink.GetName(), plr.String())
	}
	var lane uint
	if _, err := fmt.Sscanf(mqSink.GetName(), "sink_%d", &lane); err != nil {
		return fmt.Errorf("parsing multiqueue pad name %q: %w", mqSink.GetName(), err)
	}
	mqSrcName := fmt.Sprintf("src_%d", lane)
	mqSrc := mq.GetStaticPad(mqSrcName)
	if mqSrc == nil {
		return fmt.Errorf("multiqueue pad %s not found", mqSrcName)
	}
	webrtcSink := webrtc.GetRequestPad("sink_%u")
	if webrtcSink == nil {
		return fmt.Errorf("no webrtcbin sink pad available")
	}
	if plr := mqSrc.Link(webrtcSink); plr != gst.PadLinkOK {
		return fmt.Errorf("linking multiqueue %s to webrtcbin %s: %s", mqSrcName, webrtcSink.GetName(), plr.String())
	}
	return nil
}
// onMultiqueuePadAdded is a "pad-added" handler for the preview multiqueue. It
// links each newly added multiqueue source pad to a freshly requested webrtcbin
// sink pad; sink pads added by the multiqueue are ignored.
func (p *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) {
	log.Debug().Msg("Multiqueue pad added")
	// Requesting a multiqueue lane adds a sink pad as well as a src pad; only
	// the src side can be linked downstream.
	if pad.GetDirection() != gst.PadDirectionSource {
		return
	}
	sinkPad := p.webrtcbin.GetRequestPad("sink_%u")
	if sinkPad == nil {
		log.Error().Msg("Failed to request webrtcbin sink pad")
		return
	}
	// Pad links go from source to sink: the multiqueue pad is the source.
	if plr := pad.Link(sinkPad); plr != gst.PadLinkOK {
		log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
	}
}
// linkProxySink sets src's "proxysink" property to sink.
//
// Rather than a plain property setter, the GValue is built by hand. It is
// initialised with the property's own GType and holds sink's underlying
// instance pointer before it is assigned. This is the workaround referenced in
// https://github.com/go-gst/go-gst/issues/165 — presumably object-typed
// properties cannot be set directly there; confirm before simplifying.
func linkProxySink(src, sink *gst.Element) error {
	gtype, err := src.GetPropertyType("proxysink")
	if err != nil {
		return fmt.Errorf("failed to get proxysink property type: %w", err)
	}
	gvalue, err := glib.ValueInit(gtype)
	if err != nil {
		return fmt.Errorf("failed to init proxysink property value: %w", err)
	}
	gvalue.SetInstance(sink.GetPrivate())
	return src.SetPropertyValue("proxysink", gvalue)
}