Refactor tls and wire up grpc, grpc gateway/api
This commit moves the TLS configuration into a seperate function. It also wires up the gRPC interface and prepares handing the API endpoints to the grpc gateway.
This commit is contained in:
parent
caa4d33cbd
commit
2f045b20fb
1 changed files with 105 additions and 22 deletions
127
app.go
127
app.go
|
@ -1,8 +1,11 @@
|
||||||
package headscale
|
package headscale
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
@ -11,12 +14,16 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||||
|
apiV1 "github.com/juanfont/headscale/gen/go/v1"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"github.com/soheilhy/cmux"
|
||||||
ginprometheus "github.com/zsais/go-gin-prometheus"
|
ginprometheus "github.com/zsais/go-gin-prometheus"
|
||||||
"golang.org/x/crypto/acme"
|
"golang.org/x/crypto/acme"
|
||||||
"golang.org/x/crypto/acme/autocert"
|
"golang.org/x/crypto/acme/autocert"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
"google.golang.org/grpc"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"inet.af/netaddr"
|
"inet.af/netaddr"
|
||||||
"tailscale.com/tailcfg"
|
"tailscale.com/tailcfg"
|
||||||
|
@ -24,7 +31,7 @@ import (
|
||||||
"tailscale.com/types/wgkey"
|
"tailscale.com/types/wgkey"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config contains the initial Headscale configuration
|
// Config contains the initial Headscale configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
ServerURL string
|
ServerURL string
|
||||||
Addr string
|
Addr string
|
||||||
|
@ -64,7 +71,7 @@ type DERPConfig struct {
|
||||||
UpdateFrequency time.Duration
|
UpdateFrequency time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Headscale represents the base app of the service
|
// Headscale represents the base app of the service.
|
||||||
type Headscale struct {
|
type Headscale struct {
|
||||||
cfg Config
|
cfg Config
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
|
@ -82,12 +89,13 @@ type Headscale struct {
|
||||||
lastStateChange sync.Map
|
lastStateChange sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHeadscale returns the Headscale app
|
// NewHeadscale returns the Headscale app.
|
||||||
func NewHeadscale(cfg Config) (*Headscale, error) {
|
func NewHeadscale(cfg Config) (*Headscale, error) {
|
||||||
content, err := os.ReadFile(cfg.PrivateKeyPath)
|
content, err := os.ReadFile(cfg.PrivateKeyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
privKey, err := wgkey.ParsePrivate(string(content))
|
privKey, err := wgkey.ParsePrivate(string(content))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -136,14 +144,14 @@ func NewHeadscale(cfg Config) (*Headscale, error) {
|
||||||
return &h, nil
|
return &h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Redirect to our TLS url
|
// Redirect to our TLS url.
|
||||||
func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
|
func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
|
||||||
target := h.cfg.ServerURL + req.URL.RequestURI()
|
target := h.cfg.ServerURL + req.URL.RequestURI()
|
||||||
http.Redirect(w, req, target, http.StatusFound)
|
http.Redirect(w, req, target, http.StatusFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
// expireEphemeralNodes deletes ephemeral machine records that have not been
|
// expireEphemeralNodes deletes ephemeral machine records that have not been
|
||||||
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout
|
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
|
||||||
func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {
|
func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {
|
||||||
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
|
@ -155,18 +163,23 @@ func (h *Headscale) expireEphemeralNodesWorker() {
|
||||||
namespaces, err := h.ListNamespaces()
|
namespaces, err := h.ListNamespaces()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Error listing namespaces")
|
log.Error().Err(err).Msg("Error listing namespaces")
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ns := range *namespaces {
|
for _, ns := range *namespaces {
|
||||||
machines, err := h.ListMachinesInNamespace(ns.Name)
|
machines, err := h.ListMachinesInNamespace(ns.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Str("namespace", ns.Name).Msg("Error listing machines in namespace")
|
log.Error().Err(err).Str("namespace", ns.Name).Msg("Error listing machines in namespace")
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range *machines {
|
for _, m := range *machines {
|
||||||
if m.AuthKey != nil && m.LastSeen != nil && m.AuthKey.Ephemeral &&
|
if m.AuthKey != nil && m.LastSeen != nil && m.AuthKey.Ephemeral &&
|
||||||
time.Now().After(m.LastSeen.Add(h.cfg.EphemeralNodeInactivityTimeout)) {
|
time.Now().After(m.LastSeen.Add(h.cfg.EphemeralNodeInactivityTimeout)) {
|
||||||
log.Info().Str("machine", m.Name).Msg("Ephemeral client removed from database")
|
log.Info().Str("machine", m.Name).Msg("Ephemeral client removed from database")
|
||||||
|
|
||||||
err = h.db.Unscoped().Delete(m).Error
|
err = h.db.Unscoped().Delete(m).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().
|
log.Error().
|
||||||
|
@ -176,12 +189,13 @@ func (h *Headscale) expireEphemeralNodesWorker() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.setLastStateChangeToNow(ns.Name)
|
h.setLastStateChangeToNow(ns.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchForKVUpdates checks the KV DB table for requests to perform tailnet upgrades
|
// WatchForKVUpdates checks the KV DB table for requests to perform tailnet upgrades
|
||||||
// This is a way to communitate the CLI with the headscale server
|
// This is a way to communitate the CLI with the headscale server.
|
||||||
func (h *Headscale) watchForKVUpdates(milliSeconds int64) {
|
func (h *Headscale) watchForKVUpdates(milliSeconds int64) {
|
||||||
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
|
@ -194,24 +208,60 @@ func (h *Headscale) watchForKVUpdatesWorker() {
|
||||||
// more functions will come here in the future
|
// more functions will come here in the future
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve launches a GIN server with the Headscale API
|
// Serve launches a GIN server with the Headscale API.
|
||||||
func (h *Headscale) Serve() error {
|
func (h *Headscale) Serve() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
l, err := net.Listen("tcp", h.cfg.Addr)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the cmux object that will multiplex 2 protocols on the same port.
|
||||||
|
// The two following listeners will be served on the same port below gracefully.
|
||||||
|
m := cmux.New(l)
|
||||||
|
// Match gRPC requests here
|
||||||
|
grpcListener := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
|
||||||
|
// Otherwise match regular http requests.
|
||||||
|
httpListener := m.Match(cmux.Any())
|
||||||
|
|
||||||
|
// Now create the grpc server with those options.
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
|
// TODO(kradalby): register the new server when we have authentication ready
|
||||||
|
// apiV1.RegisterHeadscaleServiceServer(grpcServer, newHeadscaleV1APIServer(h))
|
||||||
|
|
||||||
|
grpcGatewayMux := runtime.NewServeMux()
|
||||||
|
|
||||||
|
opts := []grpc.DialOption{grpc.WithInsecure()}
|
||||||
|
|
||||||
|
err = apiV1.RegisterHeadscaleServiceHandlerFromEndpoint(ctx, grpcGatewayMux, h.cfg.Addr, opts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
r := gin.Default()
|
r := gin.Default()
|
||||||
|
|
||||||
p := ginprometheus.NewPrometheus("gin")
|
p := ginprometheus.NewPrometheus("gin")
|
||||||
p.Use(r)
|
p.Use(r)
|
||||||
|
|
||||||
r.GET("/health", func(c *gin.Context) { c.JSON(200, gin.H{"healthy": "ok"}) })
|
r.GET("/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"healthy": "ok"}) })
|
||||||
r.GET("/key", h.KeyHandler)
|
r.GET("/key", h.KeyHandler)
|
||||||
r.GET("/register", h.RegisterWebAPI)
|
r.GET("/register", h.RegisterWebAPI)
|
||||||
r.POST("/machine/:id/map", h.PollNetMapHandler)
|
r.POST("/machine/:id/map", h.PollNetMapHandler)
|
||||||
r.POST("/machine/:id", h.RegistrationHandler)
|
r.POST("/machine/:id", h.RegistrationHandler)
|
||||||
r.GET("/apple", h.AppleMobileConfig)
|
r.GET("/apple", h.AppleMobileConfig)
|
||||||
r.GET("/apple/:platform", h.ApplePlatformConfig)
|
r.GET("/apple/:platform", h.ApplePlatformConfig)
|
||||||
var err error
|
|
||||||
|
|
||||||
go h.watchForKVUpdates(5000)
|
r.Any("/api/v1/*any", gin.WrapF(grpcGatewayMux.ServeHTTP))
|
||||||
go h.expireEphemeralNodes(5000)
|
r.StaticFile("/swagger/swagger.json", "gen/openapiv2/v1/headscale.swagger.json")
|
||||||
|
|
||||||
|
updateMillisecondsWait := int64(5000)
|
||||||
|
|
||||||
// Fetch an initial DERP Map before we start serving
|
// Fetch an initial DERP Map before we start serving
|
||||||
h.DERPMap = GetDERPMap(h.cfg.DERP)
|
h.DERPMap = GetDERPMap(h.cfg.DERP)
|
||||||
|
@ -222,7 +272,11 @@ func (h *Headscale) Serve() error {
|
||||||
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
|
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &http.Server{
|
// I HATE THIS
|
||||||
|
go h.watchForKVUpdates(updateMillisecondsWait)
|
||||||
|
go h.expireEphemeralNodes(updateMillisecondsWait)
|
||||||
|
|
||||||
|
httpServer := &http.Server{
|
||||||
Addr: h.cfg.Addr,
|
Addr: h.cfg.Addr,
|
||||||
Handler: r,
|
Handler: r,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
|
@ -233,6 +287,29 @@ func (h *Headscale) Serve() error {
|
||||||
WriteTimeout: 0,
|
WriteTimeout: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tlsConfig, err := h.getTLSSettings()
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to set up TLS configuration")
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tlsConfig != nil {
|
||||||
|
httpServer.TLSConfig = tlsConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
g := new(errgroup.Group)
|
||||||
|
|
||||||
|
g.Go(func() error { return grpcServer.Serve(grpcListener) })
|
||||||
|
g.Go(func() error { return httpServer.Serve(httpListener) })
|
||||||
|
g.Go(func() error { return m.Serve() })
|
||||||
|
|
||||||
|
log.Info().Msgf("listening and serving (multiplexed HTTP and gRPC) on: %s", h.cfg.Addr)
|
||||||
|
|
||||||
|
return g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Headscale) getTLSSettings() (*tls.Config, error) {
|
||||||
if h.cfg.TLSLetsEncryptHostname != "" {
|
if h.cfg.TLSLetsEncryptHostname != "" {
|
||||||
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
|
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
|
||||||
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
|
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
|
||||||
|
@ -248,13 +325,11 @@ func (h *Headscale) Serve() error {
|
||||||
Email: h.cfg.ACMEEmail,
|
Email: h.cfg.ACMEEmail,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.TLSConfig = m.TLSConfig()
|
|
||||||
|
|
||||||
if h.cfg.TLSLetsEncryptChallengeType == "TLS-ALPN-01" {
|
if h.cfg.TLSLetsEncryptChallengeType == "TLS-ALPN-01" {
|
||||||
// Configuration via autocert with TLS-ALPN-01 (https://tools.ietf.org/html/rfc8737)
|
// Configuration via autocert with TLS-ALPN-01 (https://tools.ietf.org/html/rfc8737)
|
||||||
// The RFC requires that the validation is done on port 443; in other words, headscale
|
// The RFC requires that the validation is done on port 443; in other words, headscale
|
||||||
// must be reachable on port 443.
|
// must be reachable on port 443.
|
||||||
err = s.ListenAndServeTLS("", "")
|
return m.TLSConfig(), nil
|
||||||
} else if h.cfg.TLSLetsEncryptChallengeType == "HTTP-01" {
|
} else if h.cfg.TLSLetsEncryptChallengeType == "HTTP-01" {
|
||||||
// Configuration via autocert with HTTP-01. This requires listening on
|
// Configuration via autocert with HTTP-01. This requires listening on
|
||||||
// port 80 for the certificate validation in addition to the headscale
|
// port 80 for the certificate validation in addition to the headscale
|
||||||
|
@ -264,22 +339,30 @@ func (h *Headscale) Serve() error {
|
||||||
Err(http.ListenAndServe(h.cfg.TLSLetsEncryptListen, m.HTTPHandler(http.HandlerFunc(h.redirect)))).
|
Err(http.ListenAndServe(h.cfg.TLSLetsEncryptListen, m.HTTPHandler(http.HandlerFunc(h.redirect)))).
|
||||||
Msg("failed to set up a HTTP server")
|
Msg("failed to set up a HTTP server")
|
||||||
}()
|
}()
|
||||||
err = s.ListenAndServeTLS("", "")
|
|
||||||
|
return m.TLSConfig(), nil
|
||||||
} else {
|
} else {
|
||||||
return errors.New("unknown value for TLSLetsEncryptChallengeType")
|
return nil, errors.New("unknown value for TLSLetsEncryptChallengeType")
|
||||||
}
|
}
|
||||||
} else if h.cfg.TLSCertPath == "" {
|
} else if h.cfg.TLSCertPath == "" {
|
||||||
if !strings.HasPrefix(h.cfg.ServerURL, "http://") {
|
if !strings.HasPrefix(h.cfg.ServerURL, "http://") {
|
||||||
log.Warn().Msg("Listening without TLS but ServerURL does not start with http://")
|
log.Warn().Msg("Listening without TLS but ServerURL does not start with http://")
|
||||||
}
|
}
|
||||||
err = s.ListenAndServe()
|
|
||||||
|
return nil, nil
|
||||||
} else {
|
} else {
|
||||||
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
|
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
|
||||||
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
|
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
|
||||||
}
|
}
|
||||||
err = s.ListenAndServeTLS(h.cfg.TLSCertPath, h.cfg.TLSKeyPath)
|
var err error
|
||||||
|
tlsConfig := &tls.Config{}
|
||||||
|
tlsConfig.ClientAuth = tls.RequireAnyClientCert
|
||||||
|
tlsConfig.NextProtos = []string{"http/1.1"}
|
||||||
|
tlsConfig.Certificates = make([]tls.Certificate, 1)
|
||||||
|
tlsConfig.Certificates[0], err = tls.LoadX509KeyPair(h.cfg.TLSCertPath, h.cfg.TLSKeyPath)
|
||||||
|
|
||||||
|
return tlsConfig, err
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Headscale) setLastStateChangeToNow(namespace string) {
|
func (h *Headscale) setLastStateChangeToNow(namespace string) {
|
||||||
|
|
Loading…
Reference in a new issue