Skip to content

Commit

Permalink
Rework state and online system
Browse files Browse the repository at this point in the history
This PR further improves the state management system and tries to
make sure that we get all nodes in sync continously.

This is greatly enabled by a previous PR dropping support for older
clients that allowed us to use a Patch field only sending small diffs
for client updates.

It also reworks how the HA subnet router is handled and it should
be a bit easier to follow now.

Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Nov 27, 2023
1 parent c7d1356 commit c0d9d8b
Show file tree
Hide file tree
Showing 33 changed files with 2,770 additions and 706 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# DO NOT EDIT, generated with cmd/gh-action-integration-generator/main.go
# To regenerate, run "go generate" in cmd/gh-action-integration-generator/

name: Integration Test v2 - TestHASubnetRouterFailover

on: [pull_request]

concurrency:
group: ${{ github.workflow }}-$${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
TestHASubnetRouterFailover:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 2

- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- uses: satackey/action-docker-layer-caching@main
continue-on-error: true

- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@v34
with:
files: |
*.nix
go.*
**/*.go
integration_test/
config-example.yaml
- name: Run TestHASubnetRouterFailover
uses: Wandalen/wretry.action@master
if: steps.changed-files.outputs.any_changed == 'true'
with:
attempt_limit: 5
command: |
nix develop --command -- docker run \
--tty --rm \
--volume ~/.cache/hs-integration-go:/go \
--name headscale-test-suite \
--volume $PWD:$PWD -w $PWD/integration \
--volume /var/run/docker.sock:/var/run/docker.sock \
--volume $PWD/control_logs:/tmp/control \
golang:1 \
go run gotest.tools/gotestsum@latest -- ./... \
-failfast \
-timeout 120m \
-parallel 1 \
-run "^TestHASubnetRouterFailover$"
- uses: actions/upload-artifact@v3
if: always() && steps.changed-files.outputs.any_changed == 'true'
with:
name: logs
path: "control_logs/*.log"

- uses: actions/upload-artifact@v3
if: always() && steps.changed-files.outputs.any_changed == 'true'
with:
name: pprof
path: "control_logs/*.pprof.tar"
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# DO NOT EDIT, generated with cmd/gh-action-integration-generator/main.go
# To regenerate, run "go generate" in cmd/gh-action-integration-generator/

name: Integration Test v2 - TestNodeOnlineLastSeenStatus

on: [pull_request]

concurrency:
group: ${{ github.workflow }}-$${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
TestNodeOnlineLastSeenStatus:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 2

- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- uses: satackey/action-docker-layer-caching@main
continue-on-error: true

- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@v34
with:
files: |
*.nix
go.*
**/*.go
integration_test/
config-example.yaml
- name: Run TestNodeOnlineLastSeenStatus
uses: Wandalen/wretry.action@master
if: steps.changed-files.outputs.any_changed == 'true'
with:
attempt_limit: 5
command: |
nix develop --command -- docker run \
--tty --rm \
--volume ~/.cache/hs-integration-go:/go \
--name headscale-test-suite \
--volume $PWD:$PWD -w $PWD/integration \
--volume /var/run/docker.sock:/var/run/docker.sock \
--volume $PWD/control_logs:/tmp/control \
golang:1 \
go run gotest.tools/gotestsum@latest -- ./... \
-failfast \
-timeout 120m \
-parallel 1 \
-run "^TestNodeOnlineLastSeenStatus$"
- uses: actions/upload-artifact@v3
if: always() && steps.changed-files.outputs.any_changed == 'true'
with:
name: logs
path: "control_logs/*.log"

- uses: actions/upload-artifact@v3
if: always() && steps.changed-files.outputs.any_changed == 'true'
with:
name: pprof
path: "control_logs/*.pprof.tar"
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ after improving the test harness as part of adopting [#1460](https://github.com/
- API: Machine is now Node [#1553](https://github.com/juanfont/headscale/pull/1553)
- Remove support for older Tailscale clients [#1611](https://github.com/juanfont/headscale/pull/1611)
- The latest supported client is 1.32
- Headscale checks that _at least_ one DERP is defined at start [#]()
- If no DERP is configured, the server will fail to start, this can be because it cannot load the DERPMap from file or url.

### Changes

Expand Down
2 changes: 1 addition & 1 deletion cmd/headscale/cli/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func nodesToPtables(
"Ephemeral",
"Last seen",
"Expiration",
"Online",
"Connected",
"Expired",
}
if showTags {
Expand Down
31 changes: 15 additions & 16 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
errUnsupportedLetsEncryptChallengeType = errors.New(
"unknown value for Lets Encrypt challenge type",
)
errEmptyInitialDERPMap = errors.New("initial DERPMap is empty, Headscale requries at least one entry")
)

const (
Expand Down Expand Up @@ -193,7 +194,10 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
}

if derpServerKey.Equal(*noisePrivateKey) {
return nil, fmt.Errorf("DERP server private key and noise private key are the same: %w", err)
return nil, fmt.Errorf(
"DERP server private key and noise private key are the same: %w",
err,
)
}

embeddedDERPServer, err := derpServer.NewDERPServer(
Expand Down Expand Up @@ -259,20 +263,13 @@ func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
h.DERPMap.Regions[region.RegionID] = &region
}

h.nodeNotifier.NotifyAll(types.StateUpdate{
stateUpdate := types.StateUpdate{
Type: types.StateDERPUpdated,
DERPMap: *h.DERPMap,
})
}
}
}

func (h *Headscale) failoverSubnetRoutes(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
for range ticker.C {
err := h.db.HandlePrimarySubnetFailover()
if err != nil {
log.Error().Err(err).Msg("failed to handle primary subnet failover")
DERPMap: h.DERPMap,
}
if stateUpdate.Valid() {
h.nodeNotifier.NotifyAll(stateUpdate)
}
}
}
}
Expand Down Expand Up @@ -505,13 +502,15 @@ func (h *Headscale) Serve() error {
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
}

if len(h.DERPMap.Regions) == 0 {
return errEmptyInitialDERPMap
}

// TODO(kradalby): These should have cancel channels and be cleaned
// up on shutdown.
go h.expireEphemeralNodes(updateInterval)
go h.expireExpiredMachines(updateInterval)

go h.failoverSubnetRoutes(updateInterval)

if zl.GlobalLevel() == zl.TraceLevel {
zerolog.RespLog = true
} else {
Expand Down
98 changes: 54 additions & 44 deletions hscontrol/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,58 @@ import (
"tailscale.com/types/key"
)

func logAuthFunc(
registerRequest tailcfg.RegisterRequest,
machineKey key.MachinePublic,
) (func(string), func(string), func(error, string)) {
return func(msg string) {
log.Info().
Caller().
Str("machine_key", machineKey.ShortString()).
Str("node_key", registerRequest.NodeKey.ShortString()).
Str("node_key_old", registerRequest.OldNodeKey.ShortString()).
Str("node", registerRequest.Hostinfo.Hostname).
Str("followup", registerRequest.Followup).
Time("expiry", registerRequest.Expiry).
Msg(msg)
},
func(msg string) {
log.Trace().
Caller().
Str("machine_key", machineKey.ShortString()).
Str("node_key", registerRequest.NodeKey.ShortString()).
Str("node_key_old", registerRequest.OldNodeKey.ShortString()).
Str("node", registerRequest.Hostinfo.Hostname).
Str("followup", registerRequest.Followup).
Time("expiry", registerRequest.Expiry).
Msg(msg)
},
func(err error, msg string) {
log.Error().
Caller().
Str("machine_key", machineKey.ShortString()).
Str("node_key", registerRequest.NodeKey.ShortString()).
Str("node_key_old", registerRequest.OldNodeKey.ShortString()).
Str("node", registerRequest.Hostinfo.Hostname).
Str("followup", registerRequest.Followup).
Time("expiry", registerRequest.Expiry).
Err(err).
Msg(msg)
}
}

// handleRegister is the logic for registering a client.
func (h *Headscale) handleRegister(
writer http.ResponseWriter,
req *http.Request,
registerRequest tailcfg.RegisterRequest,
machineKey key.MachinePublic,
) {
logInfo, logTrace, logErr := logAuthFunc(registerRequest, machineKey)
now := time.Now().UTC()
logTrace("handleRegister called, looking up machine in DB")
node, err := h.db.GetNodeByAnyKey(machineKey, registerRequest.NodeKey, registerRequest.OldNodeKey)
logTrace("handleRegister database lookup has returned")
if errors.Is(err, gorm.ErrRecordNotFound) {
// If the node has AuthKey set, handle registration via PreAuthKeys
if registerRequest.Auth.AuthKey != "" {
Expand All @@ -42,15 +85,9 @@ func (h *Headscale) handleRegister(
// is that the client will hammer headscale with requests until it gets a
// successful RegisterResponse.
if registerRequest.Followup != "" {
logTrace("register request is a followup")
if _, ok := h.registrationCache.Get(machineKey.String()); ok {
log.Debug().
Caller().
Str("node", registerRequest.Hostinfo.Hostname).
Str("machine_key", machineKey.ShortString()).
Str("node_key", registerRequest.NodeKey.ShortString()).
Str("node_key_old", registerRequest.OldNodeKey.ShortString()).
Str("follow_up", registerRequest.Followup).
Msg("Node is waiting for interactive login")
logTrace("Node is waiting for interactive login")

select {
case <-req.Context().Done():
Expand All @@ -63,26 +100,14 @@ func (h *Headscale) handleRegister(
}
}

log.Info().
Caller().
Str("node", registerRequest.Hostinfo.Hostname).
Str("machine_key", machineKey.ShortString()).
Str("node_key", registerRequest.NodeKey.ShortString()).
Str("node_key_old", registerRequest.OldNodeKey.ShortString()).
Str("follow_up", registerRequest.Followup).
Msg("New node not yet in the database")
logInfo("Node not found in databas, creating new")

givenName, err := h.db.GenerateGivenName(
machineKey,
registerRequest.Hostinfo.Hostname,
)
if err != nil {
log.Error().
Caller().
Str("func", "RegistrationHandler").
Str("hostinfo.name", registerRequest.Hostinfo.Hostname).
Err(err).
Msg("Failed to generate given name for node")
logErr(err, "Failed to generate given name for node")

return
}
Expand All @@ -101,11 +126,7 @@ func (h *Headscale) handleRegister(
}

if !registerRequest.Expiry.IsZero() {
log.Trace().
Caller().
Str("node", registerRequest.Hostinfo.Hostname).
Time("expiry", registerRequest.Expiry).
Msg("Non-zero expiry time requested")
logTrace("Non-zero expiry time requested")
newNode.Expiry = &registerRequest.Expiry
}

Expand Down Expand Up @@ -419,13 +440,12 @@ func (h *Headscale) handleNewNode(
registerRequest tailcfg.RegisterRequest,
machineKey key.MachinePublic,
) {
logInfo, logTrace, logErr := logAuthFunc(registerRequest, machineKey)

resp := tailcfg.RegisterResponse{}

// The node registration is new, redirect the client to the registration URL
log.Debug().
Caller().
Str("node", registerRequest.Hostinfo.Hostname).
Msg("The node seems to be new, sending auth url")
logTrace("The node seems to be new, sending auth url")

if h.oauth2Config != nil {
resp.AuthURL = fmt.Sprintf(
Expand All @@ -441,10 +461,7 @@ func (h *Headscale) handleNewNode(

respBody, err := json.Marshal(resp)
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Cannot encode message")
logErr(err, "Cannot encode message")
http.Error(writer, "Internal server error", http.StatusInternalServerError)

return
Expand All @@ -454,17 +471,10 @@ func (h *Headscale) handleNewNode(
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(respBody)
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Failed to write response")
logErr(err, "Failed to write response")
}

log.Info().
Caller().
Str("AuthURL", resp.AuthURL).
Str("node", registerRequest.Hostinfo.Hostname).
Msg("Successfully sent auth url")
logInfo(fmt.Sprintf("Successfully sent auth url: %s", resp.AuthURL))
}

func (h *Headscale) handleNodeLogOut(
Expand Down
Loading

0 comments on commit c0d9d8b

Please sign in to comment.