Remove unstable update channel, replace with state updates

This commit is contained in:
Kristoffer Dalby 2021-10-05 16:24:46 +00:00
parent 8abc7575cd
commit a01a0d1039
3 changed files with 7 additions and 124 deletions

6
app.go
View file

@ -65,9 +65,6 @@ type Headscale struct {
aclPolicy *ACLPolicy aclPolicy *ACLPolicy
aclRules *[]tailcfg.FilterRule aclRules *[]tailcfg.FilterRule
clientsUpdateChannels sync.Map
clientsUpdateChannelMutex sync.Mutex
lastStateChange sync.Map lastStateChange sync.Map
} }
@ -145,10 +142,9 @@ func (h *Headscale) expireEphemeralNodesWorker() {
if err != nil { if err != nil {
log.Error().Err(err).Str("machine", m.Name).Msg("🤮 Cannot delete ephemeral machine from the database") log.Error().Err(err).Str("machine", m.Name).Msg("🤮 Cannot delete ephemeral machine from the database")
} }
updateRequestsFromNode.WithLabelValues("ephemeral-node-update").Inc()
h.notifyChangesToPeers(&m)
} }
} }
h.setLastStateChangeToNow(ns.Name)
} }
} }

View file

@ -2,7 +2,6 @@ package headscale
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sort" "sort"
"strconv" "strconv"
@ -214,111 +213,6 @@ func (m *Machine) GetHostInfo() (*tailcfg.Hostinfo, error) {
return &hostinfo, nil return &hostinfo, nil
} }
func (h *Headscale) notifyChangesToPeers(m *Machine) {
peers, err := h.getPeers(m)
if err != nil {
log.Error().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Msgf("Error getting peers: %s", err)
return
}
for _, peer := range peers {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notifying peer %s (%s)", peer.Name, peer.IPAddress)
err := h.sendRequestOnUpdateChannel(&peer)
if err != nil {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Msgf("Peer %s does not have an open update client, skipping.", peer.Name)
continue
}
log.Trace().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notified peer %s (%s)", peer.Name, peer.IPAddress)
}
}
func (h *Headscale) getOrOpenUpdateChannel(m *Machine) <-chan struct{} {
var updateChan chan struct{}
if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
updateChan = unwrapped
} else {
log.Error().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Failed to convert update channel to struct{}")
}
} else {
log.Debug().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Update channel not found, creating")
updateChan = make(chan struct{})
h.clientsUpdateChannels.Store(m.ID, updateChan)
}
return updateChan
}
func (h *Headscale) closeUpdateChannel(m *Machine) {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()
if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
close(unwrapped)
}
}
h.clientsUpdateChannels.Delete(m.ID)
}
func (h *Headscale) sendRequestOnUpdateChannel(m *Machine) error {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()
pUp, ok := h.clientsUpdateChannels.Load(uint64(m.ID))
if ok {
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notifying peer %s", m.Name)
if update, ok := pUp.(chan struct{}); ok {
log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Update channel is %#v", update)
updateRequestsToNode.Inc()
update <- struct{}{}
log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notified machine %s", m.Name)
}
} else {
err := errors.New("machine does not have an open update channel")
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Machine %s does not have an open update channel", m.Name)
return err
}
return nil
}
func (h *Headscale) isOutdated(m *Machine) bool { func (h *Headscale) isOutdated(m *Machine) bool {
err := h.UpdateMachine(m) err := h.UpdateMachine(m)
if err != nil { if err != nil {

View file

@ -176,24 +176,17 @@ func (h *Headscale) checkForNamespacesPendingUpdates() {
return return
} }
names := []string{} namespaces := []string{}
err = json.Unmarshal([]byte(v), &names) err = json.Unmarshal([]byte(v), &namespaces)
if err != nil { if err != nil {
return return
} }
for _, name := range names { for _, namespace := range namespaces {
log.Trace(). log.Trace().
Str("func", "RequestMapUpdates"). Str("func", "RequestMapUpdates").
Str("machine", name). Str("machine", namespace).
Msg("Sending updates to nodes in namespace") Msg("Sending updates to nodes in namespacespace")
machines, err := h.ListMachinesInNamespace(name) h.setLastStateChangeToNow(namespace)
if err != nil {
continue
}
for _, m := range *machines {
updateRequestsFromNode.WithLabelValues("namespace-update").Inc()
h.notifyChangesToPeers(&m)
}
} }
newV, err := h.getValue("namespaces_pending_updates") newV, err := h.getValue("namespaces_pending_updates")
if err != nil { if err != nil {