diff --git a/Dockerfile.tailscale-HEAD b/Dockerfile.tailscale-HEAD index 83ff9fe..f78d687 100644 --- a/Dockerfile.tailscale-HEAD +++ b/Dockerfile.tailscale-HEAD @@ -1,21 +1,43 @@ -# This Dockerfile and the images produced are for testing headscale, -# and are in no way endorsed by Headscale's maintainers as an -# official nor supported release or distribution. +# Copyright (c) Tailscale Inc & AUTHORS +# SPDX-License-Identifier: BSD-3-Clause -FROM golang:latest +# This Dockerfile is more or less lifted from tailscale/tailscale +# to ensure a similar build process when testing the HEAD of tailscale. -RUN apt-get update \ - && apt-get install -y dnsutils git iptables ssh ca-certificates \ - && rm -rf /var/lib/apt/lists/* +FROM golang:1.22-alpine AS build-env -RUN useradd --shell=/bin/bash --create-home ssh-it-user +WORKDIR /go/src +RUN apk add --no-cache git + +# Replace `RUN git...` with `COPY` and a local checked out version of Tailscale in `./tailscale` +# to test specific commits of the Tailscale client. This is useful when trying to find out why +# something specific broke between two versions of Tailscale with for example `git bisect`. +# COPY ./tailscale . RUN git clone https://github.com/tailscale/tailscale.git -WORKDIR /go/tailscale +WORKDIR /go/src/tailscale -RUN git checkout main \ - && sh build_dist.sh tailscale.com/cmd/tailscale \ - && sh build_dist.sh tailscale.com/cmd/tailscaled \ - && cp tailscale /usr/local/bin/ \ - && cp tailscaled /usr/local/bin/ + +# see build_docker.sh +ARG VERSION_LONG="" +ENV VERSION_LONG=$VERSION_LONG +ARG VERSION_SHORT="" +ENV VERSION_SHORT=$VERSION_SHORT +ARG VERSION_GIT_HASH="" +ENV VERSION_GIT_HASH=$VERSION_GIT_HASH +ARG TARGETARCH + +RUN GOARCH=$TARGETARCH go install -ldflags="\ + -X tailscale.com/version.longStamp=$VERSION_LONG \ + -X tailscale.com/version.shortStamp=$VERSION_SHORT \ + -X tailscale.com/version.gitCommitStamp=$VERSION_GIT_HASH" \ + -v ./cmd/tailscale ./cmd/tailscaled ./cmd/containerboot + +FROM alpine:3.18 +RUN apk add --no-cache ca-certificates iptables iproute2 ip6tables curl + +COPY --from=build-env /go/bin/* /usr/local/bin/ +# For compat with the previous run.sh, although ideally you should be +# using build_docker.sh which sets an entrypoint for the image. +RUN mkdir /tailscale && ln -s /usr/local/bin/containerboot /tailscale/run.sh diff --git a/flake.nix b/flake.nix index 94ec615..5d4978c 100644 --- a/flake.nix +++ b/flake.nix @@ -30,8 +30,8 @@ checkFlags = ["-short"]; # When updating go.mod or go.sum, a new sha will need to be calculated, - # update this if you have a mismatch after doing a change to those files. - vendorHash = "sha256-wXfKeiJaGe6ahOsONrQhvbuMN8flQ13b0ZjxdbFs1e8="; + # update this if you have a mismatch after doing a change to thos files. + vendorHash = "sha256-EorT2AVwA3usly/LcNor6r5UIhLCdj3L4O4ilgTIC2o="; subPackages = ["cmd/headscale"]; diff --git a/go.mod b/go.mod index 0e0e12a..e96bcc8 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.1.0 github.com/rs/zerolog v1.32.0 github.com/samber/lo v1.39.0 + github.com/sasha-s/go-deadlock v0.3.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 @@ -155,6 +156,7 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.12 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index 309d14e..a534a8e 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/philip-bui/grpc-zerolog v1.0.1 h1:EMacvLRUd2O1K0eWod27ZP5CY1iTNkhBDLSN+Q4JEvA= github.com/philip-bui/grpc-zerolog v1.0.1/go.mod h1:qXbiq/2X4ZUMMshsqlWyTHOcw7ns+GZmlqZZN05ZHcQ= github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -423,6 +425,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= diff --git a/hscontrol/app.go b/hscontrol/app.go index 28211db..253c267 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -19,6 +19,7 @@ import ( "time" "github.com/coreos/go-oidc/v3/oidc" + "github.com/davecgh/go-spew/spew" "github.com/gorilla/mux" grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -104,16 +105,15 @@ type Headscale struct { registrationCache *cache.Cache pollNetMapStreamWG sync.WaitGroup - - mapSessions map[types.NodeID]*mapSession - mapSessionMu sync.Mutex } var ( - profilingEnabled = envknob.Bool("HEADSCALE_PROFILING_ENABLED") + profilingEnabled = envknob.Bool("HEADSCALE_DEBUG_PROFILING_ENABLED") + profilingPath = envknob.String("HEADSCALE_DEBUG_PROFILING_PATH") tailsqlEnabled = envknob.Bool("HEADSCALE_DEBUG_TAILSQL_ENABLED") tailsqlStateDir = envknob.String("HEADSCALE_DEBUG_TAILSQL_STATE_DIR") tailsqlTSKey = envknob.String("TS_AUTHKEY") + dumpConfig = envknob.Bool("HEADSCALE_DEBUG_DUMP_CONFIG") ) func NewHeadscale(cfg *types.Config) (*Headscale, error) { @@ -138,7 +138,6 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) { registrationCache: registrationCache, pollNetMapStreamWG: sync.WaitGroup{}, nodeNotifier: notifier.NewNotifier(cfg), - mapSessions: make(map[types.NodeID]*mapSession), } app.db, err = db.NewHeadscaleDatabase( @@ -502,14 +501,14 @@ func (h *Headscale) createRouter(grpcMux *grpcRuntime.ServeMux) *mux.Router { // Serve launches the HTTP and gRPC server service Headscale and the API. func (h *Headscale) Serve() error { - if _, enableProfile := os.LookupEnv("HEADSCALE_PROFILING_ENABLED"); enableProfile { - if profilePath, ok := os.LookupEnv("HEADSCALE_PROFILING_PATH"); ok { - err := os.MkdirAll(profilePath, os.ModePerm) + if profilingEnabled { + if profilingPath != "" { + err := os.MkdirAll(profilingPath, os.ModePerm) if err != nil { log.Fatal().Err(err).Msg("failed to create profiling directory") } - defer profile.Start(profile.ProfilePath(profilePath)).Stop() + defer profile.Start(profile.ProfilePath(profilingPath)).Stop() } else { defer profile.Start().Stop() } @@ -517,6 +516,10 @@ func (h *Headscale) Serve() error { var err error + if dumpConfig { + spew.Dump(h.cfg) + } + // Fetch an initial DERP Map before we start serving h.DERPMap = derp.GetDERPMap(h.cfg.DERP) h.mapper = mapper.NewMapper(h.db, h.cfg, h.DERPMap, h.nodeNotifier) @@ -729,19 +732,6 @@ func (h *Headscale) Serve() error { w.WriteHeader(http.StatusOK) w.Write([]byte(h.nodeNotifier.String())) }) - debugMux.HandleFunc("/debug/mapresp", func(w http.ResponseWriter, r *http.Request) { - h.mapSessionMu.Lock() - defer h.mapSessionMu.Unlock() - - var b strings.Builder - b.WriteString("mapresponders:\n") - for k, v := range h.mapSessions { - fmt.Fprintf(&b, "\t%d: %p\n", k, v) - } - - w.WriteHeader(http.StatusOK) - w.Write([]byte(b.String())) - }) debugMux.Handle("/metrics", promhttp.Handler()) debugHTTPServer := &http.Server{ @@ -822,17 +812,6 @@ func (h *Headscale) Serve() error { expireNodeCancel() expireEphemeralCancel() - trace("closing map sessions") - wg := sync.WaitGroup{} - for _, mapSess := range h.mapSessions { - wg.Add(1) - go func() { - mapSess.close() - wg.Done() - }() - } - wg.Wait() - trace("waiting for netmap stream to close") h.pollNetMapStreamWG.Wait() diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index 9d802ca..835a6aa 100644 --- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -7,8 +7,23 @@ import ( "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "tailscale.com/envknob" ) +var debugHighCardinalityMetrics = envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS") + +var mapResponseLastSentSeconds *prometheus.GaugeVec + +func init() { + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_last_sent_seconds", + Help: "last sent metric to node.id", + }, []string{"type", "id"}) + } +} + const prometheusNamespace = "headscale" var ( @@ -37,16 +52,16 @@ var ( Name: "mapresponse_readonly_requests_total", Help: "total count of readonly requests received", }, []string{"status"}) - mapResponseSessions = promauto.NewGauge(prometheus.GaugeOpts{ + mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, - Name: "mapresponse_current_sessions_total", - Help: "total count open map response sessions", - }) - mapResponseRejected = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: prometheusNamespace, - Name: "mapresponse_rejected_new_sessions_total", - Help: "total count of new mapsessions rejected", + Name: "mapresponse_ended_total", + Help: "total count of new mapsessions ended", }, []string{"reason"}) + mapResponseClosed = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_closed_total", + Help: "total count of calls to mapresponse close", + }, []string{"return"}) httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: prometheusNamespace, Name: "http_duration_seconds", diff --git a/hscontrol/noise.go b/hscontrol/noise.go index 7fcbc25..360c704 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -231,62 +231,12 @@ func (ns *noiseServer) NoisePollNetMapHandler( return } + sess := ns.headscale.newMapSession(req.Context(), mapRequest, writer, node) - sess.tracef("a node sending a MapRequest with Noise protocol") - - // If a streaming mapSession exists for this node, close it - // and start a new one. - if sess.isStreaming() { - sess.tracef("aquiring lock to check stream") - - ns.headscale.mapSessionMu.Lock() - if _, ok := ns.headscale.mapSessions[node.ID]; ok { - // NOTE/TODO(kradalby): From how I understand the protocol, when - // a client connects with stream=true, and already has a streaming - // connection open, the correct way is to close the current channel - // and replace it. However, I cannot manage to get that working with - // some sort of lock/block happening on the cancelCh in the streaming - // session. - // Not closing the channel and replacing it puts us in a weird state - // which keeps a ghost stream open, receiving keep alives, but no updates. - // - // Typically a new connection is opened when one exists as a client which - // is already authenticated reconnects (e.g. down, then up). The client will - // start auth and streaming at the same time, and then cancel the streaming - // when the auth has finished successfully, opening a new connection. - // - // As a work-around to not replacing, abusing the clients "resilience" - // by reject the new connection which will cause the client to immediately - // reconnect and "fix" the issue, as the other connection typically has been - // closed, meaning there is nothing to replace. - // - // sess.infof("node has an open stream(%p), replacing with %p", oldSession, sess) - // oldSession.close() - - defer ns.headscale.mapSessionMu.Unlock() - - sess.infof("node has an open stream(%p), rejecting new stream", sess) - mapResponseRejected.WithLabelValues("exists").Inc() - return - } - - ns.headscale.mapSessions[node.ID] = sess - mapResponseSessions.Inc() - ns.headscale.mapSessionMu.Unlock() - sess.tracef("releasing lock to check stream") - } - - sess.serve() - - if sess.isStreaming() { - sess.tracef("aquiring lock to remove stream") - ns.headscale.mapSessionMu.Lock() - defer ns.headscale.mapSessionMu.Unlock() - - delete(ns.headscale.mapSessions, node.ID) - mapResponseSessions.Dec() - - sess.tracef("releasing lock to remove stream") + if !sess.isStreaming() { + sess.serve() + } else { + sess.serveLongPoll() } } diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index 1cc4df2..8a7a883 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -3,22 +3,43 @@ package notifier import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "tailscale.com/envknob" ) const prometheusNamespace = "headscale" +var debugHighCardinalityMetrics = envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS") + +var notifierUpdateSent *prometheus.CounterVec + +func init() { + if debugHighCardinalityMetrics { + notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "notifier_update_sent_total", + Help: "total count of update sent on nodes channel", + }, []string{"status", "type", "trigger", "id"}) + } else { + notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "notifier_update_sent_total", + Help: "total count of update sent on nodes channel", + }, []string{"status", "type", "trigger"}) + } +} + var ( + notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_waiters_for_lock", + Help: "gauge of waiters for the notifier lock", + }, []string{"type", "action"}) notifierWaitForLock = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: prometheusNamespace, Name: "notifier_wait_for_lock_seconds", Help: "histogram of time spent waiting for the notifier lock", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.5, 1, 3, 5, 10}, }, []string{"action"}) - notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: prometheusNamespace, - Name: "notifier_update_sent_total", - Help: "total count of update sent on nodes channel", - }, []string{"status", "type", "trigger"}) notifierUpdateReceived = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: prometheusNamespace, Name: "notifier_update_received_total", @@ -29,4 +50,19 @@ var ( Name: "notifier_open_channels_total", Help: "total count open channels in notifier", }) + notifierBatcherWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_waiters_for_lock", + Help: "gauge of waiters for the notifier batcher lock", + }, []string{"type", "action"}) + notifierBatcherChanges = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_changes_pending", + Help: "gauge of full changes pending in the notifier batcher", + }, []string{}) + notifierBatcherPatches = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_batcher_patches_pending", + Help: "gauge of patches pending in the notifier batcher", + }, []string{}) ) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index 339a56f..483c3f3 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -11,25 +11,40 @@ import ( "github.com/juanfont/headscale/hscontrol/types" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog/log" + "github.com/sasha-s/go-deadlock" + "tailscale.com/envknob" "tailscale.com/tailcfg" "tailscale.com/util/set" ) +var debugDeadlock = envknob.Bool("HEADSCALE_DEBUG_DEADLOCK") +var debugDeadlockTimeout = envknob.RegisterDuration("HEADSCALE_DEBUG_DEADLOCK_TIMEOUT") + +func init() { + deadlock.Opts.Disable = !debugDeadlock + if debugDeadlock { + deadlock.Opts.DeadlockTimeout = debugDeadlockTimeout() + deadlock.Opts.PrintAllCurrentGoroutines = true + } +} + type Notifier struct { - l sync.RWMutex + l deadlock.Mutex nodes map[types.NodeID]chan<- types.StateUpdate connected *xsync.MapOf[types.NodeID, bool] b *batcher + cfg *types.Config } func NewNotifier(cfg *types.Config) *Notifier { n := &Notifier{ nodes: make(map[types.NodeID]chan<- types.StateUpdate), connected: xsync.NewMapOf[types.NodeID, bool](), + cfg: cfg, } b := newBatcher(cfg.Tuning.BatchChangeDelay, n) n.b = b - // TODO(kradalby): clean this up + go b.doWork() return n } @@ -39,59 +54,75 @@ func (n *Notifier) Close() { n.b.close() } -func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { - log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to add node") - defer log.Trace(). - Caller(). - Uint64("node.id", nodeID.Uint64()). - Msg("releasing lock to add node") +func (n *Notifier) tracef(nID types.NodeID, msg string, args ...any) { + log.Trace(). + Uint64("node.id", nID.Uint64()). + Int("open_chans", len(n.nodes)).Msgf(msg, args...) +} +func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { start := time.Now() + notifierWaitersForLock.WithLabelValues("lock", "add").Inc() n.l.Lock() defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "add").Dec() notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds()) + // If a channel exists, it means the node has opened a new + // connection. Close the old channel and replace it. + if curr, ok := n.nodes[nodeID]; ok { + n.tracef(nodeID, "channel present, closing and replacing") + close(curr) + } + n.nodes[nodeID] = c n.connected.Store(nodeID, true) - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Int("open_chans", len(n.nodes)). - Msg("Added new channel") + n.tracef(nodeID, "added new channel") notifierNodeUpdateChans.Inc() } -func (n *Notifier) RemoveNode(nodeID types.NodeID) { - log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to remove node") - defer log.Trace(). - Caller(). - Uint64("node.id", nodeID.Uint64()). - Msg("releasing lock to remove node") - +// RemoveNode removes a node and a given channel from the notifier. +// It checks that the channel is the same as currently being updated +// and ignores the removal if it is not. +// RemoveNode reports if the node/chan was removed. +func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) bool { start := time.Now() + notifierWaitersForLock.WithLabelValues("lock", "remove").Inc() n.l.Lock() defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "remove").Dec() notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds()) if len(n.nodes) == 0 { - return + return true + } + + // If the channel exist, but it does not belong + // to the caller, ignore. + if curr, ok := n.nodes[nodeID]; ok { + if curr != c { + n.tracef(nodeID, "channel has been replaced, not removing") + return false + } } delete(n.nodes, nodeID) n.connected.Store(nodeID, false) - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Int("open_chans", len(n.nodes)). - Msg("Removed channel") + n.tracef(nodeID, "removed channel") notifierNodeUpdateChans.Dec() + + return true } // IsConnected reports if a node is connected to headscale and has a // poll session open. func (n *Notifier) IsConnected(nodeID types.NodeID) bool { - n.l.RLock() - defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("lock", "conncheck").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "conncheck").Dec() if val, ok := n.connected.Load(nodeID); ok { return val @@ -130,15 +161,11 @@ func (n *Notifier) NotifyByNodeID( update types.StateUpdate, nodeID types.NodeID, ) { - log.Trace().Caller().Str("type", update.Type.String()).Msg("acquiring lock to notify") - defer log.Trace(). - Caller(). - Str("type", update.Type.String()). - Msg("releasing lock, finished notifying") - start := time.Now() - n.l.RLock() - defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("lock", "notify").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "notify").Dec() notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds()) if c, ok := n.nodes[nodeID]; ok { @@ -150,50 +177,94 @@ func (n *Notifier) NotifyByNodeID( Any("origin", types.NotifyOriginKey.Value(ctx)). Any("origin-hostname", types.NotifyHostnameKey.Value(ctx)). Msgf("update not sent, context cancelled") - notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + } return case c <- update: - log.Trace(). - Uint64("node.id", nodeID.Uint64()). - Any("origin", ctx.Value("origin")). - Any("origin-hostname", ctx.Value("hostname")). - Msgf("update successfully sent on chan") - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + n.tracef(nodeID, "update successfully sent on chan, origin: %s, origin-hostname: %s", ctx.Value("origin"), ctx.Value("hostname")) + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc() + } } } } func (n *Notifier) sendAll(update types.StateUpdate) { start := time.Now() - n.l.RLock() - defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("lock", "send-all").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "send-all").Dec() notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds()) - for _, c := range n.nodes { - c <- update - notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + for id, c := range n.nodes { + // Whenever an update is sent to all nodes, there is a chance that the node + // has disconnected and the goroutine that was supposed to consume the update + // has shut down the channel and is waiting for the lock held here in RemoveNode. + // This means that there is potential for a deadlock which would stop all updates + // going out to clients. This timeout prevents that from happening by moving on to the + // next node if the context is cancelled. Afther sendAll releases the lock, the add/remove + // call will succeed and the update will go to the correct nodes on the next call. + ctx, cancel := context.WithTimeout(context.Background(), n.cfg.Tuning.NotifierSendTimeout) + defer cancel() + select { + case <-ctx.Done(): + log.Error(). + Err(ctx.Err()). + Uint64("node.id", id.Uint64()). + Msgf("update not sent, context cancelled") + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc() + } + + return + case c <- update: + if debugHighCardinalityMetrics { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc() + } else { + notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc() + } + } } } func (n *Notifier) String() string { - n.l.RLock() - defer n.l.RUnlock() + notifierWaitersForLock.WithLabelValues("lock", "string").Inc() + n.l.Lock() + defer n.l.Unlock() + notifierWaitersForLock.WithLabelValues("lock", "string").Dec() var b strings.Builder - b.WriteString("chans:\n") + fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes)) - for k, v := range n.nodes { - fmt.Fprintf(&b, "\t%d: %p\n", k, v) + var keys []types.NodeID + n.connected.Range(func(key types.NodeID, value bool) bool { + keys = append(keys, key) + return true + }) + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + + for _, key := range keys { + fmt.Fprintf(&b, "\t%d: %p\n", key, n.nodes[key]) } b.WriteString("\n") - b.WriteString("connected:\n") + fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes)) - n.connected.Range(func(k types.NodeID, v bool) bool { - fmt.Fprintf(&b, "\t%d: %t\n", k, v) - return true - }) + for _, key := range keys { + val, _ := n.connected.Load(key) + fmt.Fprintf(&b, "\t%d: %t\n", key, val) + } return b.String() } @@ -230,13 +301,16 @@ func (b *batcher) close() { // addOrPassthrough adds the update to the batcher, if it is not a // type that is currently batched, it will be sent immediately. func (b *batcher) addOrPassthrough(update types.StateUpdate) { + notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Inc() b.mu.Lock() defer b.mu.Unlock() + notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Dec() switch update.Type { case types.StatePeerChanged: b.changedNodeIDs.Add(update.ChangeNodes...) b.nodesChanged = true + notifierBatcherChanges.WithLabelValues().Set(float64(b.changedNodeIDs.Len())) case types.StatePeerChangedPatch: for _, newPatch := range update.ChangePatches { @@ -248,6 +322,7 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) { } } b.patchesChanged = true + notifierBatcherPatches.WithLabelValues().Set(float64(len(b.patches))) default: b.n.sendAll(update) @@ -257,8 +332,10 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) { // flush sends all the accumulated patches to all // nodes in the notifier. func (b *batcher) flush() { + notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Inc() b.mu.Lock() defer b.mu.Unlock() + notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Dec() if b.nodesChanged || b.patchesChanged { var patches []*tailcfg.PeerChange @@ -296,8 +373,10 @@ func (b *batcher) flush() { } b.changedNodeIDs = set.Slice[types.NodeID]{} + notifierBatcherChanges.WithLabelValues().Set(0) b.nodesChanged = false b.patches = make(map[types.NodeID]tailcfg.PeerChange, len(b.patches)) + notifierBatcherPatches.WithLabelValues().Set(0) b.patchesChanged = false } } diff --git a/hscontrol/notifier/notifier_test.go b/hscontrol/notifier/notifier_test.go index 4d61f13..8841a46 100644 --- a/hscontrol/notifier/notifier_test.go +++ b/hscontrol/notifier/notifier_test.go @@ -227,7 +227,7 @@ func TestBatcher(t *testing.T) { ch := make(chan types.StateUpdate, 30) defer close(ch) n.AddNode(1, ch) - defer n.RemoveNode(1) + defer n.RemoveNode(1, ch) for _, u := range tt.updates { n.NotifyAll(context.Background(), u) diff --git a/hscontrol/poll.go b/hscontrol/poll.go index e3137cc..d3c8211 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -9,13 +9,13 @@ import ( "net/netip" "sort" "strings" - "sync" "time" "github.com/juanfont/headscale/hscontrol/db" "github.com/juanfont/headscale/hscontrol/mapper" "github.com/juanfont/headscale/hscontrol/types" "github.com/rs/zerolog/log" + "github.com/sasha-s/go-deadlock" xslices "golang.org/x/exp/slices" "gorm.io/gorm" "tailscale.com/tailcfg" @@ -29,11 +29,6 @@ type contextKey string const nodeNameContextKey = contextKey("nodeName") -type sessionManager struct { - mu sync.RWMutex - sess map[types.NodeID]*mapSession -} - type mapSession struct { h *Headscale req tailcfg.MapRequest @@ -41,12 +36,13 @@ type mapSession struct { capVer tailcfg.CapabilityVersion mapper *mapper.Mapper - serving bool - servingMu sync.Mutex + cancelChMu deadlock.Mutex - ch chan types.StateUpdate - cancelCh chan struct{} + ch chan types.StateUpdate + cancelCh chan struct{} + cancelChOpen bool + keepAlive time.Duration keepAliveTicker *time.Ticker node *types.Node @@ -77,6 +73,8 @@ func (h *Headscale) newMapSession( } } + ka := keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond) + return &mapSession{ h: h, ctx: ctx, @@ -86,13 +84,12 @@ func (h *Headscale) newMapSession( capVer: req.Version, mapper: h.mapper, - // serving indicates if a client is being served. - serving: false, + ch: updateChan, + cancelCh: make(chan struct{}), + cancelChOpen: true, - ch: updateChan, - cancelCh: make(chan struct{}), - - keepAliveTicker: time.NewTicker(keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond)), + keepAlive: ka, + keepAliveTicker: nil, // Loggers warnf: warnf, @@ -103,15 +100,23 @@ func (h *Headscale) newMapSession( } func (m *mapSession) close() { - m.servingMu.Lock() - defer m.servingMu.Unlock() - if !m.serving { + m.cancelChMu.Lock() + defer m.cancelChMu.Unlock() + + if !m.cancelChOpen { + mapResponseClosed.WithLabelValues("chanclosed").Inc() return } - m.tracef("mapSession (%p) sending message on cancel chan") - m.cancelCh <- struct{}{} - m.tracef("mapSession (%p) sent message on cancel chan") + m.tracef("mapSession (%p) sending message on cancel chan", m) + select { + case m.cancelCh <- struct{}{}: + mapResponseClosed.WithLabelValues("sent").Inc() + m.tracef("mapSession (%p) sent message on cancel chan", m) + case <-time.After(30 * time.Second): + mapResponseClosed.WithLabelValues("timeout").Inc() + m.tracef("mapSession (%p) timed out sending close message", m) + } } func (m *mapSession) isStreaming() bool { @@ -126,40 +131,12 @@ func (m *mapSession) isReadOnlyUpdate() bool { return !m.req.Stream && m.req.OmitPeers && m.req.ReadOnly } -// handlePoll ensures the node gets the appropriate updates from either -// polling or immediate responses. -// -//nolint:gocyclo +func (m *mapSession) resetKeepAlive() { + m.keepAliveTicker.Reset(m.keepAlive) +} + +// serve handles non-streaming requests. func (m *mapSession) serve() { - // Register with the notifier if this is a streaming - // session - if m.isStreaming() { - // defers are called in reverse order, - // so top one is executed last. - - // Failover the node's routes if any. - defer m.infof("node has disconnected, mapSession: %p", m) - defer m.pollFailoverRoutes("node closing connection", m.node) - - defer m.h.updateNodeOnlineStatus(false, m.node) - defer m.h.nodeNotifier.RemoveNode(m.node.ID) - - defer func() { - m.servingMu.Lock() - defer m.servingMu.Unlock() - - m.serving = false - close(m.cancelCh) - }() - - m.serving = true - - m.h.nodeNotifier.AddNode(m.node.ID, m.ch) - m.h.updateNodeOnlineStatus(true, m.node) - - m.infof("node has connected, mapSession: %p", m) - } - // TODO(kradalby): A set todos to harden: // - func to tell the stream to die, readonly -> false, !stream && omitpeers -> false, true @@ -196,13 +173,43 @@ func (m *mapSession) serve() { return } +} + +// serveLongPoll ensures the node gets the appropriate updates from either +// polling or immediate responses. +// +//nolint:gocyclo +func (m *mapSession) serveLongPoll() { + // Clean up the session when the client disconnects + defer func() { + m.cancelChMu.Lock() + m.cancelChOpen = false + close(m.cancelCh) + m.cancelChMu.Unlock() + + // only update node status if the node channel was removed. + // in principal, it will be removed, but the client rapidly + // reconnects, the channel might be of another connection. + // In that case, it is not closed and the node is still online. + if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) { + // Failover the node's routes if any. + m.h.updateNodeOnlineStatus(false, m.node) + m.pollFailoverRoutes("node closing connection", m.node) + } + + m.infof("node has disconnected, mapSession: %p, chan: %p", m, m.ch) + }() + // From version 68, all streaming requests can be treated as read only. + // TODO: Remove when we drop support for 1.48 if m.capVer < 68 { // Error has been handled/written to client in the func // return err := m.handleSaveNode() if err != nil { mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc() + + m.close() return } mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc() @@ -224,6 +231,13 @@ func (m *mapSession) serve() { ctx, cancel := context.WithCancel(context.WithValue(m.ctx, nodeNameContextKey, m.node.Hostname)) defer cancel() + m.keepAliveTicker = time.NewTicker(m.keepAlive) + + m.h.nodeNotifier.AddNode(m.node.ID, m.ch) + go m.h.updateNodeOnlineStatus(true, m.node) + + m.infof("node has connected, mapSession: %p, chan: %p", m, m.ch) + // Loop through updates and continuously send them to the // client. for { @@ -231,13 +245,21 @@ func (m *mapSession) serve() { select { case <-m.cancelCh: m.tracef("poll cancelled received") - return - case <-ctx.Done(): - m.tracef("poll context done") + mapResponseEnded.WithLabelValues("cancelled").Inc() return - // Consume all updates sent to node - case update := <-m.ch: + case <-ctx.Done(): + m.tracef("poll context done") + mapResponseEnded.WithLabelValues("done").Inc() + return + + // Consume updates sent to node + case update, ok := <-m.ch: + if !ok { + m.tracef("update channel closed, streaming session is likely being replaced") + return + } + m.tracef("received stream update: %s %s", update.Type.String(), update.Message) mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc() @@ -303,15 +325,13 @@ func (m *mapSession) serve() { return } - // log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response") - // Only send update if there is change if data != nil { startWrite := time.Now() _, err = m.w.Write(data) if err != nil { mapResponseSent.WithLabelValues("error", updateType).Inc() - m.errf(err, "Could not write the map response, for mapSession: %p", m) + m.errf(err, "could not write the map response(%s), for mapSession: %p", update.Type.String(), m) return } @@ -324,8 +344,12 @@ func (m *mapSession) serve() { log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", m.node.MachineKey.String()).Msg("finished writing mapresp to node") + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds.WithLabelValues(updateType, m.node.ID.String()).Set(float64(time.Now().Unix())) + } mapResponseSent.WithLabelValues("ok", updateType).Inc() m.tracef("update sent") + m.resetKeepAlive() } case <-m.keepAliveTicker.C: @@ -348,6 +372,9 @@ func (m *mapSession) serve() { return } + if debugHighCardinalityMetrics { + mapResponseLastSentSeconds.WithLabelValues("keepalive", m.node.ID.String()).Set(float64(time.Now().Unix())) + } mapResponseSent.WithLabelValues("ok", "keepalive").Inc() } } @@ -404,16 +431,6 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) { }, node.ID) } -func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) { - log.Trace(). - Str("handler", "PollNetMap"). - Str("node", node). - Str("channel", "Done"). - Msg(fmt.Sprintf("Closing %s channel", name)) - - close(channel) -} - func (m *mapSession) handleEndpointUpdate() { m.tracef("received endpoint update") @@ -425,6 +442,17 @@ func (m *mapSession) handleEndpointUpdate() { m.node.ApplyPeerChange(&change) sendUpdate, routesChanged := hostInfoChanged(m.node.Hostinfo, m.req.Hostinfo) + + // The node might not set NetInfo if it has not changed and if + // the full HostInfo object is overrwritten, the information is lost. + // If there is no NetInfo, keep the previous one. + // From 1.66 the client only sends it if changed: + // https://github.com/tailscale/tailscale/commit/e1011f138737286ecf5123ff887a7a5800d129a2 + // TODO(kradalby): evaulate if we need better comparing of hostinfo + // before we take the changes. + if m.req.Hostinfo.NetInfo == nil { + m.req.Hostinfo.NetInfo = m.node.Hostinfo.NetInfo + } m.node.Hostinfo = m.req.Hostinfo logTracePeerChange(m.node.Hostname, sendUpdate, &change) diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go index bd0bfea..ab17cfb 100644 --- a/hscontrol/types/config.go +++ b/hscontrol/types/config.go @@ -171,6 +171,7 @@ type LogConfig struct { } type Tuning struct { + NotifierSendTimeout time.Duration BatchChangeDelay time.Duration NodeMapSessionBufferedChanSize int } @@ -232,6 +233,7 @@ func LoadConfig(path string, isFile bool) error { viper.SetDefault("ephemeral_node_inactivity_timeout", "120s") + viper.SetDefault("tuning.notifier_send_timeout", "800ms") viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) @@ -640,7 +642,7 @@ func GetHeadscaleConfig() (*Config, error) { }, nil } - logConfig := GetLogConfig() + logConfig := GetLogConfig() zerolog.SetGlobalLevel(logConfig.Level) prefix4, err := PrefixV4() @@ -768,6 +770,7 @@ func GetHeadscaleConfig() (*Config, error) { // TODO(kradalby): Document these settings when more stable Tuning: Tuning{ + NotifierSendTimeout: viper.GetDuration("tuning.notifier_send_timeout"), BatchChangeDelay: viper.GetDuration("tuning.batch_change_delay"), NodeMapSessionBufferedChanSize: viper.GetInt("tuning.node_mapsession_buffered_chan_size"), }, diff --git a/hscontrol/types/node.go b/hscontrol/types/node.go index 7a5756a..3ccadc3 100644 --- a/hscontrol/types/node.go +++ b/hscontrol/types/node.go @@ -43,6 +43,10 @@ func (id NodeID) Uint64() uint64 { return uint64(id) } +func (id NodeID) String() string { + return strconv.FormatUint(id.Uint64(), util.Base10) +} + // Node is a Headscale client. type Node struct { ID NodeID `gorm:"primary_key"` diff --git a/integration/general_test.go b/integration/general_test.go index db9bf83..245e8f0 100644 --- a/integration/general_test.go +++ b/integration/general_test.go @@ -1,6 +1,7 @@ package integration import ( + "context" "encoding/json" "fmt" "net/netip" @@ -15,6 +16,7 @@ import ( "github.com/rs/zerolog/log" "github.com/samber/lo" "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" "tailscale.com/client/tailscale/apitype" "tailscale.com/types/key" ) @@ -829,24 +831,10 @@ func TestPingAllByIPManyUpDown(t *testing.T) { "user2": len(MustTestVersions), } - headscaleConfig := map[string]string{ - "HEADSCALE_DERP_URLS": "", - "HEADSCALE_DERP_SERVER_ENABLED": "true", - "HEADSCALE_DERP_SERVER_REGION_ID": "999", - "HEADSCALE_DERP_SERVER_REGION_CODE": "headscale", - "HEADSCALE_DERP_SERVER_REGION_NAME": "Headscale Embedded DERP", - "HEADSCALE_DERP_SERVER_STUN_LISTEN_ADDR": "0.0.0.0:3478", - "HEADSCALE_DERP_SERVER_PRIVATE_KEY_PATH": "/tmp/derp.key", - - // Envknob for enabling DERP debug logs - "DERP_DEBUG_LOGS": "true", - "DERP_PROBER_DEBUG_LOGS": "true", - } - err = scenario.CreateHeadscaleEnv(spec, []tsic.Option{}, - hsic.WithTestName("pingallbyip"), - hsic.WithConfigEnv(headscaleConfig), + hsic.WithTestName("pingallbyipmany"), + hsic.WithEmbeddedDERPServerOnly(), hsic.WithTLS(), hsic.WithHostnameAsServerURL(), ) @@ -870,19 +858,35 @@ func TestPingAllByIPManyUpDown(t *testing.T) { success := pingAllHelper(t, allClients, allAddrs) t.Logf("%d successful pings out of %d", success, len(allClients)*len(allIps)) + wg, _ := errgroup.WithContext(context.Background()) + for run := range 3 { t.Logf("Starting DownUpPing run %d", run+1) for _, client := range allClients { - t.Logf("taking down %q", client.Hostname()) - client.Down() + c := client + wg.Go(func() error { + t.Logf("taking down %q", c.Hostname()) + return c.Down() + }) + } + + if err := wg.Wait(); err != nil { + t.Fatalf("failed to take down all nodes: %s", err) } time.Sleep(5 * time.Second) for _, client := range allClients { - t.Logf("bringing up %q", client.Hostname()) - client.Up() + c := client + wg.Go(func() error { + t.Logf("bringing up %q", c.Hostname()) + return c.Up() + }) + } + + if err := wg.Wait(); err != nil { + t.Fatalf("failed to take down all nodes: %s", err) } time.Sleep(5 * time.Second) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index a118b6f..5b55a0a 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -286,9 +286,13 @@ func New( } env := []string{ - "HEADSCALE_PROFILING_ENABLED=1", - "HEADSCALE_PROFILING_PATH=/tmp/profile", + "HEADSCALE_DEBUG_PROFILING_ENABLED=1", + "HEADSCALE_DEBUG_PROFILING_PATH=/tmp/profile", "HEADSCALE_DEBUG_DUMP_MAPRESPONSE_PATH=/tmp/mapresponses", + "HEADSCALE_DEBUG_DEADLOCK=1", + "HEADSCALE_DEBUG_DEADLOCK_TIMEOUT=5s", + "HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS=1", + "HEADSCALE_DEBUG_DUMP_CONFIG=1", } for key, value := range hsic.env { env = append(env, fmt.Sprintf("%s=%s", key, value)) @@ -397,7 +401,7 @@ func (t *HeadscaleInContainer) Shutdown() error { ) } - err = t.SaveMetrics("/tmp/control/metrics.txt") + err = t.SaveMetrics(fmt.Sprintf("/tmp/control/%s_metrics.txt", t.hostname)) if err != nil { log.Printf( "Failed to metrics from control: %s", @@ -747,7 +751,7 @@ func createCertificate(hostname string) ([]byte, []byte, error) { Locality: []string{"Leiden"}, }, NotBefore: time.Now(), - NotAfter: time.Now().Add(60 * time.Minute), + NotAfter: time.Now().Add(60 * time.Hour), IsCA: true, ExtKeyUsage: []x509.ExtKeyUsage{ x509.ExtKeyUsageClientAuth, diff --git a/integration/scenario.go b/integration/scenario.go index 3f0eb7d..bd00424 100644 --- a/integration/scenario.go +++ b/integration/scenario.go @@ -51,8 +51,11 @@ var ( tailscaleVersions2021 = map[string]bool{ "head": true, "unstable": true, - "1.60": true, // CapVer: 82 - "1.58": true, // CapVer: 82 + "1.66": true, // CapVer: not checked + "1.64": true, // CapVer: not checked + "1.62": true, // CapVer: not checked + "1.60": true, // CapVer: not checked + "1.58": true, // CapVer: not checked "1.56": true, // CapVer: 82 "1.54": true, // CapVer: 79 "1.52": true, // CapVer: 79 @@ -423,8 +426,10 @@ func (s *Scenario) WaitForTailscaleSync() error { if err != nil { for _, user := range s.users { for _, client := range user.Clients { - peers, _ := client.PrettyPeers() - log.Println(peers) + peers, allOnline, _ := client.FailingPeersAsString() + if !allOnline { + log.Println(peers) + } } } } diff --git a/integration/tailscale.go b/integration/tailscale.go index 6bcf607..2ea3faa 100644 --- a/integration/tailscale.go +++ b/integration/tailscale.go @@ -36,5 +36,8 @@ type TailscaleClient interface { Ping(hostnameOrIP string, opts ...tsic.PingOption) error Curl(url string, opts ...tsic.CurlOption) (string, error) ID() string - PrettyPeers() (string, error) + + // FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client + // and a bool indicating if the clients online count and peer count is equal. + FailingPeersAsString() (string, bool, error) } diff --git a/integration/tsic/tsic.go b/integration/tsic/tsic.go index 6ae0226..0e3c91f 100644 --- a/integration/tsic/tsic.go +++ b/integration/tsic/tsic.go @@ -691,15 +691,18 @@ func (t *TailscaleInContainer) FQDN() (string, error) { return status.Self.DNSName, nil } -// PrettyPeers returns a formatted-ish table of peers in the client. -func (t *TailscaleInContainer) PrettyPeers() (string, error) { +// FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client +// and a bool indicating if the clients online count and peer count is equal. +func (t *TailscaleInContainer) FailingPeersAsString() (string, bool, error) { status, err := t.Status() if err != nil { - return "", fmt.Errorf("failed to get FQDN: %w", err) + return "", false, fmt.Errorf("failed to get FQDN: %w", err) } - str := fmt.Sprintf("Peers of %s\n", t.hostname) - str += "Hostname\tOnline\tLastSeen\n" + var b strings.Builder + + fmt.Fprintf(&b, "Peers of %s\n", t.hostname) + fmt.Fprint(&b, "Hostname\tOnline\tLastSeen\n") peerCount := len(status.Peers()) onlineCount := 0 @@ -711,12 +714,12 @@ func (t *TailscaleInContainer) PrettyPeers() (string, error) { onlineCount++ } - str += fmt.Sprintf("%s\t%t\t%s\n", peer.HostName, peer.Online, peer.LastSeen) + fmt.Fprintf(&b, "%s\t%t\t%s\n", peer.HostName, peer.Online, peer.LastSeen) } - str += fmt.Sprintf("Peer Count: %d, Online Count: %d\n\n", peerCount, onlineCount) + fmt.Fprintf(&b, "Peer Count: %d, Online Count: %d\n\n", peerCount, onlineCount) - return str, nil + return b.String(), peerCount == onlineCount, nil } // WaitForNeedsLogin blocks until the Tailscale (tailscaled) instance has