make pipeline and slot updates subscribable

This commit is contained in:
Garionion 2022-02-09 00:04:00 +01:00
parent aa6d0ed864
commit 727ffafe5b
5 changed files with 83 additions and 56 deletions

View file

@ -11,8 +11,6 @@ import (
) )
type PipelineService struct { type PipelineService struct {
pipelineSlots map[string][]string
pipelineslotmapping map[string]string
UnimplementedPipelineServiceServer UnimplementedPipelineServiceServer
*gstreamer.Gstreamer *gstreamer.Gstreamer
Font, FontWeight string //default font Font, FontWeight string //default font
@ -21,65 +19,44 @@ type PipelineService struct {
sync.RWMutex sync.RWMutex
} }
func (s *PipelineService) UpdatePipeline(ctx context.Context, updates *PipelineUpdates) (*PipelineUpdateResponse, error) { func (s *PipelineService) UpdatePipeline(_ context.Context, updates *PipelineUpdates) (*PipelineUpdateResponse, error) {
pipeline, err := s.Gstreamer.GetPipeline(updates.Id.GetId()) pipeline, err := s.Gstreamer.GetPipeline(updates.Id.GetId())
if err != nil { if err != nil {
log.Error().Str("ID", updates.Id.GetId()).Err(err) log.Error().Str("ID", updates.Id.GetId()).Err(err)
} else { } else {
overlays := s.convertRPCOverlayToGstreamerOverlay(updates.GetOverlays()) overlays := s.convertRPCOverlayToGstreamerOverlay(updates.GetOverlays())
s.Lock()
pipeline.PipelineUpdates <- gstreamer.PipelineUpdates{Overlays: overlays} pipeline.PipelineUpdates <- gstreamer.PipelineUpdates{Overlays: overlays}
s.Unlock()
} }
return &PipelineUpdateResponse{PipelineId: &PipelineID{Id: updates.Id.GetId()}}, nil return &PipelineUpdateResponse{PipelineId: &PipelineID{Id: updates.Id.GetId()}}, nil
} }
func (s *PipelineService) StopPipeline(ctx context.Context, id *PipelineID) (*PipelineStopResponse, error) { func (s *PipelineService) StopPipeline(_ context.Context, id *PipelineID) (*PipelineStopResponse, error) {
pipeline, err := s.Gstreamer.GetPipeline(id.GetId()) pipeline, err := s.Gstreamer.GetPipeline(id.GetId())
if err != nil { if err != nil {
log.Error().Str("ID", id.GetId()).Err(err) log.Error().Str("ID", id.GetId()).Err(err)
} else { } else {
pipeline.Lock()
err = pipeline.Stop() err = pipeline.Stop()
pipeline.Unlock()
} }
return &PipelineStopResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err return &PipelineStopResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err
} }
func (s *PipelineService) StartPipeline(ctx context.Context, id *PipelineID) (*PipelineStartResponse, error) { func (s *PipelineService) StartPipeline(_ context.Context, id *PipelineID) (*PipelineStartResponse, error) {
pipeline, err := s.Gstreamer.GetPipeline(id.GetId()) pipeline, err := s.Gstreamer.GetPipeline(id.GetId())
if err != nil { if err != nil {
log.Error().Str("ID", id.GetId()).Err(err) log.Error().Str("ID", id.GetId()).Err(err)
} else { } else {
pipeline.Lock()
err = pipeline.Start() err = pipeline.Start()
pipeline.Unlock()
} }
return &PipelineStartResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err return &PipelineStartResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err
} }
func (s *PipelineService) DeletePipeline(ctx context.Context, id *PipelineID) (*PipelineDeleteResponse, error) { func (s *PipelineService) DeletePipeline(_ context.Context, id *PipelineID) (*PipelineDeleteResponse, error) {
s.Gstreamer.DeletePipeline(id.GetId()) err := s.Gstreamer.DeletePipeline(id.GetId())
var err error
if slot, ok := s.pipelineslotmapping[id.GetId()]; ok {
s.Lock()
defer s.Unlock()
delete(s.pipelineslotmapping, id.GetId())
var i int
for i := range s.pipelineSlots[slot] {
if s.pipelineSlots[slot][i] == id.GetId() {
break
}
}
s.pipelineSlots[slot] = append(s.pipelineSlots[slot][:i], s.pipelineSlots[slot][i+1:]...)
} else {
err = fmt.Errorf("Pipeline %s not found", id.GetId())
}
return &PipelineDeleteResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err return &PipelineDeleteResponse{PipelineId: &PipelineID{Id: id.GetId()}}, err
} }
func (s *PipelineService) CreatePipeline(ctx context.Context, pipeline *Pipeline) (*PipelineCreationResponse, error) { func (s *PipelineService) CreatePipeline(_ context.Context, pipeline *Pipeline) (*PipelineCreationResponse, error) {
uid, _ := uuid.NewRandom() uid, _ := uuid.NewRandom()
id := uid.String() id := uid.String()
gstreamerPipeline := &gstreamer.Pipeline{ gstreamerPipeline := &gstreamer.Pipeline{
@ -103,20 +80,7 @@ func (s *PipelineService) CreatePipeline(ctx context.Context, pipeline *Pipeline
log.Error().Err(err) log.Error().Err(err)
return nil, err return nil, err
} }
s.Lock()
if s.pipelineSlots == nil {
s.pipelineSlots = make(map[string][]string)
}
if _, ok := s.pipelineSlots[pipeline.GetName()]; ok {
s.pipelineSlots[pipeline.GetName()] = append(s.pipelineSlots[pipeline.GetName()], id)
} else {
s.pipelineSlots[pipeline.GetName()] = []string{id}
}
if s.pipelineslotmapping == nil {
s.pipelineslotmapping = make(map[string]string)
}
s.pipelineslotmapping[id] = pipeline.GetName()
s.Unlock()
return &PipelineCreationResponse{ return &PipelineCreationResponse{
PipelineId: &PipelineID{Id: id}, PipelineId: &PipelineID{Id: id},
}, nil }, nil

View file

@ -6,6 +6,7 @@ mkShell {
gcc gcc
gst_all_1.gstreamer gst_all_1.gstreamer
gst_all_1.gstreamer.dev gst_all_1.gstreamer.dev
gst_all_1.gst-libav
gst_all_1.gst-plugins-base gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad

View file

@ -19,19 +19,42 @@ func Init() *Gstreamer {
gstreamer = &Gstreamer{ gstreamer = &Gstreamer{
ctx: context.Background(), ctx: context.Background(),
Pipelines: map[string]*Pipeline{}, Pipelines: map[string]*Pipeline{},
pipelineSlots: make(map[string][]string),
notifyOnUpdate: map[string][]NotifyOnPipelineUpdate{},
Decklink: decklink, Decklink: decklink,
} }
return gstreamer return gstreamer
} }
func (g *Gstreamer) DeletePipeline(id string) { func (g *Gstreamer) AddPipeline(pipeline *Pipeline) {
g.Lock()
defer g.Unlock()
if _, ok := g.pipelineSlots[pipeline.Name]; ok {
g.pipelineSlots[pipeline.Name] = append(g.pipelineSlots[pipeline.Name], pipeline.ID)
} else {
g.pipelineSlots[pipeline.Name] = []string{pipeline.ID}
}
g.Pipelines[pipeline.ID] = pipeline
pipeline.SubscribeToUpdates(g.notifySlotSubscriber())
}
func (g *Gstreamer) DeletePipeline(id string) error {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
if pipeline, err := g.getPipeline(id); err == nil { if pipeline, err := g.getPipeline(id); err == nil {
var i int
for i := range g.pipelineSlots[pipeline.Name] {
if g.pipelineSlots[pipeline.Name][i] == pipeline.ID {
break
}
}
g.pipelineSlots[pipeline.Name] = append(g.pipelineSlots[pipeline.Name][:i], g.pipelineSlots[pipeline.Name][i+1:]...)
delete(g.Pipelines, id) delete(g.Pipelines, id)
pipeline.ctxCancel() pipeline.ctxCancel()
return nil
} }
return return errors.New("pipeline not found")
} }
func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) { func (g *Gstreamer) GetPipeline(id string) (*Pipeline, error) {
@ -47,3 +70,21 @@ func (g *Gstreamer) getPipeline(id string) (*Pipeline, error) {
} }
return nil, errors.New("Pipeline not found") return nil, errors.New("Pipeline not found")
} }
func (g *Gstreamer) SubscribeToSlotUpdates(slot string, callback NotifyOnPipelineUpdate) {
g.Lock()
defer g.Unlock()
if _, ok := g.notifyOnUpdate[slot]; ok {
g.notifyOnUpdate[slot] = append(g.notifyOnUpdate[slot], callback)
} else {
g.notifyOnUpdate[slot] = []NotifyOnPipelineUpdate{callback}
}
}
func (g *Gstreamer) notifySlotSubscriber() NotifyOnPipelineUpdate {
return func(pipeline *Pipeline) {
for _, callback := range g.notifyOnUpdate[pipeline.Name] {
callback(pipeline)
}
}
}

View file

@ -93,15 +93,15 @@ func (p *Pipeline) Create() (chan PipelineUpdates, error) {
updateChannel := make(chan PipelineUpdates) updateChannel := make(chan PipelineUpdates)
go p.overlayUpdater(updateChannel, gstreamerLogger) go p.overlayUpdater(updateChannel, gstreamerLogger)
p.PipelineUpdates = updateChannel p.PipelineUpdates = updateChannel
gstreamer.Lock() gstreamer.AddPipeline(p)
gstreamer.Pipelines[p.ID] = p
gstreamer.Unlock()
return updateChannel, nil return updateChannel, nil
} }
//TODO check if output is decklink //TODO check if output is decklink
func (p *Pipeline) Start() error { func (p *Pipeline) Start() error {
p.Lock()
defer p.Unlock()
//output, err := gstreamer.getDecklinkOutput(p.Name) //output, err := gstreamer.getDecklinkOutput(p.Name)
//setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output) //setPropertyWrapper(p.elements[p.OutputElement.Name].element, "device-number", output)
err := p.pipeline.SetState(gst.StatePlaying) err := p.pipeline.SetState(gst.StatePlaying)
@ -113,6 +113,8 @@ func (p *Pipeline) Start() error {
} }
func (p *Pipeline) Stop() error { func (p *Pipeline) Stop() error {
p.Lock()
defer p.Unlock()
err := p.pipeline.SetState(gst.StateNull) err := p.pipeline.SetState(gst.StateNull)
if err != nil { if err != nil {
p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err) p.logger.Error().Msgf("Failed to set pipeline %s to null: %v", p.Name, err)
@ -138,6 +140,7 @@ func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolo
log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays)) log.Info().Msgf("Updating overlays with %d text overlays", len(updates.Overlays))
var wg sync.WaitGroup var wg sync.WaitGroup
p.Lock()
for _, overlay := range updates.Overlays { for _, overlay := range updates.Overlays {
wg.Add(1) wg.Add(1)
go func(overlay Overlay) { go func(overlay Overlay) {
@ -149,14 +152,27 @@ func (p *Pipeline) overlayUpdater(updateChannel chan PipelineUpdates, log zerolo
} }
overlay.setElement(o.getElement()) overlay.setElement(o.getElement())
overlay.update(log) overlay.update(log)
p.Overlays[overlay.getName()] = overlay
}(overlay) }(overlay)
} }
wg.Wait() wg.Wait()
p.Unlock()
p.notifySubscribers()
log.Debug().Msg("Overlays updated") log.Debug().Msg("Overlays updated")
//TODO channel done }
} }
} }
func (p *Pipeline) SubscribeToUpdates(updateFunc NotifyOnPipelineUpdate) {
p.Lock()
p.notifyOnUpdate = append(p.notifyOnUpdate, updateFunc)
p.Unlock()
}
func (p *Pipeline) notifySubscribers() {
p.RLock()
for _, updateFunc := range p.notifyOnUpdate {
go updateFunc(p)
}
p.RUnlock()
} }

View file

@ -9,12 +9,16 @@ import (
) )
type Gstreamer struct { type Gstreamer struct {
pipelineSlots map[string][]string
Pipelines map[string]*Pipeline Pipelines map[string]*Pipeline
notifyOnUpdate map[string][]NotifyOnPipelineUpdate
*Decklink *Decklink
ctx context.Context ctx context.Context
sync.RWMutex sync.RWMutex
} }
type NotifyOnPipelineUpdate func(pipeline *Pipeline)
type Pipeline struct { type Pipeline struct {
Name string Name string
ID string ID string
@ -22,6 +26,7 @@ type Pipeline struct {
OutputElement OutputElement OutputElement OutputElement
Overlays map[string]Overlay Overlays map[string]Overlay
PipelineUpdates chan PipelineUpdates PipelineUpdates chan PipelineUpdates
notifyOnUpdate []NotifyOnPipelineUpdate
pipeline *gst.Pipeline pipeline *gst.Pipeline
logger zerolog.Logger logger zerolog.Logger
elements map[string]element elements map[string]element