berkutschi/berkutschi/websocket.go

130 lines
2.8 KiB
Go

package berkutschi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"nhooyr.io/websocket"
)
var u = url.URL{Scheme: "wss", Host: "live.berkutschi.com", Path: "/faye"}
type Berkutschi struct {
conn *websocket.Conn
event int
clientID string
log zerolog.Logger
ctx context.Context
TX chan BerkutschiClientMessages
RX chan []byte
}
func Init(event int) *Berkutschi {
l := log.With().Int("event", event).Logger()
b := &Berkutschi{
event: event,
TX: make(chan BerkutschiClientMessages),
RX: make(chan []byte),
log: l,
}
b.registerClient()
b.connect()
b.connectAndSubscribe()
return b
}
func (b *Berkutschi) connectAndSubscribe() {
connectMessage := BerkutschiConnectMessage{
Channel: "/meta/connect",
ClientID: b.clientID,
ConnectionType: "websocket",
ID: "2",
}
subscribeMessage := BerkutschiSubscribeMessage{
Channel: "/meta/subscribe",
ClientID: b.clientID,
Subscription: fmt.Sprintf("/messages/%d", b.event),
ID: "3",
}
b.TX <- BerkutschiConnectMessages{connectMessage}
b.log.Debug().Msgf("connected to berkutschi")
b.TX <- BerkutschiSubscribeMessages{subscribeMessage}
b.log.Debug().Msgf("subscribed to event %d", b.event)
}
func (b *Berkutschi) connect() error {
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
c, _, err := websocket.Dial(ctx, u.String(), nil)
if err != nil {
b.log.Err(fmt.Errorf("Error connecting to websocket: %s", err)).Send()
}
b.log.Debug().Msgf("Connected to websocket")
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
_, message, err := c.Read(ctx)
if err != nil {
b.log.Error().Err(fmt.Errorf("Error reading from websocket, reconnecting: %s", err)).Send()
return
}
b.RX <- message
}
}
}()
go func() {
defer cancel()
for {
select {
case <-ctx.Done():
return
case message := <-b.TX:
byteMessage, _ := message.Marshal()
err := c.Write(ctx, websocket.MessageText, byteMessage)
if err != nil {
b.log.Error().Err(fmt.Errorf("Error writing to websocket, reconnecting: %s", err)).Send()
return
}
b.log.Debug().Msgf("Sent message: %v", message)
}
}
}()
go func() {
<-ctx.Done()
go b.closeAndReconnect()
}()
b.conn = c
return err
}
func (b *Berkutschi) closeAndReconnect() error {
b.registerClient()
b.log.Debug().Msgf("Reconnecting websocket connection")
b.conn.Close(websocket.StatusInternalError, "")
b.connect()
b.connectAndSubscribe()
return nil
}
func (m BerkutschiConnectMessages) Marshal() ([]byte, error) {
return json.Marshal(m)
}
func (m BerkutschiSubscribeMessages) Marshal() ([]byte, error) {
return json.Marshal(m)
}