Compare commits

...
Sign in to create a new pull request.

8 commits

11 changed files with 1117 additions and 24 deletions

View file

@ -21,6 +21,7 @@
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly gst_all_1.gst-plugins-ugly
libnice
gcc glib pkg-config gcc glib pkg-config
ndi ndi

67
internal/api/api.go Normal file
View file

@ -0,0 +1,67 @@
package api
import (
"github.com/go-gst/go-gst/gst"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
)
type API struct {
deviceMonitors map[string]DeviceMonitor
previewPipelines map[string]*PreviewPipeline
}
type DeviceMonitor interface {
GetDevices() []Device
GetDevice(name string) Device
GetDeviceType() string
}
type Device interface {
GetDisplayName() string
GetDeviceType() string
GetProperties() map[string]interface{}
GetOutput() (string, *gst.Element, *gst.Element, error)
}
func New() *API {
return &API{
deviceMonitors: make(map[string]DeviceMonitor),
previewPipelines: make(map[string]*PreviewPipeline),
}
}
func (api *API) RegisterRoutes(g *echo.Group) {
g.Add("GET", "/devices", api.GetDevices)
g.Add("GET", "/preview/:devicemonitor-id/:source", api.previewHandler)
}
func (api *API) RegisterDeviceMonitor(monitors ...DeviceMonitor) {
for _, monitor := range monitors {
api.deviceMonitors[uuid.NewString()] = monitor
}
}
type device struct {
MonitorID string
DeviceType string
DeviceName string
Properties map[string]interface{}
}
func (api *API) GetDevices(c echo.Context) error {
devices := make([]device, 0)
for id, monitor := range api.deviceMonitors {
for _, d := range monitor.GetDevices() {
devices = append(devices, device{
MonitorID: id,
DeviceType: d.GetDeviceType(),
DeviceName: d.GetDisplayName(),
Properties: d.GetProperties(),
})
}
}
return c.JSON(200, devices)
}

485
internal/api/preview.go Normal file
View file

