254 lines
7.1 KiB
Go
254 lines
7.1 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"github.com/labstack/echo-contrib/echoprometheus"
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/labstack/echo/v4/middleware"
|
|
"github.com/oschwald/maxminddb-golang"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"net"
|
|
"net/http"
|
|
)
|
|
|
|
type Telemetry struct {
|
|
Slug string `json:"slug"`
|
|
EventType string `json:"type"`
|
|
QualityChangeUp bool `json:"isUp"`
|
|
Offset float32 `json:"offset"`
|
|
Relay string `json:"relay"`
|
|
}
|
|
|
|
type metrics struct {
|
|
bufferingSlug *prometheus.CounterVec
|
|
bufferingRelay *prometheus.CounterVec
|
|
recoverySlug *prometheus.CounterVec
|
|
recoveryRelay *prometheus.CounterVec
|
|
errorMetricSlug *prometheus.CounterVec
|
|
errorMetricRelay *prometheus.CounterVec
|
|
qualityUpSlug *prometheus.CounterVec
|
|
qualityUpRelay *prometheus.CounterVec
|
|
qualityDownSlug *prometheus.CounterVec
|
|
qualityDownRelay *prometheus.CounterVec
|
|
}
|
|
|
|
const (
|
|
typeBuffering = "buffering"
|
|
typeRecovery = "recovery"
|
|
typeError = "error"
|
|
typeQualitySwitch = "quality_switch"
|
|
|
|
mmdbPath = "/tmp/ripe.mmdb"
|
|
)
|
|
|
|
func main() {
|
|
m, err := initMetrics()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
mmdb, err := maxminddb.Open(mmdbPath)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
listenAddress := ":2342"
|
|
flag.StringVar(&listenAddress, "listen", listenAddress, "address to listen on")
|
|
flag.Parse()
|
|
|
|
e := echo.New()
|
|
|
|
e.Use(middleware.Decompress())
|
|
|
|
e.Use(middleware.Logger())
|
|
e.Use(middleware.Recover())
|
|
|
|
e.Use(middleware.BodyLimit("64K"))
|
|
e.Use(middleware.RateLimiter(middleware.NewRateLimiterMemoryStore(5)))
|
|
|
|
e.Use(echoprometheus.NewMiddleware("http"))
|
|
e.GET("/metrics", echoprometheus.NewHandler())
|
|
|
|
e.POST("/", telemetryHandler(m, mmdb))
|
|
|
|
e.Logger.Fatal(e.Start(listenAddress))
|
|
}
|
|
|
|
func telemetryHandler(m metrics, mmdb *maxminddb.Reader) echo.HandlerFunc {
|
|
return func(c echo.Context) error {
|
|
t := new([]Telemetry)
|
|
if err := c.Bind(t); err != nil {
|
|
return c.String(http.StatusBadRequest, "bad request")
|
|
}
|
|
|
|
ip := c.RealIP()
|
|
if ip == "" {
|
|
//log all headers
|
|
for k, v := range c.Request().Header {
|
|
fmt.Printf("Header: %s: %s\n", k, v)
|
|
}
|
|
}
|
|
|
|
asn, err := queryMMDB(mmdb, ip)
|
|
if err != nil {
|
|
asn = "unknown"
|
|
}
|
|
|
|
for _, v := range *t {
|
|
switch v.EventType {
|
|
case typeBuffering:
|
|
m.bufferingSlug.WithLabelValues(v.Slug).Inc()
|
|
m.bufferingRelay.WithLabelValues(v.Relay, asn).Inc()
|
|
case typeRecovery:
|
|
m.recoverySlug.WithLabelValues(v.Slug).Inc()
|
|
m.recoveryRelay.WithLabelValues(v.Relay, asn).Inc()
|
|
case typeError:
|
|
m.errorMetricSlug.WithLabelValues(v.Slug).Inc()
|
|
m.errorMetricRelay.WithLabelValues(v.Relay, asn).Inc()
|
|
case typeQualitySwitch:
|
|
if v.QualityChangeUp {
|
|
m.qualityUpSlug.WithLabelValues(v.Slug).Inc()
|
|
m.qualityUpRelay.WithLabelValues(v.Relay, asn).Inc()
|
|
} else {
|
|
m.qualityDownSlug.WithLabelValues(v.Slug).Inc()
|
|
m.qualityDownRelay.WithLabelValues(v.Relay, asn).Inc()
|
|
}
|
|
}
|
|
}
|
|
|
|
return c.NoContent(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func initMetrics() (metrics, error) {
|
|
metricBufferingSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "slug",
|
|
Name: "buffering",
|
|
}, []string{"slug"})
|
|
|
|
metricBufferingRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "relay",
|
|
Name: "buffering",
|
|
}, []string{"relay", "asn"})
|
|
|
|
metricRecoverySlug := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "slug",
|
|
Name: "recovery",
|
|
}, []string{"slug"})
|
|
|
|
metricRecoveryRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "relay",
|
|
Name: "recovery",
|
|
}, []string{"relay", "asn"})
|
|
|
|
metricErrorSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "slug",
|
|
Name: "error",
|
|
}, []string{"slug"})
|
|
|
|
metricErrorRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "relay",
|
|
Name: "error",
|
|
}, []string{"relay", "asn"})
|
|
|
|
metricQualitySwitchUpSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "slug",
|
|
Name: "quality_switch_up",
|
|
}, []string{"slug"})
|
|
|
|
metricQualitySwitchUpRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "relay",
|
|
Name: "quality_switch_up",
|
|
}, []string{"relay", "asn"})
|
|
|
|
metricQualitySwitchDownSlug := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "slug",
|
|
Name: "quality_switch_down",
|
|
}, []string{"slug"})
|
|
|
|
metricQualitySwitchDownRelay := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "telemetry",
|
|
Subsystem: "relay",
|
|
Name: "quality_switch_down",
|
|
}, []string{"relay", "asn"})
|
|
|
|
if err := prometheus.Register(metricBufferingSlug); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register buffering slug metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricBufferingRelay); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register buffering relay metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricRecoverySlug); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register recovery slug metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricRecoveryRelay); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register recovery relay metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricErrorSlug); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register error slug metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricErrorRelay); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register error relay metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricQualitySwitchUpSlug); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register quality_switch_up slug metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricQualitySwitchUpRelay); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register quality_switch_up relay metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricQualitySwitchDownSlug); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register quality_switch_down slug metric: %w", err)
|
|
}
|
|
|
|
if err := prometheus.Register(metricQualitySwitchDownRelay); err != nil {
|
|
return metrics{}, fmt.Errorf("failed to register quality_switch_down relay metric: %w", err)
|
|
}
|
|
|
|
return metrics{
|
|
bufferingSlug: metricBufferingSlug,
|
|
bufferingRelay: metricBufferingRelay,
|
|
recoverySlug: metricRecoverySlug,
|
|
recoveryRelay: metricRecoveryRelay,
|
|
errorMetricSlug: metricErrorSlug,
|
|
errorMetricRelay: metricErrorRelay,
|
|
qualityUpSlug: metricQualitySwitchUpSlug,
|
|
qualityUpRelay: metricQualitySwitchUpRelay,
|
|
qualityDownSlug: metricQualitySwitchDownSlug,
|
|
qualityDownRelay: metricQualitySwitchDownRelay,
|
|
}, nil
|
|
}
|
|
|
|
func queryMMDB(mmdb *maxminddb.Reader, ip string) (string, error) {
|
|
var record map[string]interface{}
|
|
|
|
err := mmdb.Lookup(net.ParseIP(ip), &record)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error looking up IP in MMDB: %w", err)
|
|
}
|
|
|
|
asn, ok := record["asn"].(string)
|
|
if !ok {
|
|
return "", errors.New("asn is not a string")
|
|
}
|
|
return asn, nil
|
|
}
|