package main import ( "fmt" "github.com/prometheus/client_golang/prometheus" "strconv" "strings" "sync" ) type Progress struct { streams map[string]*FfmpegProgress lock sync.RWMutex } type FfmpegProgress struct { Frame int `json:"frame"` Fps float64 `json:"fps"` Q float64 `json:"q"` Bitrate float64 `json:"bitrate"` Size int `json:"size"` Time int `json:"time"` DupFrames int `json:"dup_frames"` DropFrames int `json:"drop_frames"` lock sync.RWMutex } func NewProgress() *Progress { return &Progress{ streams: make(map[string]*FfmpegProgress), } } func (p *Progress) AddStream(stream string) { p.lock.Lock() defer p.lock.Unlock() p.streams[stream] = &FfmpegProgress{} } func (p *Progress) RemoveStream(stream string) { p.lock.Lock() defer p.lock.Unlock() delete(p.streams, stream) } func (p *Progress) IngestProgressValue(streamName, line string) { p.lock.RLock() defer p.lock.RUnlock() stream := p.streams[streamName] stream.lock.Lock() defer stream.lock.Unlock() defer func(p *Progress, streamName string, stream *FfmpegProgress) { p.streams[streamName] = stream }(p, streamName, stream) input := strings.Split(line, "=") if len(input) != 2 { return } key := input[0] value := strings.TrimSpace(input[1]) switch key { case "frame": frame, err := strconv.Atoi(value) if err != nil { return } stream.Frame = frame case "fps": fps, err := strconv.ParseFloat(value, 64) if err != nil { return } stream.Fps = fps case "stream_0_0_q": q, err := strconv.ParseFloat(value, 64) if err != nil { return } stream.Q = q case "bitrate": kbitrate, err := strconv.ParseFloat(strings.TrimSuffix(value, "kbits/s"), 64) if err != nil { fmt.Println("parse bitrate error: ", err) return } stream.Bitrate = kbitrate * 1000 case "total_size": size, err := strconv.Atoi(value) if err != nil { return } stream.Size = size case "out_time_ms": timeMS, err := strconv.Atoi(value) if err != nil { return } stream.Time = timeMS case "dup_frames": dupFrames, err := strconv.Atoi(value) if err != nil { return } stream.DupFrames = dupFrames case "drop_frames": dropFrames, err := strconv.Atoi(value) if err != nil { return } stream.DropFrames = dropFrames } } func (p *Progress) Collect(ch chan<- prometheus.Metric) { p.lock.Lock() defer p.lock.Unlock() for name, stream := range p.streams { ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_frame", "Current frame", []string{"stream"}, nil), prometheus.GaugeValue, float64(stream.Frame), name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_fps", "Current fps", []string{"stream"}, nil), prometheus.GaugeValue, stream.Fps, name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_q", "Current q", []string{"stream"}, nil), prometheus.GaugeValue, stream.Q, name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_bitrate", "Current bitrate", []string{"stream"}, nil), prometheus.GaugeValue, stream.Bitrate, name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_size", "Current size", []string{"stream"}, nil), prometheus.GaugeValue, float64(stream.Size), name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_time", "Current time", []string{"stream"}, nil), prometheus.GaugeValue, float64(stream.Time), name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_dup_frames", "Current dup_frames", []string{"stream"}, nil), prometheus.GaugeValue, float64(stream.DupFrames), name, ) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc("ffmpeg_drop_frames", "Current drop_frames", []string{"stream"}, nil), prometheus.GaugeValue, float64(stream.DropFrames), name, ) } } func (p *Progress) Describe(ch chan<- *prometheus.Desc) { prometheus.DescribeByCollect(p, ch) }