@ -0,0 +1,485 @@
package api
import (
"encoding/json"
"fmt"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/gstsdp"
"github.com/go-gst/go-gst/gst/gstwebrtc"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog/log"
"net/http"
)
const (
previewWidth = 640
previewHeight = 360
previewBitrate = 600 // in kbps
previewFPS = 25
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type PreviewPipeline struct {
outputID string
pipeline *gst.Pipeline
webrtcbin *gst.Element
}
type SignalMessage struct {
Type string `json:"type"`
SDP string `json:"sdp,omitempty"`
ICE *IceCandidate `json:"ice,omitempty"`
}
type IceCandidate struct {
Candidate string `json:"candidate"`
SDPMid string `json:"sdpMid"`
SDPMLineIndex uint16 `json:"sdpMLineIndex"`
}
func (api *API) previewHandler(c echo.Context) error {
devicemonitorId := c.Param("devicemonitor-id")
source := c.Param("source")
streamID := fmt.Sprintf("%s_%s", devicemonitorId, source)
//debug
for id, _ := range api.deviceMonitors {
devicemonitorId = id
}
log.Debug().Str("streamID", streamID).Msg("Preview stream requested")
deviceMonitor, ok := api.deviceMonitors[devicemonitorId]
if !ok {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorId))
}
sourceDevice := deviceMonitor.GetDevice(source)
if sourceDevice == nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
}
previewPipeline, ok := api.previewPipelines[streamID]
var err error
if !ok {
previewPipeline, err = api.createPreviewPipeline(sourceDevice)
if err != nil {
log.Error().Err(err).Msg("Failed to create preview pipeline")
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
}
// TODO lock map while writing
api.previewPipelines[streamID] = previewPipeline
}
if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
log.Error().Err(err).Msg("Failed to set pipeline to playing")
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
}
conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
log.Error().Err(err).Msg("Upgrade error")
return err
}
defer conn.Close()
// --- WebRTC Signal Handling ---
// on-negotiation-needed: Triggered when webrtcbin needs to create an offer or answer.
// In our case (server receiving an offer), this fires after the remote description is set.
if _, err := previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) {
log.Info().Msg("Negotiation needed, creating answer")
// Create Answer
promise := gst.NewPromise()
_, err := self.Emit("create-answer", nil, promise)
if err != nil {
log.Error().Err(err).Msg("Failed to emit create-answer")
return
}
// Handle the promise result (asynchronously)
promise.Interrupt() // Interrupt any previous waits
reply, err := promise.Await(c.Request().Context()) // Wait for the answer to be created
if err != nil {
log.Error().Err(err).Msg("Failed to await create-answer promise")
return
}
if reply == nil {
log.Error().Msg("Promise reply for create-answer was nil")
return
}
answerValue, err := reply.GetValue("answer")
if err != nil || answerValue == nil {
log.Error().Msg("Failed to get answer from promise reply")
return
}
answer, ok := answerValue.(*gstwebrtc.SessionDescription)
if !ok || answer == nil {
log.Error().Msg("Answer value is not a WebRTCSessionDescription")
return
}
log.Debug().Str("sdp", answer.SDP().String()).Msg("Answer created")
// Set Local Description
descPromise := gst.NewPromise()
_, err = self.Emit("set-local-description", answer, descPromise)
if err != nil {
log.Error().Err(err).Msg("Failed to emit set-local-description")
return
}
descPromise.Interrupt() // Interrupt any previous waits
log.Info().Msg("Set local description (answer)")
// Send Answer back to the client
log.Info().Msg("Sending SDP answer back to browser")
response := SignalMessage{
Type: "answer",
SDP: answer.SDP().String(),
}
msg, err := json.Marshal(response)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal answer")
return
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
log.Error().Err(err).Msg("Failed to send answer over WebSocket")
}
}); err != nil {
log.Error().Err(err).Msg("Failed to connect on-negotiation-needed signal")
}
// on-ice-candidate: Triggered when webrtcbin generates a new ICE candidate.
if _, err := previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string, _ any) {
log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate")
// Note: The 'sdpMid' is often derived from the mlineindex or context,
// but the signal itself only provides mlineindex and candidate string directly.
// The JS side seems to handle this structure correctly.
ice := SignalMessage{
Type: "ice",
ICE: &IceCandidate{
Candidate: candidate,
SDPMLineIndex: uint16(mlineindex), // Assuming JS expects uint16
// sdpMid might need to be derived if required, but often index is enough
},
}
msg, err := json.Marshal(ice)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal ICE candidate")
return
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket")
}
}); err != nil {
log.Error().Err(err).Msg("Failed to connect on-ice-candidate signal")
}
// --- WebSocket Message Loop ---
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Error().Err(err).Msg("WebSocket read error")
break
}
var signal SignalMessage
if err := json.Unmarshal(message, &signal); err != nil {
log.Warn().Err(err).Msg("Failed to unmarshal signal message")
log.Warn().Err(err).Str("message", string(message)).Msg("Failed to unmarshal signal message")
continue
}
switch signal.Type {
case "offer":
log.Info().Msg("Received SDP offer from browser")
//log.Debug().Str("sdp", signal.SDP).Msg("Offer SDP")
// Create WebRTCSessionDescription for the offer
offerMsg, err := gstsdp.ParseSDPMessage(signal.SDP)
if err != nil {
log.Error().Err(err).Msg("Failed to parse SDP message")
continue
}
offerDesc := gstwebrtc.NewSessionDescription(gstwebrtc.SDP_TYPE_OFFER, offerMsg)
promise := gst.NewPromise()
// Set Remote Description
// This will trigger on-negotiation-needed if successful and state allows
if _, err := previewPipeline.webrtcbin.Emit("set-remote-description", offerDesc, promise); err != nil {
log.Error().Err(err).Msg("Failed to emit set-remote-description")
continue
}
if promise == nil {
log.Error().Msg("Emit set-remote-description returned nil promise")
continue
}
//promise.Interrupt() // Interrupt previous waits if any
promise.Await(c.Request().Context()) // Wait for the remote description to be set
log.Info().Msg("Set remote description (offer)")
// Answer creation is now handled in on-negotiation-needed
case "ice":
if signal.ICE == nil {
log.Warn().Msg("Received ICE signal with nil ICE field")
continue
}
log.Debug().Str("candidate", signal.ICE.Candidate).Uint16("mlineindex", signal.ICE.SDPMLineIndex).Str("sdpMid", signal.ICE.SDPMid).Msg("Received ICE candidate from browser")
// Add ICE Candidate
// Note: The signal takes mlineindex (uint) and candidate (string).
// sdpMid is usually associated but not directly passed here.
_, err := previewPipeline.webrtcbin.Emit("add-ice-candidate", uint(signal.ICE.SDPMLineIndex), signal.ICE.Candidate)
if err != nil {
log.Error().Err(err).Msg("Failed to emit add-ice-candidate")
} else {
log.Debug().Msg("Added ICE candidate")
}
default:
log.Warn().Str("type", signal.Type).Msg("Received unknown signal type")
}
}
return nil
}
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
if err != nil {
return nil, fmt.Errorf("creating pipeline: %w", err)
}
videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
if err != nil {
return nil, fmt.Errorf("creating video proxy src: %w", err)
}
if err := pipeline.Add(videoProxySrc); err != nil {
return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
}
videoConvertScale, err := gst.NewElement("videoconvertscale")
if err != nil {
return nil, fmt.Errorf("creating video convert scale: %w", err)
}
if err := pipeline.Add(videoConvertScale); err != nil {
return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
}
if err := videoProxySrc.Link(videoConvertScale); err != nil {
return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
}
videoRate, err := gst.NewElement("videorate")
if err != nil {
return nil, fmt.Errorf("creating video rate: %w", err)
}
if err := pipeline.Add(videoRate); err != nil {
return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
}
if err := videoConvertScale.Link(videoRate); err != nil {
return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
}
convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))
videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{
"bitrate": previewBitrate,
"speed-preset": 1, // ultrafast
"tune": 0x00000004, // zero-latency
})
if err != nil {
return nil, fmt.Errorf("creating video encoder: %w", err)
}
if err := pipeline.Add(videoEncoder); err != nil {
return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
}
if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil {
return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
}
videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{
"aggregate-mode": 1, // "zero-latency"
"pt": 96,
})
if err != nil {
return nil, fmt.Errorf("creating video payloader: %w", err)
}
if err := pipeline.Add(videoPayloader); err != nil {
return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
}
if err := videoEncoder.Link(videoPayloader); err != nil {
return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
}
multiQueue, err := gst.NewElement("multiqueue")
if err != nil {
return nil, fmt.Errorf("creating multiqueue: %w", err)
}
if err := pipeline.Add(multiQueue); err != nil {
return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
}
videoPayloaderSrcPad := videoPayloader.GetStaticPad("src")
videoQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
if plr := videoPayloaderSrcPad.Link(videoQueueSinkPad); plr != gst.PadLinkOK {
return nil, fmt.Errorf("linking video encoder src pad to multiqueue sink pad: %s", plr.String())
}
webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{
"stun-server": "stun://stun.l.google.com:19302",
})
if err != nil {
return nil, fmt.Errorf("creating webrtcbin: %w", err)
}
if err := pipeline.Add(webRtcBin); err != nil {
return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
}
audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
if err != nil {
return nil, fmt.Errorf("creating audio proxy src: %w", err)
}
if err := pipeline.Add(audioProxySrc); err != nil {
return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
}
audioConvert, err := gst.NewElement("audioconvert")
if err != nil {
return nil, fmt.Errorf("creating audio convert: %w", err)
}
if err := pipeline.Add(audioConvert); err != nil {
return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
}
if err := audioProxySrc.Link(audioConvert); err != nil {
return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
}
audioRate, err := gst.NewElement("audiorate")
if err != nil {
return nil, fmt.Errorf("creating audio rate: %w", err)
}
if err := pipeline.Add(audioRate); err != nil {
return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
}
if err := audioConvert.Link(audioRate); err != nil {
return nil, fmt.Errorf("linking audio convert to rate: %w", err)
}
audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")
audioEncoder, err := gst.NewElement("opusenc")
if err != nil {
return nil, fmt.Errorf("creating audio encoder: %w", err)
}
if err := pipeline.Add(audioEncoder); err != nil {
return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
}
if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
}
audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{
"pt": 97,
})
if err != nil {
return nil, fmt.Errorf("creating audio payloader: %w", err)
}
if err := pipeline.Add(audioPayloader); err != nil {
return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
}
if err := audioEncoder.Link(audioPayloader); err != nil {
return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
}
audioQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
audioPayloaderSrcPad := audioPayloader.GetStaticPad("src")
if plr := audioPayloaderSrcPad.Link(audioQueueSinkPad); plr != gst.PadLinkOK {
return nil, fmt.Errorf("linking audio encoder src pad to multiqueue sink pad: %s", plr.String())
}
outputID, videoSrc, audioSrc, err := d.GetOutput()
if err != nil {
return nil, fmt.Errorf("getting output: %w", err)
}
if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
}
if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
}
// Construct the struct before setting state, so we have it in case of errors during state change
// Store the outputID so we can potentially clean up the source proxy later if needed
previewPipeline := &PreviewPipeline{
outputID: outputID,
pipeline: pipeline,
webrtcbin: webRtcBin,
}
// Connect signals before setting state
log.Debug().Str("pipeline", pipeline.GetName()).Msg("Connecting signals")
if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
// Attempt cleanup if signal connection fails
pipeline.SetState(gst.StateNull)
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
}
// Now set the preview pipeline to playing
log.Info().Str("pipeline", pipeline.GetName()).Msg("Setting preview pipeline state to PLAYING")
if err := pipeline.SetState(gst.StatePlaying); err != nil {
// Attempt cleanup
pipeline.SetState(gst.StateNull)
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
return nil, fmt.Errorf("setting preview pipeline state to playing: %w", err)
}
log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING")
return previewPipeline, nil
}
func (pipeline *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) {
log.Debug().Msg("Multiqueue pad added")
if plr := pipeline.webrtcbin.GetRequestPad("sink_%u").Link(pad); plr != gst.PadLinkOK {
log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
return
}
}
// https://github.com/go-gst/go-gst/issues/165
func linkProxySink(src, sink *gst.Element) error {
propType, err := src.GetPropertyType("proxysink")
if err != nil {
return fmt.Errorf("failed to get proxysink property type: %w", err)
}
val, err := glib.ValueInit(propType)
if err != nil {
return fmt.Errorf("failed to init proxysink property value: %w", err)
}
val.SetInstance(sink.GetPrivate())
return src.SetPropertyValue("proxysink", val)
}

