From 5e9004c4076420ed3ecf706200084bbb044d4d33 Mon Sep 17 00:00:00 2001 From: Juan Font Alonso Date: Mon, 20 Jun 2022 21:40:28 +0200 Subject: [PATCH] Fix issues in the poll loop --- poll.go | 77 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/poll.go b/poll.go index af7edab..195a261 100644 --- a/poll.go +++ b/poll.go @@ -293,17 +293,17 @@ func (h *Headscale) PollNetMapStream( machine, ) + log.Trace(). + Str("handler", "PollNetMapStream"). + Str("machine", machine.Hostname). + Msg("Waiting for data to stream...") + + log.Trace(). + Str("handler", "PollNetMapStream"). + Str("machine", machine.Hostname). + Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan) + for { - log.Trace(). - Str("handler", "PollNetMapStream"). - Str("machine", machine.Hostname). - Msg("Waiting for data to stream...") - - log.Trace(). - Str("handler", "PollNetMapStream"). - Str("machine", machine.Hostname). - Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan) - select { case data := <-pollDataChan: log.Trace(). @@ -321,8 +321,9 @@ func (h *Headscale) PollNetMapStream( Err(err). Msg("Cannot write data") - break + return } + log.Trace(). Str("handler", "PollNetMapStream"). Str("machine", machine.Hostname). @@ -343,7 +344,7 @@ func (h *Headscale) PollNetMapStream( // client has been removed from database // since the stream opened, terminate connection. - break + return } now := time.Now().UTC() machine.LastSeen = &now @@ -360,16 +361,16 @@ func (h *Headscale) PollNetMapStream( Str("channel", "pollData"). Err(err). Msg("Cannot update machine LastSuccessfulUpdate") - } else { - log.Trace(). - Str("handler", "PollNetMapStream"). - Str("machine", machine.Hostname). - Str("channel", "pollData"). - Int("bytes", len(data)). - Msg("Machine entry in database updated successfully after sending pollData") + + return } - break + log.Trace(). + Str("handler", "PollNetMapStream"). + Str("machine", machine.Hostname). + Str("channel", "pollData"). + Int("bytes", len(data)). + Msg("Machine entry in database updated successfully after sending data") case data := <-keepAliveChan: log.Trace(). @@ -387,8 +388,9 @@ func (h *Headscale) PollNetMapStream( Err(err). Msg("Cannot write keep alive message") - break + return } + log.Trace(). Str("handler", "PollNetMapStream"). Str("machine", machine.Hostname). @@ -409,7 +411,7 @@ func (h *Headscale) PollNetMapStream( // client has been removed from database // since the stream opened, terminate connection. - break + return } now := time.Now().UTC() machine.LastSeen = &now @@ -421,16 +423,16 @@ func (h *Headscale) PollNetMapStream( Str("channel", "keepAlive"). Err(err). Msg("Cannot update machine LastSeen") - } else { - log.Trace(). - Str("handler", "PollNetMapStream"). - Str("machine", machine.Hostname). - Str("channel", "keepAlive"). - Int("bytes", len(data)). - Msg("Machine updated successfully after sending keep alive") + + return } - break + log.Trace(). + Str("handler", "PollNetMapStream"). + Str("machine", machine.Hostname). + Str("channel", "keepAlive"). + Int("bytes", len(data)). + Msg("Machine updated successfully after sending keep alive") case <-updateChan: log.Trace(). @@ -440,6 +442,7 @@ func (h *Headscale) PollNetMapStream( Msg("Received a request for update") updateRequestsReceivedOnChannel.WithLabelValues(machine.Namespace.Name, machine.Hostname). Inc() + if h.isOutdated(machine) { var lastUpdate time.Time if machine.LastSuccessfulUpdate != nil { @@ -459,6 +462,8 @@ func (h *Headscale) PollNetMapStream( Str("channel", "update"). Err(err). Msg("Could not get the map update") + + return } _, err = w.Write(data) if err != nil { @@ -515,7 +520,10 @@ func (h *Headscale) PollNetMapStream( Str("channel", "update"). Err(err). Msg("Cannot update machine LastSuccessfulUpdate") + + return } + } else { var lastUpdate time.Time if machine.LastSuccessfulUpdate != nil { @@ -529,8 +537,6 @@ func (h *Headscale) PollNetMapStream( Msgf("%s is up to date", machine.Hostname) } - return - case <-ctx.Done(): log.Info(). Str("handler", "PollNetMapStream"). @@ -550,7 +556,7 @@ func (h *Headscale) PollNetMapStream( // client has been removed from database // since the stream opened, terminate connection. - break + return } now := time.Now().UTC() machine.LastSeen = &now @@ -564,11 +570,10 @@ func (h *Headscale) PollNetMapStream( Msg("Cannot update machine LastSeen") } - break + // The connection has been closed, so we can stop polling. + return } } - - log.Info().Msgf("Closing poll loop to %s", machine.Hostname) } func (h *Headscale) scheduledPollWorker(