ensure expire routines are cleaned up (#1924)
Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
parent
a9c568c801
commit
622aa82da2
1 changed files with 63 additions and 46 deletions
109
hscontrol/app.go
109
hscontrol/app.go
|
@ -70,7 +70,7 @@ var (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
AuthPrefix = "Bearer "
|
AuthPrefix = "Bearer "
|
||||||
updateInterval = 5000
|
updateInterval = 5 * time.Second
|
||||||
privateKeyFileMode = 0o600
|
privateKeyFileMode = 0o600
|
||||||
headscaleDirPerm = 0o700
|
headscaleDirPerm = 0o700
|
||||||
|
|
||||||
|
@ -219,64 +219,75 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
// deleteExpireEphemeralNodes deletes ephemeral node records that have not been
|
// deleteExpireEphemeralNodes deletes ephemeral node records that have not been
|
||||||
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
|
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
|
||||||
func (h *Headscale) deleteExpireEphemeralNodes(milliSeconds int64) {
|
func (h *Headscale) deleteExpireEphemeralNodes(ctx context.Context, every time.Duration) {
|
||||||
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
ticker := time.NewTicker(every)
|
||||||
|
|
||||||
for range ticker.C {
|
for {
|
||||||
var removed []types.NodeID
|
select {
|
||||||
var changed []types.NodeID
|
case <-ctx.Done():
|
||||||
if err := h.db.Write(func(tx *gorm.DB) error {
|
ticker.Stop()
|
||||||
removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
var removed []types.NodeID
|
||||||
|
var changed []types.NodeID
|
||||||
|
if err := h.db.Write(func(tx *gorm.DB) error {
|
||||||
|
removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
|
log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if removed != nil {
|
if removed != nil {
|
||||||
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
|
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
|
||||||
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
|
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
|
||||||
Type: types.StatePeerRemoved,
|
Type: types.StatePeerRemoved,
|
||||||
Removed: removed,
|
Removed: removed,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if changed != nil {
|
if changed != nil {
|
||||||
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
|
ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
|
||||||
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
|
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
|
||||||
Type: types.StatePeerChanged,
|
Type: types.StatePeerChanged,
|
||||||
ChangeNodes: changed,
|
ChangeNodes: changed,
|
||||||
})
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// expireExpiredMachines expires nodes that have an explicit expiry set
|
// expireExpiredNodes expires nodes that have an explicit expiry set
|
||||||
// after that expiry time has passed.
|
// after that expiry time has passed.
|
||||||
func (h *Headscale) expireExpiredMachines(intervalMs int64) {
|
func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration) {
|
||||||
interval := time.Duration(intervalMs) * time.Millisecond
|
ticker := time.NewTicker(every)
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
|
|
||||||
lastCheck := time.Unix(0, 0)
|
lastCheck := time.Unix(0, 0)
|
||||||
var update types.StateUpdate
|
var update types.StateUpdate
|
||||||
var changed bool
|
var changed bool
|
||||||
|
|
||||||
for range ticker.C {
|
for {
|
||||||
if err := h.db.Write(func(tx *gorm.DB) error {
|
select {
|
||||||
lastCheck, update, changed = db.ExpireExpiredNodes(tx, lastCheck)
|
case <-ctx.Done():
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := h.db.Write(func(tx *gorm.DB) error {
|
||||||
|
lastCheck, update, changed = db.ExpireExpiredNodes(tx, lastCheck)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Error().Err(err).Msg("database error while expiring nodes")
|
log.Error().Err(err).Msg("database error while expiring nodes")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if changed {
|
if changed {
|
||||||
log.Trace().Interface("nodes", update.ChangePatches).Msgf("expiring nodes")
|
log.Trace().Interface("nodes", update.ChangePatches).Msgf("expiring nodes")
|
||||||
|
|
||||||
ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
|
ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
|
||||||
h.nodeNotifier.NotifyAll(ctx, update)
|
h.nodeNotifier.NotifyAll(ctx, update)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -538,10 +549,13 @@ func (h *Headscale) Serve() error {
|
||||||
return errEmptyInitialDERPMap
|
return errEmptyInitialDERPMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(kradalby): These should have cancel channels and be cleaned
|
expireEphemeralCtx, expireEphemeralCancel := context.WithCancel(context.Background())
|
||||||
// up on shutdown.
|
defer expireEphemeralCancel()
|
||||||
go h.deleteExpireEphemeralNodes(updateInterval)
|
go h.deleteExpireEphemeralNodes(expireEphemeralCtx, updateInterval)
|
||||||
go h.expireExpiredMachines(updateInterval)
|
|
||||||
|
expireNodeCtx, expireNodeCancel := context.WithCancel(context.Background())
|
||||||
|
defer expireNodeCancel()
|
||||||
|
go h.expireExpiredNodes(expireNodeCtx, updateInterval)
|
||||||
|
|
||||||
if zl.GlobalLevel() == zl.TraceLevel {
|
if zl.GlobalLevel() == zl.TraceLevel {
|
||||||
zerolog.RespLog = true
|
zerolog.RespLog = true
|
||||||
|
@ -805,6 +819,9 @@ func (h *Headscale) Serve() error {
|
||||||
Str("signal", sig.String()).
|
Str("signal", sig.String()).
|
||||||
Msg("Received signal to stop, shutting down gracefully")
|
Msg("Received signal to stop, shutting down gracefully")
|
||||||
|
|
||||||
|
expireNodeCancel()
|
||||||
|
expireEphemeralCancel()
|
||||||
|
|
||||||
trace("closing map sessions")
|
trace("closing map sessions")
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for _, mapSess := range h.mapSessions {
|
for _, mapSess := range h.mapSessions {
|
||||||
|
|
Loading…
Reference in a new issue