View file

@ -2,23 +2,8 @@ package gstreamer
import ( import (
"github.com/go-gst/go-gst/gst" "github.com/go-gst/go-gst/gst"
"github.com/goccy/go-graphviz/cgraph"
) )
func gstreamerBinToDot(bin *gst.Bin) string { func gstreamerBinToDot(bin *gst.Bin) string {
return bin.DebugBinToDotData(gst.DebugGraphShowAll) return bin.DebugBinToDotData(gst.DebugGraphShowAll)
} }
func setFontSize(graph *cgraph.Graph, fontSize float64) {
graph.SetFontSize(20)
graph.Set("arrowSize", "3")
for node, err := graph.FirstNode(); err == nil && node != nil; node, err = graph.NextNode(node) {
node.SetFontSize(fontSize)
for edge, err := graph.FirstEdge(node); err == nil && edge != nil; edge, err = graph.NextEdge(edge, node) {
edge.SetFontSize(fontSize)
}
}
//for subGraph, err := graph.FirstSubGraph(); err == nil && subGraph != nil; subGraph, err = graph.NextSubGraph() {
// setFontSize(subGraph, fontSize)
//}
}

285
internal/ndi/device.go Normal file
View file

@ -0,0 +1,285 @@
package ndi
import (
"fmt"
"github.com/go-gst/go-gst/gst"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"os"
"path/filepath"
"sync"
)
type Device struct {
sync.RWMutex
name string
uri string
props map[string]interface{}
available bool
gstDevice *gst.Device
pipeline *gst.Pipeline
videoTee *gst.Element
videoProxy map[string]*gst.Element
audioTee *gst.Element
audioProxy map[string]*gst.Element
}
func newDevice(d *gst.Device) *Device {
name := d.GetDisplayName()
uri := d.GetProperties().Values()["url-address"].(string)
props := d.GetProperties().Values()
device := &Device{
name: name,
uri: uri,
props: props,
available: true,
gstDevice: d,
videoProxy: make(map[string]*gst.Element),
audioProxy: make(map[string]*gst.Element),
}
err := device.createPipeline()
if err != nil {
log.Error().Err(err).Msg("Failed to create pipeline for device")
return nil
}
return device
}
func (d *Device) updateProperties(dgst *gst.Device) {
d.Lock()
defer d.Unlock()
d.name = dgst.GetDisplayName()
d.uri = dgst.GetProperties().Values()["url-address"].(string)
d.props = dgst.GetProperties().Values()
d.available = true
}
func (d *Device) setUnavailable() {
d.Lock()
defer d.Unlock()
d.available = false
}
func (d *Device) GetDisplayName() string {
d.RLock()
defer d.RUnlock()
return d.name
}
func (d *Device) GetDeviceType() string {
return deviceType
}
func (d *Device) GetProperties() map[string]interface{} {
d.RLock()
defer d.RUnlock()
return d.props
}
// TODO handle case where ndidemux src pads are not present
func (d *Device) createPipeline() error {
d.Lock()
defer d.Unlock()
log.Debug().Str("name", d.name).Msg("Creating pipeline for device")
pipeline, err := gst.NewPipeline("ndi-" + d.name)
if err != nil {
return fmt.Errorf("creating pipeline: %w", err)
}
pipeline.ForceClock(gst.ObtainSystemClock().Clock)
input := d.gstDevice.CreateElement("")
if err := pipeline.Add(input); err != nil {
return fmt.Errorf("adding input to pipeline: %w", err)
}
inputQueue, err := gst.NewElementWithProperties("queue2",
map[string]interface{}{
"name": "input-queue",
"max-size-buffers": 0,
"max-size-bytes": 0,
"max-size-time": 40000000, // 40ms
})
if err != nil {
return fmt.Errorf("creating input-queue: %w", err)
}
if err := pipeline.Add(inputQueue); err != nil {
return fmt.Errorf("adding input-queue to pipeline: %w", err)
}
if err := input.Link(inputQueue); err != nil {
return fmt.Errorf("linking input to input input-queue: %w", err)
}
ndiDemuxer, err := gst.NewElement("ndisrcdemux")
if err != nil {
return fmt.Errorf("creating ndisrcdemux: %w", err)
}
if err := pipeline.Add(ndiDemuxer); err != nil {
return fmt.Errorf("adding ndisrcdemux to pipeline: %w", err)
}
if err := inputQueue.Link(ndiDemuxer); err != nil {
return fmt.Errorf("linking input-queue to ndisrcdemux: %w", err)
}
videoTee, err := gst.NewElementWithName("tee", "video-tee")
if err != nil {
return fmt.Errorf("creating video-tee: %w", err)
}
if err := pipeline.Add(videoTee); err != nil {
return fmt.Errorf("adding video-tee to pipeline: %w", err)
}
audioTee, err := gst.NewElementWithName("tee", "audio-tee")
if err != nil {
return fmt.Errorf("creating audio-tee: %w", err)
}
if err := pipeline.Add(audioTee); err != nil {
return fmt.Errorf("adding audio-tee to pipeline: %w", err)
}
d.pipeline = pipeline
d.videoTee = videoTee
d.audioTee = audioTee
if _, err := ndiDemuxer.Connect("pad-added", d.onNdiSrcDemuxerPadAdded); err != nil {
return fmt.Errorf("connecting pad-added signal: %w", err)
}
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("getting current working directory: %w", err)
}
log.Debug().Str("cwd", cwd).Msg("dumping pipeline to dot file")
filename := filepath.Join(cwd, "dumpDot", "ndi-pipeline.dot")
dot := pipeline.DebugBinToDotData(gst.DebugGraphShowAll)
if err := os.WriteFile(filename, []byte(dot), 0644); err != nil {
return fmt.Errorf("writing dot file: %w", err)
}
log.Debug().Str("name", d.name).Msg("Pipeline created")
return nil
}
func (d *Device) onNdiSrcDemuxerPadAdded(element *gst.Element, pad *gst.Pad) {
// This is called when ndisrcdemux adds a source pad (e.g., "video", "audio")
// We need to link this source pad to the sink pad of the corresponding tee.
log.Debug().Str("pad", pad.GetName()).Msg("NDI demuxer pad added, linking to tee sink")
var teeSinkPad *gst.Pad
var teeName string
switch pad.GetName() {
case "video":
teeSinkPad = d.videoTee.GetStaticPad("sink")
teeName = d.videoTee.GetName()
case "audio":
teeSinkPad = d.audioTee.GetStaticPad("sink")
teeName = d.audioTee.GetName()
default:
log.Warn().Str("pad", pad.GetName()).Msg("Ignoring unknown pad from ndisrcdemux")
return
}
if teeSinkPad == nil {
log.Error().Str("pad", pad.GetName()).Str("tee", teeName).Msg("Failed to get static sink pad from tee")
return
}
// Link the demuxer's source pad to the tee's sink pad
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Attempting to link NDI demuxer pad to tee")
if plr := pad.Link(teeSinkPad); plr != gst.PadLinkOK {
log.Error().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msgf("Linking NDI demuxer pad to tee failed: %s", plr)
} else {
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Successfully linked NDI demuxer pad to tee")
}
}
func (d *Device) GetOutput() (string, *gst.Element, *gst.Element, error) {
id, videoElement, audioElement, err := func() (string, *gst.Element, *gst.Element, error) {
d.Lock()
defer d.Unlock()
id := uuid.NewString()
videoProxySink, err := gst.NewElementWithName("proxysink", "video-proxy")
if err != nil {
return "", nil, nil, fmt.Errorf("creating video-proxy sink: %w", err)
}
if err := d.pipeline.Add(videoProxySink); err != nil {
return "", nil, nil, fmt.Errorf("adding video-proxy sink to pipeline: %w", err)
}
if plr := d.videoTee.GetRequestPad("src_%u").Link(videoProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK {
return "", nil, nil, fmt.Errorf("linking video-tee to video-proxy sink: %s", plr)
}
if !videoProxySink.SyncStateWithParent() {
log.Warn().Msg("video-proxy sink sync state not syncable")
}
audioProxySink, err := gst.NewElementWithName("proxysink", "audio-proxy")
if err != nil {
return "", nil, nil, fmt.Errorf("creating audio-proxy sink: %w", err)
}
if err := d.pipeline.Add(audioProxySink); err != nil {
return "", nil, nil, fmt.Errorf("adding audio-proxy sink to pipeline: %w", err)
}
if plr := d.audioTee.GetRequestPad("src_%u").Link(audioProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK {
return "", nil, nil, fmt.Errorf("linking audio-tee to audio-proxy sink: %s", plr)
}
if !audioProxySink.SyncStateWithParent() {
log.Warn().Msg("audio-proxy sink sync state not syncable")
}
d.videoProxy[id] = videoProxySink
d.audioProxy[id] = audioProxySink
return id, videoProxySink, audioProxySink, nil
}()
if err != nil {
return "", nil, nil, fmt.Errorf("creating output: %w", err)
}
if err := d.setPipelineState(gst.StatePlaying); err != nil {
return "", nil, nil, fmt.Errorf("setting pipeline state: %w", err)
}
return id, videoElement, audioElement, nil
}
func (d *Device) setPipelineState(state gst.State) error {
d.Lock()
defer d.Unlock()
if d.pipeline == nil {
return fmt.Errorf("pipeline is nil")
}
if state == d.pipeline.GetCurrentState() {
return nil
}
log.Info().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("Setting pipeline state")
if err := d.pipeline.SetState(state); err != nil {
log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Err(err).Msg("SetState failed")
return fmt.Errorf("setting pipeline state to %s: %w", state.String(), err)
}
return nil
}

100
internal/ndi/ndi.go Normal file
View file

@ -0,0 +1,100 @@
package ndi
import (
"git.entr0py.de/garionion/catie/internal/api"
"github.com/go-gst/go-gst/gst"
"github.com/rs/zerolog/log"
"time"
)
const deviceType = "NDI"
type NDI struct {
deviceMonitor *gst.DeviceMonitor
devices map[string]*Device
}
func NewNDI() *NDI {
log.Debug().Msg("creating NDI device monitor")
devMon := gst.NewDeviceMonitor()
caps := gst.NewCapsFromString("application/x-ndi")
devMon.AddFilter("Source/Network", caps)
n := &NDI{
deviceMonitor: devMon,
devices: make(map[string]*Device),
}
bus := devMon.GetBus()
bus.AddWatch(func(msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageDeviceAdded:
device := msg.ParseDeviceAdded()
n.createOrUpdateDevice(device)
log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Added")
case gst.MessageDeviceRemoved:
device := msg.ParseDeviceRemoved()
n.setDeviceUnavailable(device)
log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Removed")
default:
log.Debug().Str("msgType", msg.TypeName()).Str("message", msg.String()).Msg("default")
}
return true
})
log.Debug().Msg("starting NDI device monitor")
devMon.Start()
return n
}
func (n *NDI) GetDeviceType() string {
return deviceType
}
func (n *NDI) GetDevices() []api.Device {
devices := make([]api.Device, 0, len(n.devices))
for _, d := range n.devices {
devices = append(devices, d)
}
return devices
}
func (n *NDI) GetDevice(name string) api.Device {
if device, ok := n.devices[name]; ok {
return device
}
return nil
}
func (n *NDI) createOrUpdateDevice(d *gst.Device) {
name := d.GetDisplayName()
if device, ok := n.devices[name]; ok {
device.updateProperties(d)
return
}
n.devices[name] = newDevice(d)
}
func (n *NDI) setDeviceUnavailable(d *gst.Device) {
if device, ok := n.devices[d.GetDisplayName()]; ok {
device.setUnavailable()
}
}
func (n *NDI) DiscoverSources() {
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
devices := n.deviceMonitor.GetDevices()
for _, device := range devices {
log.Debug().Fields(device.GetProperties().Values()).Msg("discovering sources")
}
}
}

