From 727ffafe5beee2fbd981fbb5ab39de4c5ab86ea6 Mon Sep 17 00:00:00 2001 From: Garionion Date: Wed, 9 Feb 2022 00:04:00 +0100 Subject: [PATCH] make pipeline and slot updates subscribable --- api/api.go | 50 ++++++----------------------------------- default.nix | 1 + gstreamer/gstreamer.go | 51 +++++++++++++++++++++++++++++++++++++----- gstreamer/pipeline.go | 30 +++++++++++++++++++------ gstreamer/types.go | 7 +++++- 5 files changed, 83 insertions(+), 56 deletions(-) diff --git a/api/api.go b/api/api.go index 6bd719b..ce233a4 100644 --- a/api/api.go +++ b/api/api.go @@ -11,8 +11,6 @@ import ( ) type PipelineService struct { - pipelineSlots map[string][]string - pipelineslotmapping map[string]string UnimplementedPipelineServiceServer *gstreamer.Gstreamer Font, FontWeight string //default font @@ -21,65 +19,44 @@ type PipelineService struct { sync.RWMutex } -func (s *PipelineService) UpdatePipeline(ctx context.Context, updates *PipelineUpdates) (*PipelineUpdateResponse, error) { +func (s *PipelineService) UpdatePipeline(_ context.Context, updates *PipelineUpdates) (*PipelineUpdateResponse, error) { pipeline, err := s.Gstreamer.GetPipeline(updates.Id.GetId()) if err != nil { log.Error().Str("ID", updates.Id.GetId()).Err(err) } else { overlays := s.convertRPCOverlayToGstreamerOverlay(updates.GetOverlays()) - s.Lock() pipeline.PipelineUpdates <- gstreamer.PipelineUpdates{Overlays: overlays} - s.Unlock() } return &PipelineUpdateResponse{PipelineId: &PipelineID{Id: updates.Id.GetId()}}, nil } -func (s *PipelineService) StopPipeline(ctx context.Context, id *PipelineID) (*PipelineStopResponse, error) { +func (s *PipelineService) StopPipeline(_ context.Context, id *PipelineID) (*PipelineStopResponse, error) { pipeline, err := s.Gstreamer.GetPipeline(id.GetId()) if err != nil { log.Error().Str("ID", id.GetId()).Err(err) } else { - pipeline.Lock() err = pipeline.Stop() - pipeline.Unlock() } return &PipelineStopResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err } -func (s *PipelineService) StartPipeline(ctx context.Context, id *PipelineID) (*PipelineStartResponse, error) { +func (s *PipelineService) StartPipeline(_ context.Context, id *PipelineID) (*PipelineStartResponse, error) { pipeline, err := s.Gstreamer.GetPipeline(id.GetId()) if err != nil { log.Error().Str("ID", id.GetId()).Err(err) } else { - pipeline.Lock() err = pipeline.Start() - pipeline.Unlock() } return &PipelineStartResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err } -func (s *PipelineService) DeletePipeline(ctx context.Context, id *PipelineID) (*PipelineDeleteResponse, error) { - s.Gstreamer.DeletePipeline(id.GetId()) - var err error - if slot, ok := s.pipelineslotmapping[id.GetId()]; ok { - s.Lock() - defer s.Unlock() - delete(s.pipelineslotmapping, id.GetId()) - var i int - for i := range s.pipelineSlots[slot] { - if s.pipelineSlots[slot][i] == id.GetId() { - break - } - } - s.pipelineSlots[slot] = append(s.pipelineSlots[slot][:i], s.pipelineSlots[slot][i+1:]...) - } else { - err = fmt.Errorf("Pipeline %s not found", id.GetId()) - } +func (s *PipelineService) DeletePipeline(_ context.Context, id *PipelineID) (*PipelineDeleteResponse, error) { + err := s.Gstreamer.DeletePipeline(id.GetId()) return &PipelineDeleteResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err } -func (s *PipelineService) CreatePipeline(ctx context.Context, pipeline *Pipeline) (*PipelineCreationResponse, error) { +func (s *PipelineService) CreatePipeline(_ context.Context, pipeline *Pipeline) (*PipelineCreationResponse, error) { uid, _ := uuid.NewRandom() id := uid.String() gstreamerPipeline := &gstreamer.Pipeline{ @@ -103,20 +80,7 @@ func (s *PipelineService) CreatePipeline(ctx context.Context, pipeline *Pipeline log.Error().Err(err) return nil, err } - s.Lock() - if s.pipelineSlots == nil { - s.pipelineSlots = make(map[string][]string) - } - if _, ok := s.pipelineSlots[pipeline.GetName()]; ok { - s.pipelineSlots[pipeline.GetName()] = append(s.pipelineSlots[pipeline.GetName()], id) - } else { - s.pipelineSlots[pipeline.GetName()] = []string{id} - } - if s.pipelineslotmapping == nil { - s.pipelineslotmapping = make(map[string]string) - } - s.pipelineslotmapping[id] = pipeline.GetName() - s.Unlock() + return &PipelineCreationResponse{ PipelineId: &PipelineID{Id: id}, }, nil diff --git a/default.nix b/default.nix index 777eb61..0a4822f 100644 --- a/default.nix +++ b/default.nix @@ -6,6 +6,7 @@ mkShell { gcc gst_all_1.gstreamer gst_all_1.gstreamer.dev + gst_all_1.gst-libav gst_all_1.gst-plugins-base gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad diff --git a/gstreamer/gstreamer.go b/gstreamer/gstreamer.go index 9b633c4..a0c0a90 100644 --- a/gstreamer/gstreamer.go +++ b/gstreamer/gstreamer.go @@ -17,21 +17,44 @@ func Init() *Gstreamer { Slots: map[string]string{"one": ""}, } gstreamer = &Gstreamer{ - ctx: context.Background(), - Pipelines: map[string]*Pipeline{}, - Decklink: decklink, + ctx: context.Background(), + Pipelines: map[string]*Pipeline{}, + pipelineSlots: make(map[string][]string), + notifyOnUpdate: map[string][]NotifyOnPipelineUpdate{}, + Decklink: decklink, } return gstreamer } -func (g *Gstreamer) DeletePipeline(id string) { +func (g *Gstreamer) AddPipeline(pipeline *Pipeline) { + g.Lock() + defer g.Unlock() + if _, ok := g.pipelineSlots[pipeline.Name]; ok { + g.pipelineSlots[pipeline.Name] = append(g.pipelineSlots[pipeline.Name], pipeline.ID) + } else { + g.pipelineSlots[pipeline.Name] = []string{pipeline.ID} + } + g.Pipelines[pipeline.ID] = pipeline + pipeline.SubscribeToUpdates(g.notifySlotSubscriber()) +} + +func (g *Gstreamer) DeletePipeline(id string) error { g.Lock() defer g.Unlock() if pipeline, err := g.getPipeline(id); err == nil { + var i int + for i := range g.pipelineSlots[pipeline.Name] { + if g.pipelineSlots[pipeline.Name][i] == pipeline.ID { + break + } + } + g.pipelineSlots[pipeline.Name] = append(g.pipelineSlots[pipeline.Name][:i], g.pipelineSlots[pipeline.Name][i+1:]...) + delete(g.Pipelines, id) pipeline.ctxCancel() + return nil } - return + return errors.New("pipeline not found") } func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) { @@ -47,3 +70,21 @@ func (g *Gstreamer) getPipeline(id string) (*Pipeline, error) { } return nil, errors.New("Pipeline not found") } + +func (g *Gstreamer) SubscribeToSlotUpdates(slot string, callback NotifyOnPipelineUpdate) { + g.Lock() + defer g.Unlock() + if _, ok := g.notifyOnUpdate[slot]; ok { + g.notifyOnUpdate[slot] = append(g.notifyOnUpdate[slot], callback) + } else { + g.notifyOnUpdate[slot] = []NotifyOnPipelineUpdate{callback} + } +} + +func (g *Gstreamer) notifySlotSubscriber() NotifyOnPipelineUpdate { + return func(pipeline *Pipeline) { + for _, callback := range g.notifyOnUpdate[pipeline.Name] { + callback(pipeline) + } + } +} diff --git a/gstreamer/pipeline.go b/gstreamer/pipeline.go index 42dfedd..d734e44 100644 --- a/gstreamer/pipeline.go +++ b/gstreamer/pipeline.go @@ -93,15 +93,15 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) { updateChannel := make(chan PipelineUpdates) go p.overlayUpdater(updateChannel, gstreamerLogger) p.PipelineUpdates = updateChannel - gstreamer.Lock() - gstreamer.Pipelines[p.ID] = p - gstreamer.Unlock() + gstreamer.AddPipeline(p) return updateChannel, nil } //TODO check if output is decklink func (p *Pipeline) Start() error { + p.Lock() + defer p.Unlock() //output, err := gstreamer.getDecklinkOutput(p.Name) //setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) err := p.pipeline.SetState(gst.StatePlaying) @@ -113,6 +113,8 @@ func (p *Pipeline) Start() error { } func (p *Pipeline) Stop() error { + p.Lock() + defer p.Unlock() err := p.pipeline.SetState(gst.StateNull) if err != nil { p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err) @@ -138,6 +140,7 @@ func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolo log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays)) var wg sync.WaitGroup + p.Lock() for _, overlay := range updates.Overlays { wg.Add(1) go func(overlay Overlay) { @@ -149,14 +152,27 @@ func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolo } overlay.setElement(o.getElement()) overlay.update(log) - + p.Overlays[overlay.getName()] = overlay }(overlay) - } wg.Wait() + p.Unlock() + p.notifySubscribers() log.Debug().Msg("Overlays updated") - //TODO channel done } } - +} + +func (p *Pipeline) SubscribeToUpdates(updateFunc NotifyOnPipelineUpdate) { + p.Lock() + p.notifyOnUpdate = append(p.notifyOnUpdate, updateFunc) + p.Unlock() +} + +func (p *Pipeline) notifySubscribers() { + p.RLock() + for _, updateFunc := range p.notifyOnUpdate { + go updateFunc(p) + } + p.RUnlock() } diff --git a/gstreamer/types.go b/gstreamer/types.go index d4ca7c2..ee2ae7a 100644 --- a/gstreamer/types.go +++ b/gstreamer/types.go @@ -9,12 +9,16 @@ import ( ) type Gstreamer struct { - Pipelines map[string]*Pipeline + pipelineSlots map[string][]string + Pipelines map[string]*Pipeline + notifyOnUpdate map[string][]NotifyOnPipelineUpdate *Decklink ctx context.Context sync.RWMutex } +type NotifyOnPipelineUpdate func(pipeline *Pipeline) + type Pipeline struct { Name string ID string @@ -22,6 +26,7 @@ type Pipeline struct { OutputElement OutputElement Overlays map[string]Overlay PipelineUpdates chan PipelineUpdates + notifyOnUpdate []NotifyOnPipelineUpdate pipeline *gst.Pipeline logger zerolog.Logger elements map[string]element