package gstreamer

import (
	"context"
	"errors"

	"github.com/tinyzimmer/go-glib/glib"
	"github.com/tinyzimmer/go-gst/gst"
)

// gstreamer holds the instance most recently created by Init.
var gstreamer *Gstreamer

// Init initializes the GStreamer library, starts a GLib main loop on the
// default main context in a background goroutine, and returns a new, empty
// Gstreamer registry. The returned instance is also stored in the
// package-level gstreamer variable.
func Init() *Gstreamer {
	gst.Init(nil)

	// The loop handle is not retained, so this function offers no way to
	// stop the main loop once it has been started.
	mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)
	go mainLoop.Run()

	decklink := &Decklink{
		Slots: map[string]string{"one": ""},
	}

	gstreamer = &Gstreamer{
		ctx:            context.Background(),
		Pipelines:      map[string]*Pipeline{},
		pipelineSlots:  make(map[string][]string),
		notifyOnUpdate: map[string][]NotifyOnPipelineUpdate{},
		Decklink:       decklink,
	}
	return gstreamer
}

// AddPipeline registers pipeline under its ID, records that ID in the slot
// list keyed by the pipeline's Name, and subscribes the slot notifier to the
// pipeline's updates. The write lock is held for the duration of the call.
func (g *Gstreamer) AddPipeline(pipeline *Pipeline) {
	g.Lock()
	defer g.Unlock()

	// append on a missing (nil) entry allocates the slice, so no existence
	// check is needed.
	g.pipelineSlots[pipeline.Name] = append(g.pipelineSlots[pipeline.Name], pipeline.ID)
	g.Pipelines[pipeline.ID] = pipeline
	pipeline.SubscribeToUpdates(g.notifySlotSubscriber())
}

// DeletePipeline removes the pipeline with the given id from the registry and
// from its slot list, then calls the pipeline's ctxCancel. It returns an error
// when no pipeline with that id is registered. The write lock is held for the
// duration of the call.
func (g *Gstreamer) DeletePipeline(id string) error {
	g.Lock()
	defer g.Unlock()

	pipeline, err := g.getPipeline(id)
	if err != nil {
		return errors.New("pipeline not found")
	}

	// Remove exactly the matching ID from the slot list. If the ID is not
	// present (or the list is empty) the slot list is left untouched instead
	// of dropping an unrelated entry or slicing out of range.
	ids := g.pipelineSlots[pipeline.Name]
	for i, slotID := range ids {
		if slotID == pipeline.ID {
			g.pipelineSlots[pipeline.Name] = append(ids[:i], ids[i+1:]...)
			break
		}
	}

	delete(g.Pipelines, id)
	pipeline.ctxCancel()
	return nil
}

// GetPipeline returns the pipeline registered under id, or an error when there
// is none. It takes the read lock for the duration of the lookup.
func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) {
	g.RLock()
	defer g.RUnlock()
	return g.getPipeline(id)
}

// getPipeline looks up a pipeline by id without taking any lock.
// Only use this while the resource is already locked by the caller.
func (g *Gstreamer) getPipeline(id string) (*Pipeline, error) {
	if pipeline, ok := g.Pipelines[id]; ok {
		return pipeline, nil
	}
	return nil, errors.New("Pipeline not found")
}

// SubscribeToSlotUpdates registers callback to be invoked for updates of
// pipelines whose Name equals slot. The write lock is held for the duration
// of the call.
func (g *Gstreamer) SubscribeToSlotUpdates(slot string, callback NotifyOnPipelineUpdate) {
	g.Lock()
	defer g.Unlock()

	// append on a missing (nil) entry allocates the slice.
	g.notifyOnUpdate[slot] = append(g.notifyOnUpdate[slot], callback)
}

// notifySlotSubscriber returns a callback that forwards a pipeline update to
// every subscriber registered for the slot named by the pipeline's Name.
//
// NOTE(review): the returned callback reads g.notifyOnUpdate without holding
// the lock. Whether that is safe depends on when and from which goroutine
// Pipeline invokes it — confirm against the Pipeline implementation before
// adding locking here, since locking could deadlock if the callback is ever
// invoked while the write lock is held.
func (g *Gstreamer) notifySlotSubscriber() NotifyOnPipelineUpdate {
	return func(pipeline *Pipeline) {
		for _, callback := range g.notifyOnUpdate[pipeline.Name] {
			callback(pipeline)
		}
	}
}