470 lines
15 KiB
Go
470 lines
15 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/go-gst/go-glib/glib"
|
|
"github.com/go-gst/go-gst/gst"
|
|
"github.com/go-gst/go-gst/gst/gstsdp"
|
|
"github.com/go-gst/go-gst/gst/gstwebrtc"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/rs/zerolog/log"
|
|
"net/http"
|
|
)
|
|
|
|
const (
|
|
previewWidth = 640
|
|
previewHeight = 360
|
|
previewBitrate = 600 // in kbps
|
|
previewFPS = 25
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
|
|
type PreviewPipeline struct {
|
|
outputID string
|
|
|
|
pipeline *gst.Pipeline
|
|
webrtcbin *gst.Element
|
|
}
|
|
|
|
type SignalMessage struct {
|
|
Type string `json:"type"`
|
|
SDP string `json:"sdp,omitempty"`
|
|
ICE *IceCandidate `json:"ice,omitempty"`
|
|
}
|
|
|
|
type IceCandidate struct {
|
|
Candidate string `json:"candidate"`
|
|
SDPMid string `json:"sdpMid"`
|
|
SDPMLineIndex uint16 `json:"sdpMLineIndex"`
|
|
}
|
|
|
|
func (api *API) previewHandler(c echo.Context) error {
|
|
devicemonitorId := c.Param("devicemonitor-id")
|
|
source := c.Param("source")
|
|
streamID := fmt.Sprintf("%s_%s", devicemonitorId, source)
|
|
|
|
//debug
|
|
for id, _ := range api.deviceMonitors {
|
|
devicemonitorId = id
|
|
}
|
|
|
|
log.Debug().Str("streamID", streamID).Msg("Preview stream requested")
|
|
|
|
deviceMonitor, ok := api.deviceMonitors[devicemonitorId]
|
|
if !ok {
|
|
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorId))
|
|
}
|
|
|
|
sourceDevice := deviceMonitor.GetDevice(source)
|
|
if sourceDevice == nil {
|
|
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
|
|
}
|
|
|
|
previewPipeline, ok := api.previewPipelines[streamID]
|
|
var err error
|
|
if !ok {
|
|
previewPipeline, err = api.createPreviewPipeline(sourceDevice)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create preview pipeline")
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
|
|
}
|
|
|
|
// TODO lock map while writing
|
|
api.previewPipelines[streamID] = previewPipeline
|
|
}
|
|
if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
|
|
log.Error().Err(err).Msg("Failed to set pipeline to playing")
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Upgrade error")
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
// --- WebRTC Signal Handling ---
|
|
|
|
// on-negotiation-needed: Triggered when webrtcbin needs to create an offer or answer.
|
|
// In our case (server receiving an offer), this fires after the remote description is set.
|
|
previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) {
|
|
log.Info().Msg("Negotiation needed, creating answer")
|
|
|
|
// Create Answer
|
|
promise, err := self.Emit("create-answer")
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to emit create-answer")
|
|
return
|
|
}
|
|
if promise == nil {
|
|
log.Error().Msg("Emit create-answer returned nil promise")
|
|
return
|
|
}
|
|
|
|
// Handle the promise result (asynchronously)
|
|
promise.(*gst.Promise).Interrupt() // Interrupt any previous waits
|
|
promise.(*gst.Promise).Await(c.Request().Context()) // Wait for the answer to be created
|
|
reply := promise.(*gst.Promise).GetReply()
|
|
if reply == nil {
|
|
log.Error().Msg("Promise reply for create-answer was nil")
|
|
return
|
|
}
|
|
|
|
answerValue, err := reply.GetValue("answer")
|
|
if err != nil || answerValue == nil {
|
|
log.Error().Msg("Failed to get answer from promise reply")
|
|
return
|
|
}
|
|
answer, ok := answerValue.(*gstwebrtc.SessionDescription)
|
|
if !ok || answer == nil {
|
|
log.Error().Msg("Answer value is not a WebRTCSessionDescription")
|
|
return
|
|
}
|
|
|
|
log.Debug().Str("sdp", answer.SDP().String()).Msg("Answer created")
|
|
|
|
// Set Local Description
|
|
promise, err = self.Emit("set-local-description", answer)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to emit set-local-description")
|
|
return
|
|
}
|
|
if promise == nil {
|
|
log.Error().Msg("Emit set-local-description returned nil promise")
|
|
return
|
|
}
|
|
promise.(*gst.Promise).Interrupt() // Interrupt any previous waits
|
|
|
|
log.Info().Msg("Set local description (answer)")
|
|
|
|
// Send Answer back to the client
|
|
log.Info().Msg("Sending SDP answer back to browser")
|
|
response := SignalMessage{
|
|
Type: "answer",
|
|
SDP: answer.SDP().String(),
|
|
}
|
|
msg, err := json.Marshal(response)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to marshal answer")
|
|
return
|
|
}
|
|
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
log.Error().Err(err).Msg("Failed to send answer over WebSocket")
|
|
}
|
|
})
|
|
|
|
// on-ice-candidate: Triggered when webrtcbin generates a new ICE candidate.
|
|
previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string) {
|
|
log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate")
|
|
// Note: The 'sdpMid' is often derived from the mlineindex or context,
|
|
// but the signal itself only provides mlineindex and candidate string directly.
|
|
// The JS side seems to handle this structure correctly.
|
|
ice := SignalMessage{
|
|
Type: "ice",
|
|
ICE: &IceCandidate{
|
|
Candidate: candidate,
|
|
SDPMLineIndex: uint16(mlineindex), // Assuming JS expects uint16
|
|
// sdpMid might need to be derived if required, but often index is enough
|
|
},
|
|
}
|
|
msg, err := json.Marshal(ice)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to marshal ICE candidate")
|
|
return
|
|
}
|
|
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket")
|
|
}
|
|
})
|
|
|
|
// --- WebSocket Message Loop ---
|
|
for {
|
|
_, message, err := conn.ReadMessage()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("WebSocket read error")
|
|
break
|
|
}
|
|
var signal SignalMessage
|
|
if err := json.Unmarshal(message, &signal); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to unmarshal signal message")
|
|
log.Warn().Err(err).Str("message", string(message)).Msg("Failed to unmarshal signal message")
|
|
continue
|
|
}
|
|
|
|
switch signal.Type {
|
|
case "offer":
|
|
log.Info().Msg("Received SDP offer from browser")
|
|
//log.Debug().Str("sdp", signal.SDP).Msg("Offer SDP")
|
|
|
|
// Create WebRTCSessionDescription for the offer
|
|
offerMsg, err := gstsdp.ParseSDPMessage(signal.SDP)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to parse SDP message")
|
|
continue
|
|
}
|
|
offerDesc := gstwebrtc.NewSessionDescription(gstwebrtc.SDP_TYPE_OFFER, offerMsg)
|
|
|
|
promise := gst.NewPromise()
|
|
// Set Remote Description
|
|
// This will trigger on-negotiation-needed if successful and state allows
|
|
if _, err := previewPipeline.webrtcbin.Emit("set-remote-description", offerDesc, promise); err != nil {
|
|
log.Error().Err(err).Msg("Failed to emit set-remote-description")
|
|
continue
|
|
}
|
|
if promise == nil {
|
|
log.Error().Msg("Emit set-remote-description returned nil promise")
|
|
continue
|
|
}
|
|
//promise.Interrupt() // Interrupt previous waits if any
|
|
promise.Await(c.Request().Context()) // Wait for the remote description to be set
|
|
|
|
log.Info().Msg("Set remote description (offer)")
|
|
// Answer creation is now handled in on-negotiation-needed
|
|
|
|
case "ice":
|
|
if signal.ICE == nil {
|
|
log.Warn().Msg("Received ICE signal with nil ICE field")
|
|
continue
|
|
}
|
|
log.Debug().Str("candidate", signal.ICE.Candidate).Uint16("mlineindex", signal.ICE.SDPMLineIndex).Str("sdpMid", signal.ICE.SDPMid).Msg("Received ICE candidate from browser")
|
|
|
|
// Add ICE Candidate
|
|
// Note: The signal takes mlineindex (uint) and candidate (string).
|
|
// sdpMid is usually associated but not directly passed here.
|
|
_, err := previewPipeline.webrtcbin.Emit("add-ice-candidate", uint(signal.ICE.SDPMLineIndex), signal.ICE.Candidate)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to emit add-ice-candidate")
|
|
} else {
|
|
log.Debug().Msg("Added ICE candidate")
|
|
}
|
|
|
|
default:
|
|
log.Warn().Str("type", signal.Type).Msg("Received unknown signal type")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
|
|
pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating pipeline: %w", err)
|
|
}
|
|
|
|
videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating video proxy src: %w", err)
|
|
}
|
|
if err := pipeline.Add(videoProxySrc); err != nil {
|
|
return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
|
|
}
|
|
|
|
videoConvertScale, err := gst.NewElement("videoconvertscale")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating video convert scale: %w", err)
|
|
}
|
|
if err := pipeline.Add(videoConvertScale); err != nil {
|
|
return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
|
|
}
|
|
|
|
if err := videoProxySrc.Link(videoConvertScale); err != nil {
|
|
return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
|
|
}
|
|
|
|
videoRate, err := gst.NewElement("videorate")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating video rate: %w", err)
|
|
}
|
|
if err := pipeline.Add(videoRate); err != nil {
|
|
return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
|
|
}
|
|
|
|
if err := videoConvertScale.Link(videoRate); err != nil {
|
|
return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
|
|
}
|
|
|
|
convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))
|
|
log.Debug().Str("caps", convertCaps.String()).Msg("Video convert caps")
|
|
|
|
videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{
|
|
"bitrate": previewBitrate,
|
|
"speed-preset": 1, // ultrafast
|
|
"tune": 0x00000004, // zero-latency
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating video encoder: %w", err)
|
|
}
|
|
if err := pipeline.Add(videoEncoder); err != nil {
|
|
return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
|
|
}
|
|
|
|
if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil {
|
|
return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
|
|
}
|
|
|
|
videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{
|
|
"aggregate-mode": 1, // "zero-latency"
|
|
"pt": 96,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating video payloader: %w", err)
|
|
}
|
|
if err := pipeline.Add(videoPayloader); err != nil {
|
|
return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
|
|
}
|
|
|
|
if err := videoEncoder.Link(videoPayloader); err != nil {
|
|
return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
|
|
}
|
|
|
|
multiQueue, err := gst.NewElement("multiqueue")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating multiqueue: %w", err)
|
|
}
|
|
if err := pipeline.Add(multiQueue); err != nil {
|
|
return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
|
|
}
|
|
|
|
videoPayloaderSrcPad := videoPayloader.GetStaticPad("src")
|
|
videoQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
|
|
|
|
if plr := videoPayloaderSrcPad.Link(videoQueueSinkPad); plr != gst.PadLinkOK {
|
|
return nil, fmt.Errorf("linking video encoder src pad to multiqueue sink pad: %s", plr.String())
|
|
}
|
|
|
|
webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{
|
|
"stun-server": "stun://stun.l.google.com:19302",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating webrtcbin: %w", err)
|
|
}
|
|
if err := pipeline.Add(webRtcBin); err != nil {
|
|
return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
|
|
}
|
|
|
|
audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating audio proxy src: %w", err)
|
|
}
|
|
if err := pipeline.Add(audioProxySrc); err != nil {
|
|
return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
|
|
}
|
|
|
|
audioConvert, err := gst.NewElement("audioconvert")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating audio convert: %w", err)
|
|
}
|
|
if err := pipeline.Add(audioConvert); err != nil {
|
|
return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
|
|
}
|
|
|
|
if err := audioProxySrc.Link(audioConvert); err != nil {
|
|
return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
|
|
}
|
|
|
|
audioRate, err := gst.NewElement("audiorate")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating audio rate: %w", err)
|
|
}
|
|
if err := pipeline.Add(audioRate); err != nil {
|
|
return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
|
|
}
|
|
|
|
if err := audioConvert.Link(audioRate); err != nil {
|
|
return nil, fmt.Errorf("linking audio convert to rate: %w", err)
|
|
}
|
|
|
|
audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")
|
|
|
|
audioEncoder, err := gst.NewElement("opusenc")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating audio encoder: %w", err)
|
|
}
|
|
if err := pipeline.Add(audioEncoder); err != nil {
|
|
return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
|
|
}
|
|
|
|
if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
|
|
return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
|
|
}
|
|
|
|
audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{
|
|
"pt": 97,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating audio payloader: %w", err)
|
|
}
|
|
if err := pipeline.Add(audioPayloader); err != nil {
|
|
return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
|
|
}
|
|
|
|
if err := audioEncoder.Link(audioPayloader); err != nil {
|
|
return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
|
|
}
|
|
|
|
audioQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
|
|
audioPayloaderSrcPad := audioPayloader.GetStaticPad("src")
|
|
if plr := audioPayloaderSrcPad.Link(audioQueueSinkPad); plr != gst.PadLinkOK {
|
|
return nil, fmt.Errorf("linking audio encoder src pad to multiqueue sink pad: %s", plr.String())
|
|
}
|
|
|
|
outputID, videoSrc, audioSrc, err := d.GetOutput()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting output: %w", err)
|
|
}
|
|
if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
|
|
return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
|
|
}
|
|
if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
|
|
return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
|
|
}
|
|
|
|
if err := pipeline.SetState(gst.StatePlaying); err != nil {
|
|
return nil, fmt.Errorf("setting pipeline state: %w", err)
|
|
}
|
|
previewPipeline := &PreviewPipeline{
|
|
outputID: outputID,
|
|
pipeline: pipeline,
|
|
webrtcbin: webRtcBin,
|
|
}
|
|
|
|
if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
|
|
return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
|
|
}
|
|
|
|
return previewPipeline, nil
|
|
}
|
|
|
|
func (pipeline *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) {
|
|
log.Debug().Msg("Multiqueue pad added")
|
|
if plr := pipeline.webrtcbin.GetRequestPad("sink_%u").Link(pad); plr != gst.PadLinkOK {
|
|
log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
|
|
return
|
|
}
|
|
}
|
|
|
|
// https://github.com/go-gst/go-gst/issues/165
|
|
func linkProxySink(src, sink *gst.Element) error {
|
|
propType, err := src.GetPropertyType("proxysink")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get proxysink property type: %w", err)
|
|
}
|
|
|
|
val, err := glib.ValueInit(propType)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to init proxysink property value: %w", err)
|
|
}
|
|
|
|
val.SetInstance(sink.GetPrivate())
|
|
|
|
return src.SetPropertyValue("proxysink", val)
|
|
}
|