View file

@ -88,6 +88,8 @@ func main() {
wg.Done() wg.Done()
}*/ }*/
e.Static("/", "webRTC-test")
log.Info().Str("address", cfg.ListenAddr).Msg("starting web server") log.Info().Str("address", cfg.ListenAddr).Msg("starting web server")
if err := e.Start(cfg.ListenAddr); err != nil { if err := e.Start(cfg.ListenAddr); err != nil {
log.Fatal().Err(err).Msg("web server failed") log.Fatal().Err(err).Msg("web server failed")

View file

@ -8,12 +8,14 @@ let
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly gst_all_1.gst-plugins-ugly
libnice
] + ":" + lib.makeLibraryPath [ ] + ":" + lib.makeLibraryPath [
gst_all_1.gstreamer.out gst_all_1.gstreamer.out
gst_all_1.gst-plugins-base gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly gst_all_1.gst-plugins-ugly
libnice
]; ];
in in
pkgs.buildGo123Module { pkgs.buildGo123Module {
@ -29,6 +31,7 @@ pkgs.buildGo123Module {
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly gst_all_1.gst-plugins-ugly
libnice
glib glib
ndi ndi
]; ];
@ -39,19 +42,11 @@ pkgs.buildGo123Module {
makeWrapper makeWrapper
]; ];
ldflags = [
"-X git.entr0py.de/garionion/catie/internal/ndi.NDI_LIB_PATH=${pkgs.ndi}"
];
buildFlags = [
"CGO_CFLAGS=-I${pkgs.ndi}/include"
"CGO_LDFLAGS=-L${pkgs.ndi}/lib -lndi"
];
postInstall = '' postInstall = ''
wrapProgram $out/bin/catie \ wrapProgram $out/bin/catie \
--set GST_PLUGIN_SYSTEM_PATH_1_0 ${gstPluginPath} \ --set GST_PLUGIN_SYSTEM_PATH_1_0 ${gstPluginPath} \
--set GST_PLUGIN_PATH_1_0 ${gstPluginPath} --set GST_PLUGIN_PATH_1_0 ${gstPluginPath}
--set NDI_RUNTIME_DIR_V5 ${lib.makeLibraryPath [ ndi ]} \
''; '';
tags = [ ]; tags = [ ];

