Skip to content

Commit

Permalink
Merge pull request #45 from xmidt-org/denopink/feat/connect-to-xmidt
Browse files Browse the repository at this point in the history
feat: connect to xmidt cluster
  • Loading branch information
schmidtw authored Mar 29, 2024
2 parents 667bcef + 4ea0f1e commit 1b1515e
Show file tree
Hide file tree
Showing 16 changed files with 638 additions and 151 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*.out

# VS Code directories
.vscode
*.code-workspace
.vscode/*
.dev/*
__debug_bin*

# Dependency directories (remove the comment below to include it)
# vendor/
Expand Down
2 changes: 1 addition & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Maintainers of this repository:

* Weston Schmidt @schmidtw
* Joel Unzain @joe94
* Owen Cabalceta @denopink
* John Bass @johnabass
* Nick Harter @njharter
81 changes: 81 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ package main
import (
"fmt"
"io/fs"
"net/http"
"os"
"time"

"github.com/goschtalt/goschtalt"
"github.com/xmidt-org/arrange/arrangehttp"
"github.com/xmidt-org/retry"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/wrp-go/v3"
"gopkg.in/dealancer/validate.v2"
)

// Config is the configuration for the xmidt-agent.
type Config struct {
Websocket Websocket
Identity Identity
OperationalState OperationalState
XmidtCredentials XmidtCredentials
Expand All @@ -26,6 +29,45 @@ type Config struct {
Storage Storage
}

type Websocket struct {
// Disable determines whether or not to disable xmidt-agent's websocket
Disable bool
// URLPath is the device registration url path
URLPath string
// AdditionalHeaders are any additional headers for the WS connection.
AdditionalHeaders http.Header
// FetchURLTimeout is the timeout for the fetching the WS url. If this is not set, the default is 30 seconds.
FetchURLTimeout time.Duration
// PingInterval is the ping interval allowed for the WS connection.
PingInterval time.Duration
// PingTimeout is the ping timeout for the WS connection.
PingTimeout time.Duration
// ConnectTimeout is the connect timeout for the WS connection.
ConnectTimeout time.Duration
// KeepAliveInterval is the keep alive interval for the WS connection.
KeepAliveInterval time.Duration
// IdleConnTimeout is the idle connection timeout for the WS connection.
IdleConnTimeout time.Duration
// TLSHandshakeTimeout is the TLS handshake timeout for the WS connection.
TLSHandshakeTimeout time.Duration
// ExpectContinueTimeout is the expect continue timeout for the WS connection.
ExpectContinueTimeout time.Duration
// MaxMessageBytes is the largest allowable message to send or receive.
MaxMessageBytes int64
// (optional) DisableV4 determines whether or not to allow IPv4 for the WS connection.
// If this is not set, the default is false (IPv4 is enabled).
// Either V4 or V6 can be disabled, but not both.
DisableV4 bool
// (optional) DisableV6 determines whether or not to allow IPv6 for the WS connection.
// If this is not set, the default is false (IPv6 is enabled).
// Either V4 or V6 can be disabled, but not both.
DisableV6 bool
// RetryPolicy sets the retry policy factory used for delaying between retry attempts for reconnection.
RetryPolicy retry.Config
// Once sets whether or not to only attempt to connect once.
Once bool
}

// Identity contains the information that identifies the device.
type Identity struct {
// DeviceID is the unique identifier for the device. Generally this is a
Expand Down Expand Up @@ -209,4 +251,43 @@ var defaultConfig = Config{
},
},
},
Websocket: Websocket{
URLPath: "api/v2/device",
FetchURLTimeout: 30 * time.Second,
PingInterval: 30 * time.Second,
PingTimeout: 90 * time.Second,
ConnectTimeout: 30 * time.Second,
KeepAliveInterval: 30 * time.Second,
IdleConnTimeout: 10 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxMessageBytes: 256 * 1024,
/*
This retry policy gives us a very good approximation of the prior
policy. The important things about this policy are:
1. The backoff increases up to the max.
2. There is jitter that spreads the load so windows do not overlap.
iteration | parodus | this implementation
----------+-----------+----------------
0 | 0-1s | 0.666 - 1.333
1 | 1s-3s | 1.333 - 2.666
2 | 3s-7s | 2.666 - 5.333
3 | 7s-15s | 5.333 - 10.666
4 | 15s-31s | 10.666 - 21.333
5 | 31s-63s | 21.333 - 42.666
6 | 63s-127s | 42.666 - 85.333
7 | 127s-255s | 85.333 - 170.666
8 | 255s-511s | 170.666 - 341.333
9 | 255s-511s | 341.333
n | 255s-511s | 341.333
*/
RetryPolicy: retry.Config{
Interval: time.Second,
Multiplier: 2.0,
Jitter: 1.0 / 3.0,
MaxInterval: 341*time.Second + 333*time.Millisecond,
},
},
}
2 changes: 1 addition & 1 deletion cmd/xmidt-agent/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func provideCredentials(in credsIn) (*credentials.Credentials, error) {
credentials.RefetchPercent(in.Creds.RefetchPercent),
credentials.AddFetchListener(event.FetchListenerFunc(
func(e event.Fetch) {
logger.Info("fetch",
logger.Debug("fetch",
zap.String("origin", e.Origin),
zap.Time("at", e.At),
zap.Duration("duration", e.Duration),
Expand Down
24 changes: 17 additions & 7 deletions cmd/xmidt-agent/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"strings"

"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/jwtxt"
"github.com/xmidt-org/xmidt-agent/internal/jwtxt/event"
"go.uber.org/fx"
Expand All @@ -20,13 +21,18 @@ type instructionsIn struct {
ID Identity
Logger *zap.Logger
}
type instructionsOut struct {
fx.Out
JWTXT *jwtxt.Instructions
DeviceID wrp.DeviceID
}

func provideInstructions(in instructionsIn) (*jwtxt.Instructions, error) {
func provideInstructions(in instructionsIn) (instructionsOut, error) {
// If no PEMs are provided then the jwtxt can't be used because it won't
// have any keys to use.
if in.Service.URL == "" ||
(in.Service.JwtTxtRedirector.PEMFiles == nil && in.Service.JwtTxtRedirector.PEMs == nil) {
return nil, nil
return instructionsOut{}, nil
}

logger := in.Logger.Named("jwtxt")
Expand All @@ -38,7 +44,7 @@ func provideInstructions(in instructionsIn) (*jwtxt.Instructions, error) {
jwtxt.Timeout(in.Service.JwtTxtRedirector.Timeout),
jwtxt.WithFetchListener(event.FetchListenerFunc(
func(fe event.Fetch) {
logger.Info("fetch",
logger.Debug("fetch",
zap.String("fqdn", fe.FQDN),
zap.String("server", fe.Server),
zap.Bool("found", fe.Found),
Expand All @@ -61,12 +67,12 @@ func provideInstructions(in instructionsIn) (*jwtxt.Instructions, error) {
block, rest := pem.Decode([]byte(item))

if block == nil || strings.TrimSpace(string(rest)) != "" {
return nil, jwtxt.ErrInvalidInput
return instructionsOut{}, jwtxt.ErrInvalidInput
}

buf := pem.EncodeToMemory(block)
if buf == nil {
return nil, jwtxt.ErrInvalidInput
return instructionsOut{}, jwtxt.ErrInvalidInput
}

pems = append(pems, buf)
Expand All @@ -78,11 +84,15 @@ func provideInstructions(in instructionsIn) (*jwtxt.Instructions, error) {
for _, pemFile := range in.Service.JwtTxtRedirector.PEMFiles {
data, err := os.ReadFile(pemFile)
if err != nil {
return nil, err
return instructionsOut{}, err
}
opts = append(opts, jwtxt.WithPEMs(data))
}
}

return jwtxt.New(opts...)
jwtxt, err := jwtxt.New(opts...)

return instructionsOut{
JWTXT: jwtxt,
DeviceID: in.ID.DeviceID}, err
}
Loading

0 comments on commit 1b1515e

Please sign in to comment.