feat: add api and ndi device monitor with webrtc preview pipeline

This commit is contained in:
gari 2025-04-13 10:42:42 +02:00
parent f6be41cb9e
commit c3a187d580
7 changed files with 768 additions and 9 deletions

67
internal/api/api.go Normal file
View file

@ -0,0 +1,67 @@
package api
import (
"github.com/go-gst/go-gst/gst"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
)
type API struct {
deviceMonitors map[string]DeviceMonitor
previewPipelines map[string]*PreviewPipeline
}
type DeviceMonitor interface {
GetDevices() []Device
GetDevice(name string) Device
GetDeviceType() string
}
type Device interface {
GetDisplayName() string
GetDeviceType() string
GetProperties() map[string]interface{}
GetOutput() (string, *gst.Element, *gst.Element, error)
}
func New() *API {
return &API{
deviceMonitors: make(map[string]DeviceMonitor),
previewPipelines: make(map[string]*PreviewPipeline),
}
}
func (api *API) RegisterRoutes(g *echo.Group) {
g.Add("GET", "/devices", api.GetDevices)
g.Add("GET", "/preview/:devicemonitor-id/:source", api.previewHandler)
}
func (api *API) RegisterDeviceMonitor(monitors ...DeviceMonitor) {
for _, monitor := range monitors {
api.deviceMonitors[uuid.NewString()] = monitor
}
}
type device struct {
MonitorID string
DeviceType string
DeviceName string
Properties map[string]interface{}
}
func (api *API) GetDevices(c echo.Context) error {
devices := make([]device, 0)
for id, monitor := range api.deviceMonitors {
for _, d := range monitor.GetDevices() {
devices = append(devices, device{
MonitorID: id,
DeviceType: d.GetDeviceType(),
DeviceName: d.GetDisplayName(),
Properties: d.GetProperties(),
})
}
}
return c.JSON(200, devices)
}

362
internal/api/preview.go Normal file
View file

@ -0,0 +1,362 @@
package api
import (
"encoding/json"
"fmt"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog/log"
"net/http"
)
const (
previewWidth = 640
previewHeight = 360
previewBitrate = 600 // in kbps
previewFPS = 25
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type PreviewPipeline struct {
outputID string
pipeline *gst.Pipeline
webrtcbin *gst.Element
}
type SignalMessage struct {
Type string `json:"type"`
SDP string `json:"sdp,omitempty"`
ICE *IceCandidate `json:"ice,omitempty"`
}
type IceCandidate struct {
Candidate string `json:"candidate"`
SDPMid string `json:"sdpMid"`
SDPMLineIndex uint16 `json:"sdpMLineIndex"`
}
func (api *API) previewHandler(c echo.Context) error {
devicemonitorId := c.Param("devicemonitor-id")
source := c.Param("source")
streamID := fmt.Sprintf("%s_%s", devicemonitorId, source)
//debug
for id, _ := range api.deviceMonitors {
devicemonitorId = id
}
log.Debug().Str("streamID", streamID).Msg("Preview stream requested")
deviceMonitor, ok := api.deviceMonitors[devicemonitorId]
if !ok {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorId))
}
sourceDevice := deviceMonitor.GetDevice(source)
if sourceDevice == nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
}
previewPipeline, ok := api.previewPipelines[streamID]
var err error
if !ok {
previewPipeline, err = api.createPreviewPipeline(sourceDevice)
if err != nil {
log.Error().Err(err).Msg("Failed to create preview pipeline")
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
}
// TODO lock map while writing
api.previewPipelines[streamID] = previewPipeline
}
if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
log.Error().Err(err).Msg("Failed to set pipeline to playing")
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
}
conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
log.Error().Err(err).Msg("Upgrade error")
return err
}
defer conn.Close()
previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) {
log.Debug().Msg("🧠 Negotiation needed (ignored)")
})
previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex int, candidate string) {
ice := SignalMessage{
Type: "ice",
ICE: &IceCandidate{
Candidate: candidate,
SDPMLineIndex: uint16(mlineindex),
},
}
msg, _ := json.Marshal(ice)
conn.WriteMessage(websocket.TextMessage, msg)
})
previewPipeline.webrtcbin.Connect("on-sdp-answer", func(self *gst.Element, answer string) {
log.Info().Msg("📨 Sending SDP answer back to browser")
response := SignalMessage{
Type: "answer",
SDP: answer,
}
msg, _ := json.Marshal(response)
conn.WriteMessage(websocket.TextMessage, msg)
})
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Error().Err(err).Msg("WebSocket read error")
break
}
var signal SignalMessage
if err := json.Unmarshal(message, &signal); err != nil {
log.Warn().Err(err).Msg("Failed to unmarshal signal message")
continue
}
if signal.Type == "offer" {
log.Info().Msg("📥 Received SDP offer from browser")
// Pass the SDP offer to webrtcbin
previewPipeline.webrtcbin.Emit("set-remote-description", "offer", signal.SDP)
// Create an answer
previewPipeline.webrtcbin.Emit("create-answer")
log.Debug().Msg("➡️ Sent SDP offer into webrtcbin and requested answer")
}
if signal.Type == "ice" && signal.ICE != nil {
log.Debug().Str("candidate", signal.ICE.Candidate).Msg(" Adding ICE candidate")
previewPipeline.webrtcbin.Emit("add-ice-candidate", signal.ICE.SDPMLineIndex, signal.ICE.Candidate)
}
}
return nil
}
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
if err != nil {
return nil, fmt.Errorf("creating pipeline: %w", err)
}
videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
if err != nil {
return nil, fmt.Errorf("creating video proxy src: %w", err)
}
if err := pipeline.Add(videoProxySrc); err != nil {
return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
}
videoConvertScale, err := gst.NewElement("videoconvertscale")
if err != nil {
return nil, fmt.Errorf("creating video convert scale: %w", err)
}
if err := pipeline.Add(videoConvertScale); err != nil {
return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
}
if err := videoProxySrc.Link(videoConvertScale); err != nil {
return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
}
videoRate, err := gst.NewElement("videorate")
if err != nil {
return nil, fmt.Errorf("creating video rate: %w", err)
}
if err := pipeline.Add(videoRate); err != nil {
return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
}
if err := videoConvertScale.Link(videoRate); err != nil {
return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
}
convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))
log.Debug().Str("caps", convertCaps.String()).Msg("Video convert caps")
videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{
"bitrate": previewBitrate,
"speed-preset": 1, // ultrafast
"tune": 0x00000004, // zero-latency
})
if err != nil {
return nil, fmt.Errorf("creating video encoder: %w", err)
}
if err := pipeline.Add(videoEncoder); err != nil {
return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
}
if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil {
return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
}
videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{
"aggregate-mode": 1, // "zero-latency"
"pt": 96,
})
if err != nil {
return nil, fmt.Errorf("creating video payloader: %w", err)
}
if err := pipeline.Add(videoPayloader); err != nil {
return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
}
if err := videoEncoder.Link(videoPayloader); err != nil {
return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
}
multiQueue, err := gst.NewElement("multiqueue")
if err != nil {
return nil, fmt.Errorf("creating multiqueue: %w", err)
}
if err := pipeline.Add(multiQueue); err != nil {
return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
}
videoPayloaderSrcPad := videoPayloader.GetStaticPad("src")
videoQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
if plr := videoPayloaderSrcPad.Link(videoQueueSinkPad); plr != gst.PadLinkOK {
return nil, fmt.Errorf("linking video encoder src pad to multiqueue sink pad: %s", plr.String())
}
webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{
"stun-server": "stun://stun.l.google.com:19302",
})
if err != nil {
return nil, fmt.Errorf("creating webrtcbin: %w", err)
}
if err := pipeline.Add(webRtcBin); err != nil {
return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
}
audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
if err != nil {
return nil, fmt.Errorf("creating audio proxy src: %w", err)
}
if err := pipeline.Add(audioProxySrc); err != nil {
return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
}
audioConvert, err := gst.NewElement("audioconvert")
if err != nil {
return nil, fmt.Errorf("creating audio convert: %w", err)
}
if err := pipeline.Add(audioConvert); err != nil {
return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
}
if err := audioProxySrc.Link(audioConvert); err != nil {
return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
}
audioRate, err := gst.NewElement("audiorate")
if err != nil {
return nil, fmt.Errorf("creating audio rate: %w", err)
}
if err := pipeline.Add(audioRate); err != nil {
return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
}
if err := audioConvert.Link(audioRate); err != nil {
return nil, fmt.Errorf("linking audio convert to rate: %w", err)
}
audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")
audioEncoder, err := gst.NewElement("opusenc")
if err != nil {
return nil, fmt.Errorf("creating audio encoder: %w", err)
}
if err := pipeline.Add(audioEncoder); err != nil {
return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
}
if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
}
audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{
"pt": 97,
})
if err != nil {
return nil, fmt.Errorf("creating audio payloader: %w", err)
}
if err := pipeline.Add(audioPayloader); err != nil {
return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
}
if err := audioEncoder.Link(audioPayloader); err != nil {
return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
}
audioQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
audioPayloaderSrcPad := audioPayloader.GetStaticPad("src")
if plr := audioPayloaderSrcPad.Link(audioQueueSinkPad); plr != gst.PadLinkOK {
return nil, fmt.Errorf("linking audio encoder src pad to multiqueue sink pad: %s", plr.String())
}
outputID, videoSrc, audioSrc, err := d.GetOutput()
if err != nil {
return nil, fmt.Errorf("getting output: %w", err)
}
if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
}
if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
}
if err := pipeline.SetState(gst.StatePlaying); err != nil {
return nil, fmt.Errorf("setting pipeline state: %w", err)
}
previewPipeline := &PreviewPipeline{
outputID: outputID,
pipeline: pipeline,
webrtcbin: webRtcBin,
}
if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
}
return previewPipeline, nil
}
func (pipeline *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) {
log.Debug().Msg("Multiqueue pad added")
if plr := pipeline.webrtcbin.GetRequestPad("sink_%u").Link(pad); plr != gst.PadLinkOK {
log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
return
}
}
// https://github.com/go-gst/go-gst/issues/165
func linkProxySink(src, sink *gst.Element) error {
propType, err := src.GetPropertyType("proxysink")
if err != nil {
return fmt.Errorf("failed to get proxysink property type: %w", err)
}
val, err := glib.ValueInit(propType)
if err != nil {
return fmt.Errorf("failed to init proxysink property value: %w", err)
}
val.SetInstance(sink.GetPrivate())
return src.SetPropertyValue("proxysink", val)
}