Compare commits
8 commits
main
...
wip/webrtc
Author | SHA1 | Date | |
---|---|---|---|
f103ddb7a8 | |||
7147f3f13b | |||
35a8e20244 | |||
4419e31503 | |||
3d88b76c0d | |||
8558241aa5 | |||
26624ec52d | |||
c3a187d580 |
11 changed files with 1117 additions and 24 deletions
|
@ -21,6 +21,7 @@
|
||||||
gst_all_1.gst-plugins-good
|
gst_all_1.gst-plugins-good
|
||||||
gst_all_1.gst-plugins-bad
|
gst_all_1.gst-plugins-bad
|
||||||
gst_all_1.gst-plugins-ugly
|
gst_all_1.gst-plugins-ugly
|
||||||
|
libnice
|
||||||
|
|
||||||
gcc glib pkg-config
|
gcc glib pkg-config
|
||||||
ndi
|
ndi
|
||||||
|
|
67
internal/api/api.go
Normal file
67
internal/api/api.go
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/go-gst/go-gst/gst"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
type API struct {
|
||||||
|
deviceMonitors map[string]DeviceMonitor
|
||||||
|
previewPipelines map[string]*PreviewPipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeviceMonitor interface {
|
||||||
|
GetDevices() []Device
|
||||||
|
GetDevice(name string) Device
|
||||||
|
GetDeviceType() string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Device interface {
|
||||||
|
GetDisplayName() string
|
||||||
|
GetDeviceType() string
|
||||||
|
GetProperties() map[string]interface{}
|
||||||
|
GetOutput() (string, *gst.Element, *gst.Element, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() *API {
|
||||||
|
return &API{
|
||||||
|
deviceMonitors: make(map[string]DeviceMonitor),
|
||||||
|
previewPipelines: make(map[string]*PreviewPipeline),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) RegisterRoutes(g *echo.Group) {
|
||||||
|
g.Add("GET", "/devices", api.GetDevices)
|
||||||
|
g.Add("GET", "/preview/:devicemonitor-id/:source", api.previewHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) RegisterDeviceMonitor(monitors ...DeviceMonitor) {
|
||||||
|
for _, monitor := range monitors {
|
||||||
|
api.deviceMonitors[uuid.NewString()] = monitor
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type device struct {
|
||||||
|
MonitorID string
|
||||||
|
DeviceType string
|
||||||
|
DeviceName string
|
||||||
|
Properties map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetDevices(c echo.Context) error {
|
||||||
|
devices := make([]device, 0)
|
||||||
|
|
||||||
|
for id, monitor := range api.deviceMonitors {
|
||||||
|
for _, d := range monitor.GetDevices() {
|
||||||
|
devices = append(devices, device{
|
||||||
|
MonitorID: id,
|
||||||
|
DeviceType: d.GetDeviceType(),
|
||||||
|
DeviceName: d.GetDisplayName(),
|
||||||
|
Properties: d.GetProperties(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(200, devices)
|
||||||
|
}
|
485
internal/api/preview.go
Normal file
485
internal/api/preview.go
Normal file
|
@ -0,0 +1,485 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/go-gst/go-glib/glib"
|
||||||
|
"github.com/go-gst/go-gst/gst"
|
||||||
|
"github.com/go-gst/go-gst/gst/gstsdp"
|
||||||
|
"github.com/go-gst/go-gst/gst/gstwebrtc"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
previewWidth = 640
|
||||||
|
previewHeight = 360
|
||||||
|
previewBitrate = 600 // in kbps
|
||||||
|
previewFPS = 25
|
||||||
|
)
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type PreviewPipeline struct {
|
||||||
|
outputID string
|
||||||
|
|
||||||
|
pipeline *gst.Pipeline
|
||||||
|
webrtcbin *gst.Element
|
||||||
|
}
|
||||||
|
|
||||||
|
type SignalMessage struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
SDP string `json:"sdp,omitempty"`
|
||||||
|
ICE *IceCandidate `json:"ice,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type IceCandidate struct {
|
||||||
|
Candidate string `json:"candidate"`
|
||||||
|
SDPMid string `json:"sdpMid"`
|
||||||
|
SDPMLineIndex uint16 `json:"sdpMLineIndex"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) previewHandler(c echo.Context) error {
|
||||||
|
devicemonitorId := c.Param("devicemonitor-id")
|
||||||
|
source := c.Param("source")
|
||||||
|
streamID := fmt.Sprintf("%s_%s", devicemonitorId, source)
|
||||||
|
|
||||||
|
//debug
|
||||||
|
for id, _ := range api.deviceMonitors {
|
||||||
|
devicemonitorId = id
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().Str("streamID", streamID).Msg("Preview stream requested")
|
||||||
|
|
||||||
|
deviceMonitor, ok := api.deviceMonitors[devicemonitorId]
|
||||||
|
if !ok {
|
||||||
|
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device monitor with ID %s not found", devicemonitorId))
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceDevice := deviceMonitor.GetDevice(source)
|
||||||
|
if sourceDevice == nil {
|
||||||
|
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Device with name %s not found", source))
|
||||||
|
}
|
||||||
|
|
||||||
|
previewPipeline, ok := api.previewPipelines[streamID]
|
||||||
|
var err error
|
||||||
|
if !ok {
|
||||||
|
previewPipeline, err = api.createPreviewPipeline(sourceDevice)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to create preview pipeline")
|
||||||
|
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create preview pipeline")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO lock map while writing
|
||||||
|
api.previewPipelines[streamID] = previewPipeline
|
||||||
|
}
|
||||||
|
if err := previewPipeline.pipeline.SetState(gst.StatePlaying); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to set pipeline to playing")
|
||||||
|
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to set pipeline to playing")
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Upgrade error")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// --- WebRTC Signal Handling ---
|
||||||
|
|
||||||
|
// on-negotiation-needed: Triggered when webrtcbin needs to create an offer or answer.
|
||||||
|
// In our case (server receiving an offer), this fires after the remote description is set.
|
||||||
|
if _, err := previewPipeline.webrtcbin.Connect("on-negotiation-needed", func(self *gst.Element) {
|
||||||
|
log.Info().Msg("Negotiation needed, creating answer")
|
||||||
|
|
||||||
|
// Create Answer
|
||||||
|
promise := gst.NewPromise()
|
||||||
|
_, err := self.Emit("create-answer", nil, promise)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to emit create-answer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle the promise result (asynchronously)
|
||||||
|
promise.Interrupt() // Interrupt any previous waits
|
||||||
|
reply, err := promise.Await(c.Request().Context()) // Wait for the answer to be created
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to await create-answer promise")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if reply == nil {
|
||||||
|
log.Error().Msg("Promise reply for create-answer was nil")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
answerValue, err := reply.GetValue("answer")
|
||||||
|
if err != nil || answerValue == nil {
|
||||||
|
log.Error().Msg("Failed to get answer from promise reply")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
answer, ok := answerValue.(*gstwebrtc.SessionDescription)
|
||||||
|
if !ok || answer == nil {
|
||||||
|
log.Error().Msg("Answer value is not a WebRTCSessionDescription")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().Str("sdp", answer.SDP().String()).Msg("Answer created")
|
||||||
|
|
||||||
|
// Set Local Description
|
||||||
|
descPromise := gst.NewPromise()
|
||||||
|
_, err = self.Emit("set-local-description", answer, descPromise)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to emit set-local-description")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
descPromise.Interrupt() // Interrupt any previous waits
|
||||||
|
|
||||||
|
log.Info().Msg("Set local description (answer)")
|
||||||
|
|
||||||
|
// Send Answer back to the client
|
||||||
|
log.Info().Msg("Sending SDP answer back to browser")
|
||||||
|
response := SignalMessage{
|
||||||
|
Type: "answer",
|
||||||
|
SDP: answer.SDP().String(),
|
||||||
|
}
|
||||||
|
msg, err := json.Marshal(response)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to marshal answer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to send answer over WebSocket")
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to connect on-negotiation-needed signal")
|
||||||
|
}
|
||||||
|
|
||||||
|
// on-ice-candidate: Triggered when webrtcbin generates a new ICE candidate.
|
||||||
|
if _, err := previewPipeline.webrtcbin.Connect("on-ice-candidate", func(self *gst.Element, mlineindex uint, candidate string, _ any) {
|
||||||
|
log.Debug().Uint("mlineindex", mlineindex).Str("candidate", candidate).Msg("🧊 Generated ICE candidate")
|
||||||
|
// Note: The 'sdpMid' is often derived from the mlineindex or context,
|
||||||
|
// but the signal itself only provides mlineindex and candidate string directly.
|
||||||
|
// The JS side seems to handle this structure correctly.
|
||||||
|
ice := SignalMessage{
|
||||||
|
Type: "ice",
|
||||||
|
ICE: &IceCandidate{
|
||||||
|
Candidate: candidate,
|
||||||
|
SDPMLineIndex: uint16(mlineindex), // Assuming JS expects uint16
|
||||||
|
// sdpMid might need to be derived if required, but often index is enough
|
||||||
|
},
|
||||||
|
}
|
||||||
|
msg, err := json.Marshal(ice)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to marshal ICE candidate")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to send ICE candidate over WebSocket")
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to connect on-ice-candidate signal")
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- WebSocket Message Loop ---
|
||||||
|
for {
|
||||||
|
_, message, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("WebSocket read error")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var signal SignalMessage
|
||||||
|
if err := json.Unmarshal(message, &signal); err != nil {
|
||||||
|
log.Warn().Err(err).Msg("Failed to unmarshal signal message")
|
||||||
|
log.Warn().Err(err).Str("message", string(message)).Msg("Failed to unmarshal signal message")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch signal.Type {
|
||||||
|
case "offer":
|
||||||
|
log.Info().Msg("Received SDP offer from browser")
|
||||||
|
//log.Debug().Str("sdp", signal.SDP).Msg("Offer SDP")
|
||||||
|
|
||||||
|
// Create WebRTCSessionDescription for the offer
|
||||||
|
offerMsg, err := gstsdp.ParseSDPMessage(signal.SDP)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to parse SDP message")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
offerDesc := gstwebrtc.NewSessionDescription(gstwebrtc.SDP_TYPE_OFFER, offerMsg)
|
||||||
|
|
||||||
|
promise := gst.NewPromise()
|
||||||
|
// Set Remote Description
|
||||||
|
// This will trigger on-negotiation-needed if successful and state allows
|
||||||
|
if _, err := previewPipeline.webrtcbin.Emit("set-remote-description", offerDesc, promise); err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to emit set-remote-description")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if promise == nil {
|
||||||
|
log.Error().Msg("Emit set-remote-description returned nil promise")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//promise.Interrupt() // Interrupt previous waits if any
|
||||||
|
promise.Await(c.Request().Context()) // Wait for the remote description to be set
|
||||||
|
|
||||||
|
log.Info().Msg("Set remote description (offer)")
|
||||||
|
// Answer creation is now handled in on-negotiation-needed
|
||||||
|
|
||||||
|
case "ice":
|
||||||
|
if signal.ICE == nil {
|
||||||
|
log.Warn().Msg("Received ICE signal with nil ICE field")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Debug().Str("candidate", signal.ICE.Candidate).Uint16("mlineindex", signal.ICE.SDPMLineIndex).Str("sdpMid", signal.ICE.SDPMid).Msg("Received ICE candidate from browser")
|
||||||
|
|
||||||
|
// Add ICE Candidate
|
||||||
|
// Note: The signal takes mlineindex (uint) and candidate (string).
|
||||||
|
// sdpMid is usually associated but not directly passed here.
|
||||||
|
_, err := previewPipeline.webrtcbin.Emit("add-ice-candidate", uint(signal.ICE.SDPMLineIndex), signal.ICE.Candidate)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to emit add-ice-candidate")
|
||||||
|
} else {
|
||||||
|
log.Debug().Msg("Added ICE candidate")
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Warn().Str("type", signal.Type).Msg("Received unknown signal type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) createPreviewPipeline(d Device) (*PreviewPipeline, error) {
|
||||||
|
pipeline, err := gst.NewPipeline(fmt.Sprintf("preview-pipeline_%s", d.GetDisplayName()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoProxySrc, err := gst.NewElementWithName("proxysrc", "video-proxy-src")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating video proxy src: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoProxySrc); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding video proxy src to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoConvertScale, err := gst.NewElement("videoconvertscale")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating video convert scale: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoConvertScale); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding video convert scale to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := videoProxySrc.Link(videoConvertScale); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking video proxy src to convert scale: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoRate, err := gst.NewElement("videorate")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating video rate: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoRate); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding video rate to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := videoConvertScale.Link(videoRate); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking video convert scale to rate: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
convertCaps := gst.NewCapsFromString(fmt.Sprintf("video/x-raw,format=I420,width=%d,height=%d,framerate=%d/1", previewWidth, previewHeight, previewFPS))
|
||||||
|
|
||||||
|
videoEncoder, err := gst.NewElementWithProperties("x264enc", map[string]interface{}{
|
||||||
|
"bitrate": previewBitrate,
|
||||||
|
"speed-preset": 1, // ultrafast
|
||||||
|
"tune": 0x00000004, // zero-latency
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating video encoder: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoEncoder); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding video encoder to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := videoRate.LinkFiltered(videoEncoder, convertCaps); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking filtered video rate to encoder: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPayloader, err := gst.NewElementWithProperties("rtph264pay", map[string]interface{}{
|
||||||
|
"aggregate-mode": 1, // "zero-latency"
|
||||||
|
"pt": 96,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating video payloader: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoPayloader); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding video payloader to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := videoEncoder.Link(videoPayloader); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking video encoder to payloader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
multiQueue, err := gst.NewElement("multiqueue")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating multiqueue: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(multiQueue); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding multiqueue to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoPayloaderSrcPad := videoPayloader.GetStaticPad("src")
|
||||||
|
videoQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
|
||||||
|
|
||||||
|
if plr := videoPayloaderSrcPad.Link(videoQueueSinkPad); plr != gst.PadLinkOK {
|
||||||
|
return nil, fmt.Errorf("linking video encoder src pad to multiqueue sink pad: %s", plr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
webRtcBin, err := gst.NewElementWithProperties("webrtcbin", map[string]interface{}{
|
||||||
|
"stun-server": "stun://stun.l.google.com:19302",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating webrtcbin: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(webRtcBin); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding webrtcbin to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioProxySrc, err := gst.NewElementWithName("proxysrc", "audio-proxy-src")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating audio proxy src: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioProxySrc); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding audio proxy src to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioConvert, err := gst.NewElement("audioconvert")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating audio convert: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioConvert); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding audio convert to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := audioProxySrc.Link(audioConvert); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking audio proxy src to convert: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioRate, err := gst.NewElement("audiorate")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating audio rate: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioRate); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding audio rate to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := audioConvert.Link(audioRate); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking audio convert to rate: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioCaps := gst.NewCapsFromString("audio/x-raw,format=S16LE,channels=2,rate=48000")
|
||||||
|
|
||||||
|
audioEncoder, err := gst.NewElement("opusenc")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating audio encoder: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioEncoder); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding audio encoder to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := audioRate.LinkFiltered(audioEncoder, audioCaps); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking filtered audio rate to encoder: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioPayloader, err := gst.NewElementWithProperties("rtpopuspay", map[string]interface{}{
|
||||||
|
"pt": 97,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating audio payloader: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioPayloader); err != nil {
|
||||||
|
return nil, fmt.Errorf("adding audio payloader to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := audioEncoder.Link(audioPayloader); err != nil {
|
||||||
|
return nil, fmt.Errorf("linking audio encoder to payloader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioQueueSinkPad := multiQueue.GetRequestPad("sink_%u")
|
||||||
|
audioPayloaderSrcPad := audioPayloader.GetStaticPad("src")
|
||||||
|
if plr := audioPayloaderSrcPad.Link(audioQueueSinkPad); plr != gst.PadLinkOK {
|
||||||
|
return nil, fmt.Errorf("linking audio encoder src pad to multiqueue sink pad: %s", plr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
outputID, videoSrc, audioSrc, err := d.GetOutput()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting output: %w", err)
|
||||||
|
}
|
||||||
|
if err := linkProxySink(videoProxySrc, videoSrc); err != nil {
|
||||||
|
return nil, fmt.Errorf("setting video proxy src proxysink: %w", err)
|
||||||
|
}
|
||||||
|
if err := linkProxySink(audioProxySrc, audioSrc); err != nil {
|
||||||
|
return nil, fmt.Errorf("setting audio proxy src proxysink: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the struct before setting state, so we have it in case of errors during state change
|
||||||
|
// Store the outputID so we can potentially clean up the source proxy later if needed
|
||||||
|
previewPipeline := &PreviewPipeline{
|
||||||
|
outputID: outputID,
|
||||||
|
pipeline: pipeline,
|
||||||
|
webrtcbin: webRtcBin,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect signals before setting state
|
||||||
|
log.Debug().Str("pipeline", pipeline.GetName()).Msg("Connecting signals")
|
||||||
|
if _, err := multiQueue.Connect("pad-added", previewPipeline.onMultiqueuePadAdded); err != nil {
|
||||||
|
// Attempt cleanup if signal connection fails
|
||||||
|
pipeline.SetState(gst.StateNull)
|
||||||
|
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
|
||||||
|
return nil, fmt.Errorf("connecting multiqueue pad-added signal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now set the preview pipeline to playing
|
||||||
|
log.Info().Str("pipeline", pipeline.GetName()).Msg("Setting preview pipeline state to PLAYING")
|
||||||
|
if err := pipeline.SetState(gst.StatePlaying); err != nil {
|
||||||
|
// Attempt cleanup
|
||||||
|
pipeline.SetState(gst.StateNull)
|
||||||
|
// TODO: Need to potentially remove the proxysinks from the source device pipeline using outputID
|
||||||
|
return nil, fmt.Errorf("setting preview pipeline state to playing: %w", err)
|
||||||
|
}
|
||||||
|
log.Info().Str("pipeline", pipeline.GetName()).Msg("Preview pipeline state set to PLAYING")
|
||||||
|
|
||||||
|
return previewPipeline, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pipeline *PreviewPipeline) onMultiqueuePadAdded(element *gst.Element, pad *gst.Pad) {
|
||||||
|
log.Debug().Msg("Multiqueue pad added")
|
||||||
|
if plr := pipeline.webrtcbin.GetRequestPad("sink_%u").Link(pad); plr != gst.PadLinkOK {
|
||||||
|
log.Error().Str("pad-link-return", plr.String()).Msg("Failed to link multiqueue pad")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://github.com/go-gst/go-gst/issues/165
|
||||||
|
func linkProxySink(src, sink *gst.Element) error {
|
||||||
|
propType, err := src.GetPropertyType("proxysink")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get proxysink property type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := glib.ValueInit(propType)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to init proxysink property value: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
val.SetInstance(sink.GetPrivate())
|
||||||
|
|
||||||
|
return src.SetPropertyValue("proxysink", val)
|
||||||
|
}
|
|
@ -2,23 +2,8 @@ package gstreamer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/go-gst/go-gst/gst"
|
"github.com/go-gst/go-gst/gst"
|
||||||
"github.com/goccy/go-graphviz/cgraph"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func gstreamerBinToDot(bin *gst.Bin) string {
|
func gstreamerBinToDot(bin *gst.Bin) string {
|
||||||
return bin.DebugBinToDotData(gst.DebugGraphShowAll)
|
return bin.DebugBinToDotData(gst.DebugGraphShowAll)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setFontSize(graph *cgraph.Graph, fontSize float64) {
|
|
||||||
graph.SetFontSize(20)
|
|
||||||
graph.Set("arrowSize", "3")
|
|
||||||
for node, err := graph.FirstNode(); err == nil && node != nil; node, err = graph.NextNode(node) {
|
|
||||||
node.SetFontSize(fontSize)
|
|
||||||
for edge, err := graph.FirstEdge(node); err == nil && edge != nil; edge, err = graph.NextEdge(edge, node) {
|
|
||||||
edge.SetFontSize(fontSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//for subGraph, err := graph.FirstSubGraph(); err == nil && subGraph != nil; subGraph, err = graph.NextSubGraph() {
|
|
||||||
// setFontSize(subGraph, fontSize)
|
|
||||||
//}
|
|
||||||
}
|
|
||||||
|
|
285
internal/ndi/device.go
Normal file
285
internal/ndi/device.go
Normal file
|
@ -0,0 +1,285 @@
|
||||||
|
package ndi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/go-gst/go-gst/gst"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Device struct {
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
|
name string
|
||||||
|
uri string
|
||||||
|
props map[string]interface{}
|
||||||
|
|
||||||
|
available bool
|
||||||
|
|
||||||
|
gstDevice *gst.Device
|
||||||
|
|
||||||
|
pipeline *gst.Pipeline
|
||||||
|
videoTee *gst.Element
|
||||||
|
videoProxy map[string]*gst.Element
|
||||||
|
audioTee *gst.Element
|
||||||
|
audioProxy map[string]*gst.Element
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDevice(d *gst.Device) *Device {
|
||||||
|
name := d.GetDisplayName()
|
||||||
|
uri := d.GetProperties().Values()["url-address"].(string)
|
||||||
|
props := d.GetProperties().Values()
|
||||||
|
|
||||||
|
device := &Device{
|
||||||
|
name: name,
|
||||||
|
uri: uri,
|
||||||
|
props: props,
|
||||||
|
available: true,
|
||||||
|
|
||||||
|
gstDevice: d,
|
||||||
|
|
||||||
|
videoProxy: make(map[string]*gst.Element),
|
||||||
|
audioProxy: make(map[string]*gst.Element),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := device.createPipeline()
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to create pipeline for device")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return device
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) updateProperties(dgst *gst.Device) {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
|
||||||
|
d.name = dgst.GetDisplayName()
|
||||||
|
d.uri = dgst.GetProperties().Values()["url-address"].(string)
|
||||||
|
d.props = dgst.GetProperties().Values()
|
||||||
|
d.available = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) setUnavailable() {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
|
||||||
|
d.available = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) GetDisplayName() string {
|
||||||
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
|
return d.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) GetDeviceType() string {
|
||||||
|
return deviceType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) GetProperties() map[string]interface{} {
|
||||||
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
|
return d.props
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO handle case where ndidemux src pads are not present
|
||||||
|
func (d *Device) createPipeline() error {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
|
||||||
|
log.Debug().Str("name", d.name).Msg("Creating pipeline for device")
|
||||||
|
|
||||||
|
pipeline, err := gst.NewPipeline("ndi-" + d.name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating pipeline: %w", err)
|
||||||
|
}
|
||||||
|
pipeline.ForceClock(gst.ObtainSystemClock().Clock)
|
||||||
|
|
||||||
|
input := d.gstDevice.CreateElement("")
|
||||||
|
if err := pipeline.Add(input); err != nil {
|
||||||
|
return fmt.Errorf("adding input to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
inputQueue, err := gst.NewElementWithProperties("queue2",
|
||||||
|
map[string]interface{}{
|
||||||
|
"name": "input-queue",
|
||||||
|
"max-size-buffers": 0,
|
||||||
|
"max-size-bytes": 0,
|
||||||
|
"max-size-time": 40000000, // 40ms
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating input-queue: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(inputQueue); err != nil {
|
||||||
|
return fmt.Errorf("adding input-queue to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := input.Link(inputQueue); err != nil {
|
||||||
|
return fmt.Errorf("linking input to input input-queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ndiDemuxer, err := gst.NewElement("ndisrcdemux")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating ndisrcdemux: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(ndiDemuxer); err != nil {
|
||||||
|
return fmt.Errorf("adding ndisrcdemux to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := inputQueue.Link(ndiDemuxer); err != nil {
|
||||||
|
return fmt.Errorf("linking input-queue to ndisrcdemux: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoTee, err := gst.NewElementWithName("tee", "video-tee")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating video-tee: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(videoTee); err != nil {
|
||||||
|
return fmt.Errorf("adding video-tee to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
audioTee, err := gst.NewElementWithName("tee", "audio-tee")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating audio-tee: %w", err)
|
||||||
|
}
|
||||||
|
if err := pipeline.Add(audioTee); err != nil {
|
||||||
|
return fmt.Errorf("adding audio-tee to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.pipeline = pipeline
|
||||||
|
d.videoTee = videoTee
|
||||||
|
d.audioTee = audioTee
|
||||||
|
|
||||||
|
if _, err := ndiDemuxer.Connect("pad-added", d.onNdiSrcDemuxerPadAdded); err != nil {
|
||||||
|
return fmt.Errorf("connecting pad-added signal: %w", err)
|
||||||
|
}
|
||||||
|
cwd, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting current working directory: %w", err)
|
||||||
|
}
|
||||||
|
log.Debug().Str("cwd", cwd).Msg("dumping pipeline to dot file")
|
||||||
|
filename := filepath.Join(cwd, "dumpDot", "ndi-pipeline.dot")
|
||||||
|
dot := pipeline.DebugBinToDotData(gst.DebugGraphShowAll)
|
||||||
|
if err := os.WriteFile(filename, []byte(dot), 0644); err != nil {
|
||||||
|
return fmt.Errorf("writing dot file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().Str("name", d.name).Msg("Pipeline created")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) onNdiSrcDemuxerPadAdded(element *gst.Element, pad *gst.Pad) {
|
||||||
|
// This is called when ndisrcdemux adds a source pad (e.g., "video", "audio")
|
||||||
|
// We need to link this source pad to the sink pad of the corresponding tee.
|
||||||
|
log.Debug().Str("pad", pad.GetName()).Msg("NDI demuxer pad added, linking to tee sink")
|
||||||
|
|
||||||
|
var teeSinkPad *gst.Pad
|
||||||
|
var teeName string
|
||||||
|
switch pad.GetName() {
|
||||||
|
case "video":
|
||||||
|
teeSinkPad = d.videoTee.GetStaticPad("sink")
|
||||||
|
teeName = d.videoTee.GetName()
|
||||||
|
case "audio":
|
||||||
|
teeSinkPad = d.audioTee.GetStaticPad("sink")
|
||||||
|
teeName = d.audioTee.GetName()
|
||||||
|
default:
|
||||||
|
log.Warn().Str("pad", pad.GetName()).Msg("Ignoring unknown pad from ndisrcdemux")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if teeSinkPad == nil {
|
||||||
|
log.Error().Str("pad", pad.GetName()).Str("tee", teeName).Msg("Failed to get static sink pad from tee")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Link the demuxer's source pad to the tee's sink pad
|
||||||
|
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Attempting to link NDI demuxer pad to tee")
|
||||||
|
if plr := pad.Link(teeSinkPad); plr != gst.PadLinkOK {
|
||||||
|
log.Error().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msgf("Linking NDI demuxer pad to tee failed: %s", plr)
|
||||||
|
} else {
|
||||||
|
log.Info().Str("src_pad", pad.GetName()).Str("sink_pad", teeSinkPad.GetName()).Str("tee", teeName).Msg("Successfully linked NDI demuxer pad to tee")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) GetOutput() (string, *gst.Element, *gst.Element, error) {
|
||||||
|
id, videoElement, audioElement, err := func() (string, *gst.Element, *gst.Element, error) {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
id := uuid.NewString()
|
||||||
|
|
||||||
|
videoProxySink, err := gst.NewElementWithName("proxysink", "video-proxy")
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("creating video-proxy sink: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.pipeline.Add(videoProxySink); err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("adding video-proxy sink to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if plr := d.videoTee.GetRequestPad("src_%u").Link(videoProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK {
|
||||||
|
return "", nil, nil, fmt.Errorf("linking video-tee to video-proxy sink: %s", plr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !videoProxySink.SyncStateWithParent() {
|
||||||
|
log.Warn().Msg("video-proxy sink sync state not syncable")
|
||||||
|
}
|
||||||
|
|
||||||
|
audioProxySink, err := gst.NewElementWithName("proxysink", "audio-proxy")
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("creating audio-proxy sink: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := d.pipeline.Add(audioProxySink); err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("adding audio-proxy sink to pipeline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if plr := d.audioTee.GetRequestPad("src_%u").Link(audioProxySink.GetStaticPad("sink")); plr != gst.PadLinkOK {
|
||||||
|
return "", nil, nil, fmt.Errorf("linking audio-tee to audio-proxy sink: %s", plr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !audioProxySink.SyncStateWithParent() {
|
||||||
|
log.Warn().Msg("audio-proxy sink sync state not syncable")
|
||||||
|
}
|
||||||
|
|
||||||
|
d.videoProxy[id] = videoProxySink
|
||||||
|
d.audioProxy[id] = audioProxySink
|
||||||
|
|
||||||
|
return id, videoProxySink, audioProxySink, nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("creating output: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.setPipelineState(gst.StatePlaying); err != nil {
|
||||||
|
return "", nil, nil, fmt.Errorf("setting pipeline state: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, videoElement, audioElement, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) setPipelineState(state gst.State) error {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
|
||||||
|
if d.pipeline == nil {
|
||||||
|
return fmt.Errorf("pipeline is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if state == d.pipeline.GetCurrentState() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Msg("Setting pipeline state")
|
||||||
|
if err := d.pipeline.SetState(state); err != nil {
|
||||||
|
log.Error().Str("pipeline", d.pipeline.GetName()).Str("target_state", state.String()).Err(err).Msg("SetState failed")
|
||||||
|
return fmt.Errorf("setting pipeline state to %s: %w", state.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
100
internal/ndi/ndi.go
Normal file
100
internal/ndi/ndi.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package ndi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.entr0py.de/garionion/catie/internal/api"
|
||||||
|
"github.com/go-gst/go-gst/gst"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const deviceType = "NDI"
|
||||||
|
|
||||||
|
type NDI struct {
|
||||||
|
deviceMonitor *gst.DeviceMonitor
|
||||||
|
devices map[string]*Device
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNDI() *NDI {
|
||||||
|
|
||||||
|
log.Debug().Msg("creating NDI device monitor")
|
||||||
|
devMon := gst.NewDeviceMonitor()
|
||||||
|
caps := gst.NewCapsFromString("application/x-ndi")
|
||||||
|
devMon.AddFilter("Source/Network", caps)
|
||||||
|
|
||||||
|
n := &NDI{
|
||||||
|
deviceMonitor: devMon,
|
||||||
|
devices: make(map[string]*Device),
|
||||||
|
}
|
||||||
|
|
||||||
|
bus := devMon.GetBus()
|
||||||
|
bus.AddWatch(func(msg *gst.Message) bool {
|
||||||
|
switch msg.Type() {
|
||||||
|
case gst.MessageDeviceAdded:
|
||||||
|
device := msg.ParseDeviceAdded()
|
||||||
|
n.createOrUpdateDevice(device)
|
||||||
|
log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Added")
|
||||||
|
case gst.MessageDeviceRemoved:
|
||||||
|
device := msg.ParseDeviceRemoved()
|
||||||
|
n.setDeviceUnavailable(device)
|
||||||
|
log.Debug().Str("message", device.GetDisplayName()).Fields(device.GetProperties().Values()).Msg("NDI Device Removed")
|
||||||
|
default:
|
||||||
|
log.Debug().Str("msgType", msg.TypeName()).Str("message", msg.String()).Msg("default")
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Debug().Msg("starting NDI device monitor")
|
||||||
|
devMon.Start()
|
||||||
|
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) GetDeviceType() string {
|
||||||
|
return deviceType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) GetDevices() []api.Device {
|
||||||
|
devices := make([]api.Device, 0, len(n.devices))
|
||||||
|
for _, d := range n.devices {
|
||||||
|
devices = append(devices, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
return devices
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) GetDevice(name string) api.Device {
|
||||||
|
if device, ok := n.devices[name]; ok {
|
||||||
|
return device
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) createOrUpdateDevice(d *gst.Device) {
|
||||||
|
name := d.GetDisplayName()
|
||||||
|
if device, ok := n.devices[name]; ok {
|
||||||
|
device.updateProperties(d)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n.devices[name] = newDevice(d)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) setDeviceUnavailable(d *gst.Device) {
|
||||||
|
if device, ok := n.devices[d.GetDisplayName()]; ok {
|
||||||
|
device.setUnavailable()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NDI) DiscoverSources() {
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
|
||||||
|
for range ticker.C {
|
||||||
|
devices := n.deviceMonitor.GetDevices()
|
||||||
|
for _, device := range devices {
|
||||||
|
log.Debug().Fields(device.GetProperties().Values()).Msg("discovering sources")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
2
main.go
2
main.go
|
@ -88,6 +88,8 @@ func main() {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
|
e.Static("/", "webRTC-test")
|
||||||
|
|
||||||
log.Info().Str("address", cfg.ListenAddr).Msg("starting web server")
|
log.Info().Str("address", cfg.ListenAddr).Msg("starting web server")
|
||||||
if err := e.Start(cfg.ListenAddr); err != nil {
|
if err := e.Start(cfg.ListenAddr); err != nil {
|
||||||
log.Fatal().Err(err).Msg("web server failed")
|
log.Fatal().Err(err).Msg("web server failed")
|
||||||
|
|
13
package.nix
13
package.nix
|
@ -8,12 +8,14 @@ let
|
||||||
gst_all_1.gst-plugins-good
|
gst_all_1.gst-plugins-good
|
||||||
gst_all_1.gst-plugins-bad
|
gst_all_1.gst-plugins-bad
|
||||||
gst_all_1.gst-plugins-ugly
|
gst_all_1.gst-plugins-ugly
|
||||||
|
libnice
|
||||||
] + ":" + lib.makeLibraryPath [
|
] + ":" + lib.makeLibraryPath [
|
||||||
gst_all_1.gstreamer.out
|
gst_all_1.gstreamer.out
|
||||||
gst_all_1.gst-plugins-base
|
gst_all_1.gst-plugins-base
|
||||||
gst_all_1.gst-plugins-good
|
gst_all_1.gst-plugins-good
|
||||||
gst_all_1.gst-plugins-bad
|
gst_all_1.gst-plugins-bad
|
||||||
gst_all_1.gst-plugins-ugly
|
gst_all_1.gst-plugins-ugly
|
||||||
|
libnice
|
||||||
];
|
];
|
||||||
in
|
in
|
||||||
pkgs.buildGo123Module {
|
pkgs.buildGo123Module {
|
||||||
|
@ -29,6 +31,7 @@ pkgs.buildGo123Module {
|
||||||
gst_all_1.gst-plugins-good
|
gst_all_1.gst-plugins-good
|
||||||
gst_all_1.gst-plugins-bad
|
gst_all_1.gst-plugins-bad
|
||||||
gst_all_1.gst-plugins-ugly
|
gst_all_1.gst-plugins-ugly
|
||||||
|
libnice
|
||||||
glib
|
glib
|
||||||
ndi
|
ndi
|
||||||
];
|
];
|
||||||
|
@ -39,19 +42,11 @@ pkgs.buildGo123Module {
|
||||||
makeWrapper
|
makeWrapper
|
||||||
];
|
];
|
||||||
|
|
||||||
ldflags = [
|
|
||||||
"-X git.entr0py.de/garionion/catie/internal/ndi.NDI_LIB_PATH=${pkgs.ndi}"
|
|
||||||
];
|
|
||||||
|
|
||||||
buildFlags = [
|
|
||||||
"CGO_CFLAGS=-I${pkgs.ndi}/include"
|
|
||||||
"CGO_LDFLAGS=-L${pkgs.ndi}/lib -lndi"
|
|
||||||
];
|
|
||||||
|
|
||||||
postInstall = ''
|
postInstall = ''
|
||||||
wrapProgram $out/bin/catie \
|
wrapProgram $out/bin/catie \
|
||||||
--set GST_PLUGIN_SYSTEM_PATH_1_0 ${gstPluginPath} \
|
--set GST_PLUGIN_SYSTEM_PATH_1_0 ${gstPluginPath} \
|
||||||
--set GST_PLUGIN_PATH_1_0 ${gstPluginPath}
|
--set GST_PLUGIN_PATH_1_0 ${gstPluginPath}
|
||||||
|
--set NDI_RUNTIME_DIR_V5 ${lib.makeLibraryPath [ ndi ]} \
|
||||||
'';
|
'';
|
||||||
|
|
||||||
tags = [ ];
|
tags = [ ];
|
||||||
|
|
15
webRTC-test/index.html
Normal file
15
webRTC-test/index.html
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>WebRTC Video Preview</title>
|
||||||
|
<link rel="stylesheet" href="style.css">
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>WebRTC Video Stream</h1>
|
||||||
|
<video id="video" autoplay playsinline></video>
|
||||||
|
|
||||||
|
<script src="script.js"></script>
|
||||||
|
</body>
|
||||||
|
</html>
|
143
webRTC-test/script.js
Normal file
143
webRTC-test/script.js
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
const videoElement = document.getElementById('video');
|
||||||
|
let peerConnection; // Renamed for clarity
|
||||||
|
let wsConnection;
|
||||||
|
// Removed mediaConstraints and iceCandidates (not needed here)
|
||||||
|
|
||||||
|
const wsUrl = "ws://localhost:8080/api/v1/preview/5a59a94f-264a-4d24-9e57-8a9be0ecbdbd/KALANI-3.LOCAL%20%28OBS%29"; // Replace with your WebSocket URL if necessary
|
||||||
|
const peerConnectionConfig = {
|
||||||
|
iceServers: [{ urls: "stun:stun.l.google.com:19302" }]
|
||||||
|
};
|
||||||
|
|
||||||
|
// Initialize WebSocket connection
|
||||||
|
function initWebSocket() {
|
||||||
|
wsConnection = new WebSocket(wsUrl);
|
||||||
|
|
||||||
|
wsConnection.onopen = () => {
|
||||||
|
console.log('Connected to WebSocket');
|
||||||
|
createOffer();
|
||||||
|
};
|
||||||
|
|
||||||
|
wsConnection.onmessage = async (message) => {
|
||||||
|
const signal = JSON.parse(message.data);
|
||||||
|
console.log("Received signal:", signal); // Add logging
|
||||||
|
|
||||||
|
// The browser initiates with an offer, so it expects an answer or ICE candidates.
|
||||||
|
// It should not receive an 'offer'.
|
||||||
|
if (signal.type === 'answer') {
|
||||||
|
handleAnswer(signal);
|
||||||
|
} else if (signal.type === 'ice') {
|
||||||
|
handleIceCandidate(signal);
|
||||||
|
} else {
|
||||||
|
console.warn("Received unexpected signal type:", signal.type);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
wsConnection.onerror = (error) => {
|
||||||
|
console.error('WebSocket Error:', error);
|
||||||
|
};
|
||||||
|
|
||||||
|
wsConnection.onclose = (event) => {
|
||||||
|
console.log('WebSocket closed:', event.code, event.reason);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an SDP offer to receive media
|
||||||
|
async function createOffer() {
|
||||||
|
peerConnection = new RTCPeerConnection(peerConnectionConfig);
|
||||||
|
|
||||||
|
// Set up event handlers
|
||||||
|
peerConnection.onicecandidate = handleIceCandidateEvent;
|
||||||
|
peerConnection.ontrack = (event) => {
|
||||||
|
console.log("Track received:", event.track, "Stream:", event.streams[0]);
|
||||||
|
if (videoElement.srcObject !== event.streams[0]) {
|
||||||
|
videoElement.srcObject = event.streams[0];
|
||||||
|
console.log("Assigned stream to video element");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
peerConnection.oniceconnectionstatechange = () => {
|
||||||
|
console.log("ICE connection state:", peerConnection.iceConnectionState);
|
||||||
|
};
|
||||||
|
peerConnection.onconnectionstatechange = () => {
|
||||||
|
console.log("Peer connection state:", peerConnection.connectionState);
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// Configure to receive video and audio
|
||||||
|
peerConnection.addTransceiver('video', { direction: 'recvonly' });
|
||||||
|
peerConnection.addTransceiver('audio', { direction: 'recvonly' });
|
||||||
|
|
||||||
|
try {
|
||||||
|
console.log("Creating offer...");
|
||||||
|
const offer = await peerConnection.createOffer();
|
||||||
|
await peerConnection.setLocalDescription(offer);
|
||||||
|
console.log("Offer created and set as local description:", offer);
|
||||||
|
|
||||||
|
const signalMessage = {
|
||||||
|
type: 'offer',
|
||||||
|
sdp: offer.sdp
|
||||||
|
};
|
||||||
|
|
||||||
|
console.log("Sending offer via WebSocket:", signalMessage);
|
||||||
|
wsConnection.send(JSON.stringify(signalMessage));
|
||||||
|
} catch (error) {
|
||||||
|
console.error("Error creating offer:", error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle SDP answer from the server
|
||||||
|
async function handleAnswer(signal) {
|
||||||
|
console.log("Handling answer:", signal);
|
||||||
|
const remoteDescription = new RTCSessionDescription({
|
||||||
|
type: 'answer',
|
||||||
|
sdp: signal.sdp
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
await peerConnection.setRemoteDescription(remoteDescription);
|
||||||
|
console.log("Remote description (answer) set successfully.");
|
||||||
|
} catch (error) {
|
||||||
|
console.error("Error setting remote description (answer):", error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle ICE candidates generated locally (send to server)
|
||||||
|
function handleIceCandidateEvent(event) {
|
||||||
|
console.log("Local ICE candidate event:", event);
|
||||||
|
if (event.candidate) {
|
||||||
|
console.log("Sending ICE candidate:", event.candidate);
|
||||||
|
const iceMessage = {
|
||||||
|
type: 'ice',
|
||||||
|
ice: {
|
||||||
|
candidate: event.candidate.candidate,
|
||||||
|
sdpMid: event.candidate.sdpMid,
|
||||||
|
sdpMLineIndex: event.candidate.sdpMLineIndex
|
||||||
|
}
|
||||||
|
};
|
||||||
|
wsConnection.send(JSON.stringify(iceMessage));
|
||||||
|
} else {
|
||||||
|
console.log("All local ICE candidates have been sent.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle ICE candidate received from the server
|
||||||
|
async function handleIceCandidate(signal) {
|
||||||
|
console.log("Handling received ICE candidate:", signal);
|
||||||
|
if (!signal.ice || !signal.ice.candidate) {
|
||||||
|
console.warn("Received incomplete ICE candidate signal:", signal);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const candidate = new RTCIceCandidate({
|
||||||
|
candidate: signal.ice.candidate,
|
||||||
|
sdpMid: signal.ice.sdpMid,
|
||||||
|
sdpMLineIndex: signal.ice.sdpMLineIndex
|
||||||
|
});
|
||||||
|
await peerConnection.addIceCandidate(candidate);
|
||||||
|
console.log("Added received ICE candidate successfully.");
|
||||||
|
} catch (error) {
|
||||||
|
console.error("Error adding received ICE candidate:", error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the connection when the page loads
|
||||||
|
window.onload = initWebSocket;
|
15
webRTC-test/style.css
Normal file
15
webRTC-test/style.css
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
body {
|
||||||
|
font-family: Arial, sans-serif;
|
||||||
|
text-align: center;
|
||||||
|
}
|
||||||
|
|
||||||
|
h1 {
|
||||||
|
color: #4CAF50;
|
||||||
|
}
|
||||||
|
|
||||||
|
#video {
|
||||||
|
width: 100%;
|
||||||
|
max-width: 600px;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
margin-top: 20px;
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue