Skip to content

Commit

Permalink
introduce a simple batcher, simplify mapresponsing
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Mar 1, 2024
1 parent f4a063f commit 48993bb
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 83 deletions.
2 changes: 1 addition & 1 deletion hscontrol/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (h *Headscale) handleNodeLogOut(
ctx := types.NotifyCtx(context.Background(), "logout-ephemeral", "na")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: []tailcfg.NodeID{tailcfg.NodeID(node.ID)},
Removed: []types.NodeID{node.ID},
})

return
Expand Down
4 changes: 2 additions & 2 deletions hscontrol/db/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func ExpireEphemeralNodes(tx *gorm.DB,
return types.StateUpdate{}, false
}

expired := make([]tailcfg.NodeID, 0)
expired := make([]types.NodeID, 0)
for _, user := range users {
nodes, err := ListNodesByUser(tx, user.Name)
if err != nil {
Expand All @@ -702,7 +702,7 @@ func ExpireEphemeralNodes(tx *gorm.DB,
if node.IsEphemeral() && node.LastSeen != nil &&
time.Now().
After(node.LastSeen.Add(inactivityThreshhold)) {
expired = append(expired, tailcfg.NodeID(node.ID))
expired = append(expired, node.ID)

log.Info().
Str("node", node.Hostname).
Expand Down
2 changes: 1 addition & 1 deletion hscontrol/grpcv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (api headscaleV1APIServer) DeleteNode(
ctx = types.NotifyCtx(ctx, "cli-deletenode", node.Hostname)
api.h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
Type: types.StatePeerRemoved,
Removed: []tailcfg.NodeID{tailcfg.NodeID(node.ID)},
Removed: []types.NodeID{node.ID},
})

return &v1.DeleteNodeResponse{}, nil
Expand Down
179 changes: 101 additions & 78 deletions hscontrol/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (h *Headscale) newMapSession(
// to receive a message to make sure we dont block the entire
// notifier.
// 12 is arbitrarily chosen.
chanSize := 3
chanSize := 3000
if size, ok := envknob.LookupInt("HEADSCALE_TUNING_POLL_QUEUE_SIZE"); ok {
chanSize = size
}
Expand Down Expand Up @@ -181,30 +181,6 @@ func (m *mapSession) serve() {
m.h.pollNetMapStreamWG.Add(1)
defer m.h.pollNetMapStreamWG.Done()

m.tracef("Sending initial map")

mapResp, err := m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy)
if err != nil {
m.errf(err, "Failed to create MapResponse")
http.Error(m.w, "", http.StatusInternalServerError)

return
}

// Send the client an update to make sure we send an initial mapresponse
_, err = m.w.Write(mapResp)
if err != nil {
m.errf(err, "Could not write the map response")

return
}

if flusher, ok := m.w.(http.Flusher); ok {
flusher.Flush()
} else {
return
}

if len(m.node.Routes) > 0 {
go m.pollFailoverRoutes("new node", m.node)
}
Expand All @@ -214,14 +190,33 @@ func (m *mapSession) serve() {
ctx, cancel := context.WithCancel(context.WithValue(m.ctx, nodeNameContextKey, m.node.Hostname))
defer cancel()

// TODO(kradalby): Make this available through a tuning envvar
wait := time.Second

// Add a circuit breaker, if the loop is not interrupted
// inbetween listening for the channels, some updates
// might get stale and stucked in the "changed" map
// defined below.
blockBreaker := time.NewTicker(wait)

// true means changed, false means removed
var changed map[types.NodeID]bool
var messages []string
var derp bool

// Set full to true to immediatly send a full mapresponse
full := true
prev := time.Now()

// Loop through updates and continuously send them to the
// client.
for {
m.tracef("waiting for update on stream channel")
select {
case update := <-m.ch:
m.tracef("received stream update: %d %s", update.Type, update.Message)
var data []byte
var err error
var data []byte
var err error

// If a full update has been requested, then send it immediately
// otherwise wait for the "batching"
if full || (changed != nil && time.Since(prev) > wait) {
// Ensure the node object is updated, for example, there
// might have been a hostinfo update in a sidechannel
// which contains data needed to generate a map response.
Expand All @@ -232,69 +227,97 @@ func (m *mapSession) serve() {
return
}

startMapResp := time.Now()
switch update.Type {
case types.StateFullUpdate:
if full {
m.tracef("Sending Full MapResponse")
data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy)
case types.StatePeerChanged:
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %s", update.Message))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, update.ChangeNodes, m.h.ACLPolicy, update.Message)
case types.StatePeerChangedPatch:
m.tracef("Sending PeerChangedPatch MapResponse")
data, err = m.mapper.PeerChangedPatchResponse(m.req, m.node, update.ChangePatches, m.h.ACLPolicy)
case types.StatePeerRemoved:
m.tracef("Sending PeerRemoved MapResponse")
data, err = m.mapper.PeerRemovedResponse(m.req, m.node, update.Removed)
case types.StateSelfUpdate:
if len(update.ChangeNodes) == 1 {
m.tracef("Sending SelfUpdate MapResponse")
m.node, err = m.h.db.GetNodeByID(m.node.ID)
if err != nil {
m.errf(err, "could not update node from db for selfupdate")

return
}
data, err = m.mapper.ReadOnlyMapResponse(m.req, m.node, m.h.ACLPolicy, types.SelfUpdateIdentifier)
} else {
m.warnf("SelfUpdate contained too many nodes, this is likely a bug in the code, please report.")
}
case types.StateDERPUpdated:
} else if derp {
m.tracef("Sending DERPUpdate MapResponse")
data, err = m.mapper.DERPMapResponse(m.req, m.node, update.DERPMap)
data, err = m.mapper.DERPMapResponse(m.req, m.node, m.h.DERPMap)
} else {
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", messages))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, m.h.ACLPolicy, messages...)
}

// reset
changed = nil
messages = nil
full = false
derp = false
prev = time.Now()
}

if err != nil {
m.errf(err, "Could not get the create map update")

return
}

// log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")

// Only send update if there is change
if data != nil {
startWrite := time.Now()
_, err = m.w.Write(data)
if err != nil {
m.errf(err, "Could not get the create map update")
m.errf(err, "Could not write the map response")

updateRequestsSentToNode.WithLabelValues(m.node.User.Name, m.node.Hostname, "failed").
Inc()

return
}

log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")
if flusher, ok := m.w.(http.Flusher); ok {
flusher.Flush()
} else {
log.Error().Msg("Failed to create http flusher")

// Only send update if there is change
if data != nil {
startWrite := time.Now()
_, err = m.w.Write(data)
if err != nil {
m.errf(err, "Could not write the map response")
return
}
log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", m.node.MachineKey.String()).Msg("finished writing mapresp to node")

updateRequestsSentToNode.WithLabelValues(m.node.User.Name, m.node.Hostname, "failed").
Inc()
m.infof("update sent")
}

return
}
// consume channels with update, keep alives or "batch" blocking signals
select {
// Avoid infinite block that would potentially leave
// some updates in the changed map.
case <-blockBreaker.C:
continue

if flusher, ok := m.w.(http.Flusher); ok {
flusher.Flush()
} else {
log.Error().Msg("Failed to create http flusher")
// Consume all updates sent to node
case update := <-m.ch:
m.tracef("received stream update: %d %s", update.Type, update.Message)

return
switch update.Type {
case types.StateFullUpdate:
full = true
case types.StatePeerChanged:
if changed == nil {
changed = make(map[types.NodeID]bool)
}
log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished writing mapresp to node")

m.infof("update sent")
for _, nodeID := range update.ChangeNodes {
changed[nodeID] = true
}
messages = append(messages, update.Message)
case types.StatePeerChangedPatch:
// TODO(kradalby):
case types.StatePeerRemoved:
if changed == nil {
changed = make(map[types.NodeID]bool)
}
for _, nodeID := range update.Removed {
changed[nodeID] = false
}
case types.StateSelfUpdate:
// create the map so an empty (self) update is sent
if changed == nil {
changed = make(map[types.NodeID]bool)
}
messages = append(messages, update.Message)
case types.StateDERPUpdated:
derp = true
}

case <-keepAliveTicker.C:
Expand Down
2 changes: 1 addition & 1 deletion hscontrol/types/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type StateUpdate struct {
// Removed must be set when Type is StatePeerRemoved and
// contain a list of the nodes that has been removed from
// the network.
Removed []tailcfg.NodeID
Removed []NodeID

// DERPMap must be set when Type is StateDERPUpdated and
// contain the new DERP Map.
Expand Down

0 comments on commit 48993bb

Please sign in to comment.