15
webRTC-test/index.html Normal file
View file

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebRTC Video Preview</title>
<link rel="stylesheet" href="style.css">
</head>
<body>
<h1>WebRTC Video Stream</h1>
<video id="video" autoplay playsinline></video>
<script src="script.js"></script>
</body>
</html>

143
webRTC-test/script.js Normal file
View file

@ -0,0 +1,143 @@
const videoElement = document.getElementById('video');
let peerConnection; // Renamed for clarity
let wsConnection;
// Removed mediaConstraints and iceCandidates (not needed here)
const wsUrl = "ws://localhost:8080/api/v1/preview/5a59a94f-264a-4d24-9e57-8a9be0ecbdbd/KALANI-3.LOCAL%20%28OBS%29"; // Replace with your WebSocket URL if necessary
const peerConnectionConfig = {
iceServers: [{ urls: "stun:stun.l.google.com:19302" }]
};
// Initialize WebSocket connection
function initWebSocket() {
wsConnection = new WebSocket(wsUrl);
wsConnection.onopen = () => {
console.log('Connected to WebSocket');
createOffer();
};
wsConnection.onmessage = async (message) => {
const signal = JSON.parse(message.data);
console.log("Received signal:", signal); // Add logging
// The browser initiates with an offer, so it expects an answer or ICE candidates.
// It should not receive an 'offer'.
if (signal.type === 'answer') {
handleAnswer(signal);
} else if (signal.type === 'ice') {
handleIceCandidate(signal);
} else {
console.warn("Received unexpected signal type:", signal.type);
}
};
wsConnection.onerror = (error) => {
console.error('WebSocket Error:', error);
};
wsConnection.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason);
};
}
// Create an SDP offer to receive media
async function createOffer() {
peerConnection = new RTCPeerConnection(peerConnectionConfig);
// Set up event handlers
peerConnection.onicecandidate = handleIceCandidateEvent;
peerConnection.ontrack = (event) => {
console.log("Track received:", event.track, "Stream:", event.streams[0]);
if (videoElement.srcObject !== event.streams[0]) {
videoElement.srcObject = event.streams[0];
console.log("Assigned stream to video element");
}
};
peerConnection.oniceconnectionstatechange = () => {
console.log("ICE connection state:", peerConnection.iceConnectionState);
};
peerConnection.onconnectionstatechange = () => {
console.log("Peer connection state:", peerConnection.connectionState);
};
// Configure to receive video and audio
peerConnection.addTransceiver('video', { direction: 'recvonly' });
peerConnection.addTransceiver('audio', { direction: 'recvonly' });
try {
console.log("Creating offer...");
const offer = await peerConnection.createOffer();
await peerConnection.setLocalDescription(offer);
console.log("Offer created and set as local description:", offer);
const signalMessage = {
type: 'offer',
sdp: offer.sdp
};
console.log("Sending offer via WebSocket:", signalMessage);
wsConnection.send(JSON.stringify(signalMessage));
} catch (error) {
console.error("Error creating offer:", error);
}
}
// Handle SDP answer from the server
async function handleAnswer(signal) {
console.log("Handling answer:", signal);
const remoteDescription = new RTCSessionDescription({
type: 'answer',
sdp: signal.sdp
});
try {
await peerConnection.setRemoteDescription(remoteDescription);
console.log("Remote description (answer) set successfully.");
} catch (error) {
console.error("Error setting remote description (answer):", error);
}
}
// Handle ICE candidates generated locally (send to server)
function handleIceCandidateEvent(event) {
console.log("Local ICE candidate event:", event);
if (event.candidate) {
console.log("Sending ICE candidate:", event.candidate);
const iceMessage = {
type: 'ice',
ice: {
candidate: event.candidate.candidate,
sdpMid: event.candidate.sdpMid,
sdpMLineIndex: event.candidate.sdpMLineIndex
}
};
wsConnection.send(JSON.stringify(iceMessage));
} else {
console.log("All local ICE candidates have been sent.");
}
}
// Handle ICE candidate received from the server
async function handleIceCandidate(signal) {
console.log("Handling received ICE candidate:", signal);
if (!signal.ice || !signal.ice.candidate) {
console.warn("Received incomplete ICE candidate signal:", signal);
return;
}
try {
const candidate = new RTCIceCandidate({
candidate: signal.ice.candidate,
sdpMid: signal.ice.sdpMid,
sdpMLineIndex: signal.ice.sdpMLineIndex
});
await peerConnection.addIceCandidate(candidate);
console.log("Added received ICE candidate successfully.");
} catch (error) {
console.error("Error adding received ICE candidate:", error);
}
}
// Initialize the connection when the page loads
window.onload = initWebSocket;

15
webRTC-test/style.css Normal file
View file

@ -0,0 +1,15 @@
body {
font-family: Arial, sans-serif;
text-align: center;
}
h1 {
color: #4CAF50;
}
#video {
width: 100%;
max-width: 600px;
border: 1px solid #ddd;
margin-top: 20px;
}