package main import ( "errors" "flag" "fmt" "github.com/labstack/echo-contrib/echoprometheus" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/oschwald/maxminddb-golang" "github.com/prometheus/client_golang/prometheus" "net" "net/http" ) type Telemetry struct { Slug string `json:"slug"` EventType string `json:"type"` QualityChangeUp bool `json:"isUp"` Offset float32 `json:"offset"` Relay string `json:"relay"` } type metrics struct { bufferingSlug *prometheus.CounterVec bufferingRelay *prometheus.CounterVec recoverySlug *prometheus.CounterVec recoveryRelay *prometheus.CounterVec errorMetricSlug *prometheus.CounterVec errorMetricRelay *prometheus.CounterVec qualityUpSlug *prometheus.CounterVec qualityUpRelay *prometheus.CounterVec qualityDownSlug *prometheus.CounterVec qualityDownRelay *prometheus.CounterVec } const ( typeBuffering = "buffering" typeRecovery = "recovery" typeError = "error" typeQualitySwitch = "quality_switch" mmdbPath = "/tmp/ripe.mmdb" ) func main() { m, err := initMetrics() if err != nil { panic(err) } mmdb, err := maxminddb.Open(mmdbPath) if err != nil { panic(err) } listenAddress := ":2342" flag.StringVar(&listenAddress, "listen", listenAddress, "address to listen on") flag.Parse() e := echo.New() e.Use(middleware.Decompress()) e.Use(middleware.Logger()) e.Use(middleware.Recover()) e.Use(middleware.BodyLimit("64K")) e.Use(middleware.RateLimiter(middleware.NewRateLimiterMemoryStore(5))) e.Use(echoprometheus.NewMiddleware("http")) e.GET("/metrics", echoprometheus.NewHandler()) e.POST("/", telemetryHandler(m, mmdb)) e.Logger.Fatal(e.Start(listenAddress)) } func telemetryHandler(m metrics, mmdb *maxminddb.Reader) echo.HandlerFunc { return func(c echo.Context) error { t := new([]Telemetry) if err := c.Bind(t); err != nil { return c.String(http.StatusBadRequest, "bad request") } ip := c.RealIP() if ip == "" { //log all headers for k, v := range c.Request().Header { fmt.Printf("Header: %s: %s\n", k, v) } } asn, err := queryMMDB(mmdb, ip) if err != nil { asn = "unknown" } for _, v := range *t { switch v.EventType { case typeBuffering: m.bufferingSlug.WithLabelValues(v.Slug).Inc() m.bufferingRelay.WithLabelValues(v.Relay, asn).Inc() case typeRecovery: m.recoverySlug.WithLabelValues(v.Slug).Inc() m.recoveryRelay.WithLabelValues(v.Relay, asn).Inc() case typeError: m.errorMetricSlug.WithLabelValues(v.Slug).Inc() m.errorMetricRelay.WithLabelValues(v.Relay, asn).Inc() case typeQualitySwitch: if v.QualityChangeUp { m.qualityUpSlug.WithLabelValues(v.Slug).Inc() m.qualityUpRelay.WithLabelValues(v.Relay, asn).Inc() } else { m.qualityDownSlug.WithLabelValues(v.Slug).Inc() m.qualityDownRelay.WithLabelValues(v.Relay, asn).Inc() } } } return c.NoContent(http.StatusOK) } } func initMetrics() (metrics, error) { metricBufferingSlug := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "slug", Name: "buffering", }, []string{"slug"}) metricBufferingRelay := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "relay", Name: "buffering", }, []string{"relay", "asn"}) metricRecoverySlug := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "slug", Name: "recovery", }, []string{"slug"}) metricRecoveryRelay := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "relay", Name: "recovery", }, []string{"relay", "asn"}) metricErrorSlug := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "slug", Name: "error", }, []string{"slug"}) metricErrorRelay := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "relay", Name: "error", }, []string{"relay", "asn"}) metricQualitySwitchUpSlug := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "slug", Name: "quality_switch_up", }, []string{"slug"}) metricQualitySwitchUpRelay := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "relay", Name: "quality_switch_up", }, []string{"relay", "asn"}) metricQualitySwitchDownSlug := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "slug", Name: "quality_switch_down", }, []string{"slug"}) metricQualitySwitchDownRelay := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "telemetry", Subsystem: "relay", Name: "quality_switch_down", }, []string{"relay", "asn"}) if err := prometheus.Register(metricBufferingSlug); err != nil { return metrics{}, fmt.Errorf("failed to register buffering slug metric: %w", err) } if err := prometheus.Register(metricBufferingRelay); err != nil { return metrics{}, fmt.Errorf("failed to register buffering relay metric: %w", err) } if err := prometheus.Register(metricRecoverySlug); err != nil { return metrics{}, fmt.Errorf("failed to register recovery slug metric: %w", err) } if err := prometheus.Register(metricRecoveryRelay); err != nil { return metrics{}, fmt.Errorf("failed to register recovery relay metric: %w", err) } if err := prometheus.Register(metricErrorSlug); err != nil { return metrics{}, fmt.Errorf("failed to register error slug metric: %w", err) } if err := prometheus.Register(metricErrorRelay); err != nil { return metrics{}, fmt.Errorf("failed to register error relay metric: %w", err) } if err := prometheus.Register(metricQualitySwitchUpSlug); err != nil { return metrics{}, fmt.Errorf("failed to register quality_switch_up slug metric: %w", err) } if err := prometheus.Register(metricQualitySwitchUpRelay); err != nil { return metrics{}, fmt.Errorf("failed to register quality_switch_up relay metric: %w", err) } if err := prometheus.Register(metricQualitySwitchDownSlug); err != nil { return metrics{}, fmt.Errorf("failed to register quality_switch_down slug metric: %w", err) } if err := prometheus.Register(metricQualitySwitchDownRelay); err != nil { return metrics{}, fmt.Errorf("failed to register quality_switch_down relay metric: %w", err) } return metrics{ bufferingSlug: metricBufferingSlug, bufferingRelay: metricBufferingRelay, recoverySlug: metricRecoverySlug, recoveryRelay: metricRecoveryRelay, errorMetricSlug: metricErrorSlug, errorMetricRelay: metricErrorRelay, qualityUpSlug: metricQualitySwitchUpSlug, qualityUpRelay: metricQualitySwitchUpRelay, qualityDownSlug: metricQualitySwitchDownSlug, qualityDownRelay: metricQualitySwitchDownRelay, }, nil } func queryMMDB(mmdb *maxminddb.Reader, ip string) (string, error) { var record map[string]interface{} err := mmdb.Lookup(net.ParseIP(ip), &record) if err != nil { return "", fmt.Errorf("error looking up IP in MMDB: %w", err) } asn, ok := record["asn"].(string) if !ok { return "", errors.New("asn is not a string") } return asn, nil }