voc-telemetry/main.go

254 lines
7.1 KiB
Go

package main
import (
"errors"
"flag"
"fmt"
"github.com/labstack/echo-contrib/echoprometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/oschwald/maxminddb-golang"
"github.com/prometheus/client_golang/prometheus"
"net"
"net/http"
)
type Telemetry struct {
Slug string `json:"slug"`
EventType string `json:"type"`
QualityChangeUp bool `json:"isUp"`
Offset float32 `json:"offset"`
Relay string `json:"relay"`
}
type metrics struct {
bufferingSlug *prometheus.CounterVec
bufferingRelay *prometheus.CounterVec
recoverySlug *prometheus.CounterVec
recoveryRelay *prometheus.CounterVec
errorMetricSlug *prometheus.CounterVec
errorMetricRelay *prometheus.CounterVec
qualityUpSlug *prometheus.CounterVec
qualityUpRelay *prometheus.CounterVec
qualityDownSlug *prometheus.CounterVec
qualityDownRelay *prometheus.CounterVec
}
const (
typeBuffering = "buffering"
typeRecovery = "recovery"
typeError = "error"
typeQualitySwitch = "quality_switch"
mmdbPath = "/tmp/ripe.mmdb"
)
func main() {
m, err := initMetrics()
if err != nil {
panic(err)
}
mmdb, err := maxminddb.Open(mmdbPath)
if err != nil {
panic(err)
}
listenAddress := ":2342"
flag.StringVar(&listenAddress, "listen", listenAddress, "address to listen on")
flag.Parse()
e := echo.New()
e.Use(middleware.Decompress())
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.Use(middleware.BodyLimit("64K"))
e.Use(middleware.RateLimiter(middleware.NewRateLimiterMemoryStore(5)))
e.Use(echoprometheus.NewMiddleware("http"))
e.GET("/metrics", echoprometheus.NewHandler())
e.POST("/", telemetryHandler(m, mmdb))
e.Logger.Fatal(e.Start(listenAddress))
}
func telemetryHandler(m metrics, mmdb *maxminddb.Reader) echo.HandlerFunc {
return func(c echo.Context) error {
t := new([]Telemetry)
if err := c.Bind(t); err != nil {
return c.String(http.StatusBadRequest, "bad request")
}
ip := c.RealIP()
if ip == "" {
//log all headers
for k, v := range c.Request().Header {
fmt.Printf("Header: %s: %s\n", k, v)
}
}
asn, err := queryMMDB(mmdb, ip)
if err != nil {
asn = "unknown"
}
for _, v := range *t {
switch v.EventType {
case typeBuffering:
m.bufferingSlug.WithLabelValues(v.Slug).Inc()
m.bufferingRelay.WithLabelValues(v.Relay, asn).Inc()
case typeRecovery:
m.recoverySlug.WithLabelValues(v.Slug).Inc()
m.recoveryRelay.WithLabelValues(v.Relay, asn).Inc()
case typeError:
m.errorMetricSlug.WithLabelValues(v.Slug).Inc()
m.errorMetricRelay.WithLabelValues(v.Relay, asn).Inc()
case typeQualitySwitch:
if v.QualityChangeUp {
m.qualityUpSlug.WithLabelValues(v.Slug).Inc()
m.qualityUpRelay.WithLabelValues(v.Relay, asn).Inc()
} else {
m.qualityDownSlug.WithLabelValues(v.Slug).Inc()
m.qualityDownRelay.WithLabelValues(v.Relay, asn).Inc()
}
}
}
return c.NoContent(http.StatusOK)
}
}
func initMetrics() (metrics, error) {
metricBufferingSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "slug",
Name: "buffering",
}, []string{"slug"})
metricBufferingRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "relay",
Name: "buffering",
}, []string{"relay", "asn"})
metricRecoverySlug := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "slug",
Name: "recovery",
}, []string{"slug"})
metricRecoveryRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "relay",
Name: "recovery",
}, []string{"relay", "asn"})
metricErrorSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "slug",
Name: "error",
}, []string{"slug"})
metricErrorRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "relay",
Name: "error",
}, []string{"relay", "asn"})
metricQualitySwitchUpSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "slug",
Name: "quality_switch_up",
}, []string{"slug"})
metricQualitySwitchUpRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "relay",
Name: "quality_switch_up",
}, []string{"relay", "asn"})
metricQualitySwitchDownSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "slug",
Name: "quality_switch_down",
}, []string{"slug"})
metricQualitySwitchDownRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "telemetry",
Subsystem: "relay",
Name: "quality_switch_down",
}, []string{"relay", "asn"})
if err := prometheus.Register(metricBufferingSlug); err != nil {
return metrics{}, fmt.Errorf("failed to register buffering slug metric: %w", err)
}
if err := prometheus.Register(metricBufferingRelay); err != nil {
return metrics{}, fmt.Errorf("failed to register buffering relay metric: %w", err)
}
if err := prometheus.Register(metricRecoverySlug); err != nil {
return metrics{}, fmt.Errorf("failed to register recovery slug metric: %w", err)
}
if err := prometheus.Register(metricRecoveryRelay); err != nil {
return metrics{}, fmt.Errorf("failed to register recovery relay metric: %w", err)
}
if err := prometheus.Register(metricErrorSlug); err != nil {
return metrics{}, fmt.Errorf("failed to register error slug metric: %w", err)
}
if err := prometheus.Register(metricErrorRelay); err != nil {
return metrics{}, fmt.Errorf("failed to register error relay metric: %w", err)
}
if err := prometheus.Register(metricQualitySwitchUpSlug); err != nil {
return metrics{}, fmt.Errorf("failed to register quality_switch_up slug metric: %w", err)
}
if err := prometheus.Register(metricQualitySwitchUpRelay); err != nil {
return metrics{}, fmt.Errorf("failed to register quality_switch_up relay metric: %w", err)
}
if err := prometheus.Register(metricQualitySwitchDownSlug); err != nil {
return metrics{}, fmt.Errorf("failed to register quality_switch_down slug metric: %w", err)
}
if err := prometheus.Register(metricQualitySwitchDownRelay); err != nil {
return metrics{}, fmt.Errorf("failed to register quality_switch_down relay metric: %w", err)
}
return metrics{
bufferingSlug: metricBufferingSlug,
bufferingRelay: metricBufferingRelay,
recoverySlug: metricRecoverySlug,
recoveryRelay: metricRecoveryRelay,
errorMetricSlug: metricErrorSlug,
errorMetricRelay: metricErrorRelay,
qualityUpSlug: metricQualitySwitchUpSlug,
qualityUpRelay: metricQualitySwitchUpRelay,
qualityDownSlug: metricQualitySwitchDownSlug,
qualityDownRelay: metricQualitySwitchDownRelay,
}, nil
}
func queryMMDB(mmdb *maxminddb.Reader, ip string) (string, error) {
var record map[string]interface{}
err := mmdb.Lookup(net.ParseIP(ip), &record)
if err != nil {
return "", fmt.Errorf("error looking up IP in MMDB: %w", err)
}
asn, ok := record["asn"].(string)
if !ok {
return "", errors.New("asn is not a string")
}
return asn, nil
}