From c3a187d5802366245ddb7fb05da8375b9c129495 Mon Sep 17 00:00:00 2001 From: gari Date: Sun, 13 Apr 2025 10:42:42 +0200 Subject: [PATCH] feat: add api and ndi device monitor with webrtc preview pipeline --- flake.nix | 1 + internal/api/api.go | 67 ++++++++ internal/api/preview.go | 362 ++++++++++++++++++++++++++++++++++++++++ internal/ndi/device.go | 232 +++++++++++++++++++++++++ internal/ndi/ndi.go | 100 +++++++++++ main.go | 2 + package.nix | 13 +- 7 files changed, 768 insertions(+), 9 deletions(-) create mode 100644 internal/api/api.go create mode 100644 internal/api/preview.go create mode 100644 internal/ndi/device.go create mode 100644 internal/ndi/ndi.go diff --git a/flake.nix b/flake.nix index e3bab37..ff6e5bc 100644 --- a/flake.nix +++ b/flake.nix @@ -21,6 +21,7 @@ gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-ugly + libnice gcc glib pkg-config ndi diff --git a/internal/api/api.go b/internal/api/api.go new file mode 100644 index 0000000..a2612c4 --- /dev/null +++ b/internal/api/api.go @@ -0,0 +1,67 @@ +package api + +import ( + "github.com/go-gst/go-gst/gst" + "github.com/google/uuid" + "github.com/labstack/echo/v4" +) + +type API struct { + deviceMonitors map[string]DeviceMonitor + previewPipelines map[string]*PreviewPipeline +} + +type DeviceMonitor interface { + GetDevices() []Device + GetDevice(name string) Device + GetDeviceType() string +} + +type Device interface { + GetDisplayName() string + GetDeviceType() string + GetProperties() map[string]interface{} + GetOutput() (string, *gst.Element, *gst.Element, error) +} + +func New() *API { + return &API{ + deviceMonitors: make(map[string]DeviceMonitor), + previewPipelines: make(map[string]*PreviewPipeline), + } +} + +func (api *API) RegisterRoutes(g *echo.Group) { + g.Add("GET", "/devices", api.GetDevices) + g.Add("GET", "/preview/:devicemonitor-id/:source", api.previewHandler) +} + +func (api *API) RegisterDeviceMonitor(monitors ...DeviceMonitor) { + for _, monitor := range monitors { + api.deviceMonitors[uuid.NewString()] = monitor + } +} + +type device struct { + MonitorID string + DeviceType string + DeviceName string + Properties map[string]interface{} +} + +func (api *API) GetDevices(c echo.Context) error { + devices := make([]device, 0) + + for id, monitor := range api.deviceMonitors { + for _, d := range monitor.GetDevices() { + devices = append(devices, device{ + MonitorID: id, + DeviceType: d.GetDeviceType(), + DeviceName: d.GetDisplayName(), + Properties: d.GetProperties(), + }) + } + } + + return c.JSON(200, devices) +} diff --git a/internal/api/preview.go b/internal/api/preview.go new file mode 100644 index 0000000..3f0220e --- /dev/null +++ b/internal/api/preview.go @@ -0,0 +1,362 @@ +package api + +import ( + "encoding/json" + "fmt" + "github.com/go-gst/go-glib/glib" + "github.com/go-gst/go-gst/gst" + "github.com/gorilla/websocket" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog/log" + "net/http" +) + +const ( + previewWidth = 640 + previewHeight = 360 + previewBitrate = 600 // in kbps + previewFPS = 25 +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type PreviewPipeline struct { + outputID string + + pipeline *gst.Pipeline + webrtcbin *gst.Element +} + +type SignalMessage struct { + Type string `json:"type"` + SDP string `json:"sdp,omitempty"` + ICE *IceCandidate `json:"ice,omitempty"` +} + +type IceCandidate struct { + Candidate string `json:"candidate"` + SDPMid string `json:"sdpMid"` + SDPMLineIndex uint16 `json:"sdpMLineIndex"` +} + +func (api *API) previewHandler(c echo.Context) error { + devicemonitorId := c.Param("devicemonitor-id") + source := c.Param("source") + streamID := fmt.Sprintf("%s_%s", devicemonitorId, source) + + //debug + for id, _ := range api.deviceMonitors { + devicemonitorId = id + } + + log.Debug().Str("streamID", streamID).Msg("Preview stream requested") + + deviceMonitor, ok := api.deviceMonitors[devicemonitorId] + if !ok { + return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorId)) + } + + sourceDevice := deviceMonitor.GetDevice(source) + if sourceDevice == nil { + return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source)) + } + + previewPipeline, ok := api.previewPipelines[streamID] + var err error + if !ok { + previewPipeline, err = api.createPreviewPipeline(sourceDevice) + if err != nil { + log.Error().Err(err).Msg("Failed to create preview pipeline") + return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline") + } + + // TODO lock map while writing + api.previewPipelines[streamID] = previewPipeline + } + if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil { + log.Error().Err(err).Msg("Failed to set pipeline to playing") + return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing") + } + + conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) + if err != nil { + log.Error().Err(err).Msg("Upgrade error") + return err + } + defer conn.Close() + + previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) { + log.Debug().Msg("🧠 Negotiation needed (ignored)") + }) + + previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex int, candidate string) { + ice := SignalMessage{ + Type: "ice", + ICE: &IceCandidate{ + Candidate: candidate, + SDPMLineIndex: uint16(mlineindex), + }, + } + msg, _ := json.Marshal(ice) + conn.WriteMessage(websocket.TextMessage, msg) + }) + + previewPipeline.webrtcbin.Connect("on-sdp-answer", func(self *gst.Element, answer string) { + log.Info().Msg("📨 Sending SDP answer back to browser") + response := SignalMessage{ + Type: "answer", + SDP: answer, + } + msg, _ := json.Marshal(response) + conn.WriteMessage(websocket.TextMessage, msg) + }) + + for { + _, message, err := conn.ReadMessage() + if err != nil { + log.Error().Err(err).Msg("WebSocket read error") + break + } + var signal SignalMessage + if err := json.Unmarshal(message, &signal); err != nil { + log.Warn().Err(err).Msg("Failed to unmarshal signal message") + continue + } + + if signal.Type == "offer" { + log.Info().Msg("📥 Received SDP offer from browser") + // Pass the SDP offer to webrtcbin + previewPipeline.webrtcbin.Emit("set-remote-description", "offer", signal.SDP) + + // Create an answer + previewPipeline.webrtcbin.Emit("create-answer") + log.Debug().Msg("➡️ Sent SDP offer into webrtcbin and requested answer") + } + + if signal.Type == "ice" && signal.ICE != nil { + log.Debug().Str("candidate", signal.ICE.Candidate).Msg("➕ Adding ICE candidate") + previewPipeline.webrtcbin.Emit("add-ice-candidate", signal.ICE.SDPMLineIndex, signal.ICE.Candidate) + } + } + + return nil +} + +func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) { + pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName())) + if err != nil { + return nil, fmt.Errorf("creating pipeline: %w", err) + } + + videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src") + if err != nil { + return nil, fmt.Errorf("creating video proxy src: %w", err) + } + if err := pipeline.Add(videoProxySrc); err != nil { + return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err) + } + + videoConvertScale, err := gst.NewElement("videoconvertscale") + if err != nil { + return nil, fmt.Errorf("creating video convert scale: %w", err) + } + if err := pipeline.Add(videoConvertScale); err != nil { + return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err) + } + + if err := videoProxySrc.Link(videoConvertScale); err != nil { + return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err) + } + + videoRate, err := gst.NewElement("videorate") + if err != nil { + return nil, fmt.Errorf("creating video rate: %w", err) + } + if err := pipeline.Add(videoRate); err != nil { + return nil, fmt.Errorf("adding video rate to pipeline: %w", err) + } + + if err := videoConvertScale.Link(videoRate); err != nil { + return nil, fmt.Errorf("linking video convert scale to rate: %w", err) + } + + convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS)) + log.Debug().Str("caps", convertCaps.String()).Msg("Video convert caps") + + videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{ + "bitrate": previewBitrate, + "speed-preset": 1, // ultrafast + "tune": 0x00000004, // zero-latency + }) + if err != nil { + return nil, fmt.Errorf("creating video encoder: %w", err) + } + if err := pipeline.Add(videoEncoder); err != nil { + return nil, fmt.Errorf("adding video encoder to pipeline: %w", err) + } + + if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil { + return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err) + } + + videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{ + "aggregate-mode": 1, // "zero-latency" + "pt": 96, + }) + if err != nil { + return nil, fmt.Errorf("creating video payloader: %w", err) + } + if err := pipeline.Add(videoPayloader); err != nil { + return nil, fmt.Errorf("adding video payloader to pipeline: %w", err) + } + + if err := videoEncoder.Link(videoPayloader); err != nil { + return nil, fmt.Errorf("linking video encoder to payloader: %w", err) + } + + multiQueue, err := gst.NewElement("multiqueue") + if err != nil { + return nil, fmt.Errorf("creating multiqueue: %w", err) + } + if err := pipeline.Add(multiQueue); err != nil { + return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err) + } + + videoPayloaderSrcPad := videoPayloader.GetStaticPad("src") + videoQueueSinkPad := multiQueue.GetRequestPad("sink_%u") + + if plr := videoPayloaderSrcPad.Link(videoQueueSinkPad); plr != gst.PadLinkOK { + return nil, fmt.Errorf("linking video encoder src pad to multiqueue sink pad: %s", plr.String()) + } + + webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{ + "stun-server": "stun://stun.l.google.com:19302", + }) + if err != nil { + return nil, fmt.Errorf("creating webrtcbin: %w", err) + } + if err := pipeline.Add(webRtcBin); err != nil { + return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err) + } + + audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src") + if err != nil { + return nil, fmt.Errorf("creating audio proxy src: %w", err) + } + if err := pipeline.Add(audioProxySrc); err != nil { + return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err) + } + + audioConvert, err := gst.NewElement("audioconvert") + if err != nil { + return nil, fmt.Errorf("creating audio convert: %w", err) + } + if err := pipeline.Add(audioConvert); err != nil { + return nil, fmt.Errorf("adding audio convert to pipeline: %w", err) + } + + if err := audioProxySrc.Link(audioConvert); err != nil { + return nil, fmt.Errorf("linking audio proxy src to convert: %w", err) + } + + audioRate, err := gst.NewElement("audiorate") + if err != nil { + return nil, fmt.Errorf("creating audio rate: %w", err) + } + if err := pipeline.Add(audioRate); err != nil { + return nil, fmt.Errorf("adding audio rate to pipeline: %w", err) + } + + if err := audioConvert.Link(audioRate); err != nil { + return nil, fmt.Errorf("linking audio convert to rate: %w", err) + } + + audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000") + + audioEncoder, err := gst.NewElement("opusenc") + if err != nil { + return nil, fmt.Errorf("creating audio encoder: %w", err) + } + if err := pipeline.Add(audioEncoder); err != nil { + return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err) + } + + if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil { + return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err) + } + + audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{ + "pt": 97, + }) + if err != nil { + return nil, fmt.Errorf("creating audio payloader: %w", err) + } + if err := pipeline.Add(audioPayloader); err != nil { + return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err) + } + + if err := audioEncoder.Link(audioPayloader); err != nil { + return nil, fmt.Errorf("linking audio encoder to payloader: %w", err) + } + + audioQueueSinkPad := multiQueue.GetRequestPad("sink_%u") + audioPayloaderSrcPad := audioPayloader.GetStaticPad("src") + if plr := audioPayloaderSrcPad.Link(audioQueueSinkPad); plr != gst.PadLinkOK { + return nil, fmt.Errorf("linking audio encoder src pad to multiqueue sink pad: %s", plr.String()) + } + + outputID, videoSrc, audioSrc, err := d.GetOutput() + if err != nil { + return nil, fmt.Errorf("getting output: %w", err) + } + if err := linkProxySink(videoProxySrc, videoSrc); err != nil { + return nil, fmt.Errorf("setting video proxy src proxysink: %w", err) + } + if err := linkProxySink(audioProxySrc, audioSrc); err != nil { + return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err) + } + + if err := pipeline.SetState(gst.StatePlaying); err != nil { + return nil, fmt.Errorf("setting pipeline state: %w", err) + } + previewPipeline := &PreviewPipeline{ + outputID: outputID, + pipeline: pipeline, + webrtcbin: webRtcBin, + } + + if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil { + return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err) + } + + return previewPipeline, nil +} + +func (pipeline *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) { + log.Debug().Msg("Multiqueue pad added") + if plr := pipeline.webrtcbin.GetRequestPad("sink_%u").Link(pad); plr != gst.PadLinkOK { + log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad") + return + } +} + +// https://github.com/go-gst/go-gst/issues/165 +func linkProxySink(src, sink *gst.Element) error { + propType, err := src.GetPropertyType("proxysink") + if err != nil { + return fmt.Errorf("failed to get proxysink property type: %w", err) + } + + val, err := glib.ValueInit(propType) + if err != nil { + return fmt.Errorf("failed to init proxysink property value: %w", err) + } + + val.SetInstance(sink.GetPrivate()) + + return src.SetPropertyValue("proxysink", val) +} diff --git a/internal/ndi/device.go b/internal/ndi/device.go new file mode 100644 index 0000000..c8132a4 --- /dev/null +++ b/internal/ndi/device.go @@ -0,0 +1,232 @@ +package ndi + +import ( + "fmt" + "github.com/go-gst/go-gst/gst" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "sync" +) + +type Device struct { + sync.RWMutex + + name string + uri string + props map[string]interface{} + + available bool + + gstDevice *gst.Device + + pipeline *gst.Pipeline + videoTee *gst.Element + videoProxy map[string]*gst.Element + audioTee *gst.Element + audioProxy map[string]*gst.Element +} + +func newDevice(d *gst.Device) *Device { + name := d.GetDisplayName() + uri := d.GetProperties().Values()["url-address"].(string) + props := d.GetProperties().Values() + + device := &Device{ + name: name, + uri: uri, + props: props, + available: true, + + gstDevice: d, + + videoProxy: make(map[string]*gst.Element), + audioProxy: make(map[string]*gst.Element), + } + + err := device.createPipeline() + if err != nil { + log.Error().Err(err).Msg("Failed to create pipeline for device") + return nil + } + + return device +} + +func (d *Device) updateProperties(dgst *gst.Device) { + d.Lock() + defer d.Unlock() + + d.name = dgst.GetDisplayName() + d.uri = dgst.GetProperties().Values()["url-address"].(string) + d.props = dgst.GetProperties().Values() + d.available = true +} + +func (d *Device) setUnavailable() { + d.Lock() + defer d.Unlock() + + d.available = false +} + +func (d *Device) GetDisplayName() string { + d.RLock() + defer d.RUnlock() + + return d.name +} + +func (d *Device) GetDeviceType() string { + return deviceType +} + +func (d *Device) GetProperties() map[string]interface{} { + d.RLock() + defer d.RUnlock() + + return d.props +} + +// TODO handle case where ndidemux src pads are not present +func (d *Device) createPipeline() error { + d.Lock() + defer d.Unlock() + + pipeline, err := gst.NewPipeline("ndi-" + d.name) + if err != nil { + return fmt.Errorf("creating pipeline: %w", err) + } + pipeline.ForceClock(gst.ObtainSystemClock().Clock) + + input := d.gstDevice.CreateElement("") + inputQueue, err := gst.NewElementWithProperties("queue2", + map[string]interface{}{ + "name": "input-queue", + "max-size-buffers": 0, + "max-size-bytes": 0, + "max-size-time": 40000000, // 40ms + }) + if err := input.Link(inputQueue); err != nil { + return fmt.Errorf("linking input to input input-queue: %w", err) + } + + ndiDemuxer, err := gst.NewElement("ndisrcdemux") + if err != nil { + return fmt.Errorf("creating ndisrcdemux: %w", err) + } + + if err := inputQueue.Link(ndiDemuxer); err != nil { + return fmt.Errorf("linking input-queue to ndisrcdemux: %w", err) + } + + videoTee, err := gst.NewElementWithName("tee", "video-tee") + if err != nil { + return fmt.Errorf("creating video-tee: %w", err) + } + + audioTee, err := gst.NewElementWithName("tee", "audio-tee") + if err != nil { + return fmt.Errorf("creating audio-tee: %w", err) + } + + if err := pipeline.AddMany(input, ndiDemuxer, videoTee, audioTee); err != nil { + return fmt.Errorf("adding elements to pipeline: %w", err) + } + + d.pipeline = pipeline + d.videoTee = videoTee + d.audioTee = audioTee + + if _, err := ndiDemuxer.Connect("pad-added", d.onNdiSrcDemuxerPadAdded); err != nil { + return fmt.Errorf("connecting pad-added signal: %w", err) + } + + return nil +} + +func (d *Device) onNdiSrcDemuxerPadAdded(element *gst.Element, pad *gst.Pad) { + switch pad.GetName() { + case "video": + if plr := d.videoTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK { + log.Error().Str("pad", pad.GetName()).Msgf("linking video pad to video tee: %s", plr) + } + case "audio": + if plr := d.audioTee.GetStaticPad("sink").Link(pad); plr != gst.PadLinkOK { + log.Error().Str("pad", pad.GetName()).Msgf("linking audio pad to audio tee: %s", plr) + } + } + +} + +func (d *Device) GetOutput() (string, *gst.Element, *gst.Element, error) { + id, videoElement, audioElement, err := func() (string, *gst.Element, *gst.Element, error) { + d.Lock() + defer d.Unlock() + id := uuid.NewString() + + videoProxySink, err := gst.NewElementWithName("proxysink", "video-proxy") + if err != nil { + return "", nil, nil, fmt.Errorf("creating video-proxy sink: %w", err) + } + if err := d.pipeline.Add(videoProxySink); err != nil { + return "", nil, nil, fmt.Errorf("adding video-proxy sink to pipeline: %w", err) + } + + if plr := d.videoTee.GetRequestPad("src_%u").Link(videoProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK { + return "", nil, nil, fmt.Errorf("linking video-tee to video-proxy sink: %s", plr) + } + + if !videoProxySink.SyncStateWithParent() { + log.Warn().Msg("video-proxy sink sync state not syncable") + } + + audioProxySink, err := gst.NewElementWithName("proxysink", "audio-proxy") + if err != nil { + return "", nil, nil, fmt.Errorf("creating audio-proxy sink: %w", err) + } + + if err := d.pipeline.Add(audioProxySink); err != nil { + return "", nil, nil, fmt.Errorf("adding audio-proxy sink to pipeline: %w", err) + } + + if plr := d.audioTee.GetRequestPad("src_%u").Link(audioProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK { + return "", nil, nil, fmt.Errorf("linking audio-tee to audio-proxy sink: %s", plr) + } + + if !audioProxySink.SyncStateWithParent() { + log.Warn().Msg("audio-proxy sink sync state not syncable") + } + + d.videoProxy[id] = videoProxySink + d.audioProxy[id] = audioProxySink + + return id, videoProxySink, audioProxySink, nil + }() + if err != nil { + return "", nil, nil, fmt.Errorf("creating output: %w", err) + } + if err := d.setPipelineState(gst.StatePlaying); err != nil { + return "", nil, nil, fmt.Errorf("setting pipeline state: %w", err) + } + + return id, videoElement, audioElement, nil +} + +func (d *Device) setPipelineState(state gst.State) error { + d.Lock() + defer d.Unlock() + + if d.pipeline == nil { + return fmt.Errorf("pipeline is nil") + } + + if state == d.pipeline.GetCurrentState() { + return nil + } + + if err := d.pipeline.SetState(state); err != nil { + return fmt.Errorf("setting pipeline state: %w", err) + } + + return nil +} diff --git a/internal/ndi/ndi.go b/internal/ndi/ndi.go new file mode 100644 index 0000000..27d4768 --- /dev/null +++ b/internal/ndi/ndi.go @@ -0,0 +1,100 @@ +package ndi + +import ( + "git.entr0py.de/garionion/catie/internal/api" + "github.com/go-gst/go-gst/gst" + "github.com/rs/zerolog/log" + "time" +) + +const deviceType = "NDI" + +type NDI struct { + deviceMonitor *gst.DeviceMonitor + devices map[string]*Device +} + +func NewNDI() *NDI { + + log.Debug().Msg("creating NDI device monitor") + devMon := gst.NewDeviceMonitor() + caps := gst.NewCapsFromString("application/x-ndi") + devMon.AddFilter("Source/Network", caps) + + n := &NDI{ + deviceMonitor: devMon, + devices: make(map[string]*Device), + } + + bus := devMon.GetBus() + bus.AddWatch(func(msg *gst.Message) bool { + switch msg.Type() { + case gst.MessageDeviceAdded: + device := msg.ParseDeviceAdded() + n.createOrUpdateDevice(device) + log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Added") + case gst.MessageDeviceRemoved: + device := msg.ParseDeviceRemoved() + n.setDeviceUnavailable(device) + log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Removed") + default: + log.Debug().Str("msgType", msg.TypeName()).Str("message", msg.String()).Msg("default") + } + + return true + }) + + log.Debug().Msg("starting NDI device monitor") + devMon.Start() + + return n +} + +func (n *NDI) GetDeviceType() string { + return deviceType +} + +func (n *NDI) GetDevices() []api.Device { + devices := make([]api.Device, 0, len(n.devices)) + for _, d := range n.devices { + devices = append(devices, d) + } + + return devices +} + +func (n *NDI) GetDevice(name string) api.Device { + if device, ok := n.devices[name]; ok { + return device + } + + return nil +} + +func (n *NDI) createOrUpdateDevice(d *gst.Device) { + name := d.GetDisplayName() + if device, ok := n.devices[name]; ok { + device.updateProperties(d) + return + } + + n.devices[name] = newDevice(d) + +} + +func (n *NDI) setDeviceUnavailable(d *gst.Device) { + if device, ok := n.devices[d.GetDisplayName()]; ok { + device.setUnavailable() + } +} + +func (n *NDI) DiscoverSources() { + ticker := time.NewTicker(5 * time.Second) + + for range ticker.C { + devices := n.deviceMonitor.GetDevices() + for _, device := range devices { + log.Debug().Fields(device.GetProperties().Values()).Msg("discovering sources") + } + } +} diff --git a/main.go b/main.go index 9a9a684..bee9443 100644 --- a/main.go +++ b/main.go @@ -88,6 +88,8 @@ func main() { wg.Done() }*/ + e.Static("/", "webRTC-test") + log.Info().Str("address", cfg.ListenAddr).Msg("starting web server") if err := e.Start(cfg.ListenAddr); err != nil { log.Fatal().Err(err).Msg("web server failed") diff --git a/package.nix b/package.nix index 89fdbad..aaecb0b 100644 --- a/package.nix +++ b/package.nix @@ -8,12 +8,14 @@ let gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-ugly + libnice ] + ":" + lib.makeLibraryPath [ gst_all_1.gstreamer.out gst_all_1.gst-plugins-base gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-ugly + libnice ]; in pkgs.buildGo123Module { @@ -29,6 +31,7 @@ pkgs.buildGo123Module { gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-ugly + libnice glib ndi ]; @@ -39,19 +42,11 @@ pkgs.buildGo123Module { makeWrapper ]; - ldflags = [ - "-X git.entr0py.de/garionion/catie/internal/ndi.NDI_LIB_PATH=${pkgs.ndi}" - ]; - - buildFlags = [ - "CGO_CFLAGS=-I${pkgs.ndi}/include" - "CGO_LDFLAGS=-L${pkgs.ndi}/lib -lndi" - ]; - postInstall = '' wrapProgram $out/bin/catie \ --set GST_PLUGIN_SYSTEM_PATH_1_0 ${gstPluginPath} \ --set GST_PLUGIN_PATH_1_0 ${gstPluginPath} + --set NDI_RUNTIME_DIR_V5 ${lib.makeLibraryPath [ ndi ]} \ ''; tags = [ ];