Merge pull request #542 from mpldr/issue-342-send-on-closed-channel
This commit is contained in:
commit
367f8489db
2 changed files with 46 additions and 23 deletions
|
@ -5,6 +5,7 @@
|
||||||
### Changes
|
### Changes
|
||||||
|
|
||||||
- Fix labels cardinality error when registering unknown pre-auth key [#519](https://github.com/juanfont/headscale/pull/519)
|
- Fix labels cardinality error when registering unknown pre-auth key [#519](https://github.com/juanfont/headscale/pull/519)
|
||||||
|
- Fix send on closed channel crash in polling [#542](https://github.com/juanfont/headscale/pull/542)
|
||||||
|
|
||||||
## 0.15.0 (2022-03-20)
|
## 0.15.0 (2022-03-20)
|
||||||
|
|
||||||
|
|
68
poll.go
68
poll.go
|
@ -175,32 +175,13 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
|
||||||
Str("machine", machine.Name).
|
Str("machine", machine.Name).
|
||||||
Msg("Loading or creating update channel")
|
Msg("Loading or creating update channel")
|
||||||
|
|
||||||
// TODO: could probably remove all that duplication once generics land.
|
|
||||||
closeChanWithLog := func(channel interface{}, name string) {
|
|
||||||
log.Trace().
|
|
||||||
Str("handler", "PollNetMap").
|
|
||||||
Str("machine", machine.Name).
|
|
||||||
Str("channel", "Done").
|
|
||||||
Msg(fmt.Sprintf("Closing %s channel", name))
|
|
||||||
|
|
||||||
switch c := channel.(type) {
|
|
||||||
case (chan struct{}):
|
|
||||||
close(c)
|
|
||||||
|
|
||||||
case (chan []byte):
|
|
||||||
close(c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const chanSize = 8
|
const chanSize = 8
|
||||||
updateChan := make(chan struct{}, chanSize)
|
updateChan := make(chan struct{}, chanSize)
|
||||||
defer closeChanWithLog(updateChan, "updateChan")
|
|
||||||
|
|
||||||
pollDataChan := make(chan []byte, chanSize)
|
pollDataChan := make(chan []byte, chanSize)
|
||||||
defer closeChanWithLog(pollDataChan, "pollDataChan")
|
defer closeChanWithLog(pollDataChan, machine.Name, "pollDataChan")
|
||||||
|
|
||||||
keepAliveChan := make(chan []byte)
|
keepAliveChan := make(chan []byte)
|
||||||
defer closeChanWithLog(keepAliveChan, "keepAliveChan")
|
|
||||||
|
|
||||||
if req.OmitPeers && !req.Stream {
|
if req.OmitPeers && !req.Stream {
|
||||||
log.Info().
|
log.Info().
|
||||||
|
@ -273,7 +254,27 @@ func (h *Headscale) PollNetMapStream(
|
||||||
updateChan chan struct{},
|
updateChan chan struct{},
|
||||||
) {
|
) {
|
||||||
{
|
{
|
||||||
ctx, cancel := context.WithCancel(ctx.Request.Context())
|
machine, err := h.GetMachineByMachineKey(machineKey)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
log.Warn().
|
||||||
|
Str("handler", "PollNetMap").
|
||||||
|
Msgf("Ignoring request, cannot find machine with key %s", machineKey.String())
|
||||||
|
ctx.String(http.StatusUnauthorized, "")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error().
|
||||||
|
Str("handler", "PollNetMap").
|
||||||
|
Msgf("Failed to fetch machine from the database with Machine key: %s", machineKey.String())
|
||||||
|
ctx.String(http.StatusInternalServerError, "")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.WithValue(ctx.Request.Context(), "machineName", machine.Name)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go h.scheduledPollWorker(
|
go h.scheduledPollWorker(
|
||||||
|
@ -564,8 +565,8 @@ func (h *Headscale) PollNetMapStream(
|
||||||
|
|
||||||
func (h *Headscale) scheduledPollWorker(
|
func (h *Headscale) scheduledPollWorker(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
updateChan chan<- struct{},
|
updateChan chan struct{},
|
||||||
keepAliveChan chan<- []byte,
|
keepAliveChan chan []byte,
|
||||||
machineKey key.MachinePublic,
|
machineKey key.MachinePublic,
|
||||||
mapRequest tailcfg.MapRequest,
|
mapRequest tailcfg.MapRequest,
|
||||||
machine *Machine,
|
machine *Machine,
|
||||||
|
@ -573,6 +574,17 @@ func (h *Headscale) scheduledPollWorker(
|
||||||
keepAliveTicker := time.NewTicker(keepAliveInterval)
|
keepAliveTicker := time.NewTicker(keepAliveInterval)
|
||||||
updateCheckerTicker := time.NewTicker(updateCheckInterval)
|
updateCheckerTicker := time.NewTicker(updateCheckInterval)
|
||||||
|
|
||||||
|
defer closeChanWithLog(
|
||||||
|
updateChan,
|
||||||
|
fmt.Sprint(ctx.Value("machineName")),
|
||||||
|
"updateChan",
|
||||||
|
)
|
||||||
|
defer closeChanWithLog(
|
||||||
|
keepAliveChan,
|
||||||
|
fmt.Sprint(ctx.Value("machineName")),
|
||||||
|
"updateChan",
|
||||||
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -606,3 +618,13 @@ func (h *Headscale) scheduledPollWorker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func closeChanWithLog[C chan []byte | chan struct{}](channel C, machine, name string) {
|
||||||
|
log.Trace().
|
||||||
|
Str("handler", "PollNetMap").
|
||||||
|
Str("machine", machine).
|
||||||
|
Str("channel", "Done").
|
||||||
|
Msg(fmt.Sprintf("Closing %s channel", name))
|
||||||
|
|
||||||
|
close(channel)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue