Skip to content

Commit

Permalink
Selenium Grid scaler return count of pending and on-going sessions
Browse files Browse the repository at this point in the history
Signed-off-by: Viet Nguyen Duc <[email protected]>
  • Loading branch information
VietND96 committed Dec 2, 2024
1 parent dfa1a32 commit 46e4550
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 141 deletions.
102 changes: 49 additions & 53 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type seleniumGridScalerMetadata struct {
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, default=linux"`
NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, default=1"`
TargetQueueLength int64 `keda:"name=targetQueueLength, order=triggerMetadata;resolvedEnv, default=1"`
NodeMaxSessions int64 `keda:"name=nodeMaxSessions, order=triggerMetadata, default=1"`

TargetValue int64
}

type SeleniumResponse struct {
Expand All @@ -54,9 +55,9 @@ type Data struct {
}

type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
TotalSlots int64 `json:"totalSlots"`
}

type NodesInfo struct {
Expand All @@ -70,17 +71,17 @@ type SessionsInfo struct {
type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type ReservedNodes struct {
ID string `json:"id"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
}

type Sessions []struct {
Expand All @@ -101,13 +102,12 @@ type Capability struct {
}

type Stereotypes []struct {
Slots int `json:"slots"`
Slots int64 `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

const (
DefaultBrowserVersion string = "latest"
DefaultTargetQueueLength int64 = 1
DefaultBrowserVersion string = "latest"
)

func NewSeleniumGridScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -135,7 +135,9 @@ func NewSeleniumGridScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}

func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*seleniumGridScalerMetadata, error) {
meta := &seleniumGridScalerMetadata{}
meta := &seleniumGridScalerMetadata{
TargetValue: 1,
}

if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
Expand All @@ -147,9 +149,6 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
meta.SessionBrowserName = meta.BrowserName
}

if meta.TargetQueueLength < 1 {
meta.TargetQueueLength = DefaultTargetQueueLength
}
return meta, nil
}

Expand All @@ -162,15 +161,14 @@ func (s *seleniumGridScaler) Close(context.Context) error {
}

func (s *seleniumGridScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueLen, err := s.getSessionsQueueLength(ctx, s.logger)
newRequestNodes, onGoingSessions, err := s.getSessionsQueueLength(ctx, s.logger)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error requesting selenium grid endpoint: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueLen))
metric := GenerateMetricInMili(metricName, float64(newRequestNodes+onGoingSessions))

// If the number of sessions queued is equal to or greater than the targetQueueLength, the scaler will scale up.
return []external_metrics.ExternalMetricValue{metric}, queueLen >= s.metadata.TargetQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, (newRequestNodes + onGoingSessions) > s.metadata.ActivationThreshold, nil
}

func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand All @@ -179,26 +177,26 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueLength),
Target: GetMetricTarget(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger logr.Logger) (int64, error) {
func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger logr.Logger) (int64, int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
return -1, err
return -1, -1, err
}

req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.URL, bytes.NewBuffer(body))
if err != nil {
return -1, err
return -1, -1, err
}

if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" {
Expand All @@ -209,28 +207,28 @@ func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger

res, err := s.httpClient.Do(req)
if err != nil {
return -1, err
return -1, -1, err
}

if res.StatusCode != http.StatusOK {
msg := fmt.Sprintf("selenium grid returned %d", res.StatusCode)
return -1, errors.New(msg)
return -1, -1, errors.New(msg)
}

defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return -1, err
return -1, -1, err
}
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
newRequestNodes, onGoingSession, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
if err != nil {
return -1, err
return -1, -1, err
}
return v, nil
return newRequestNodes, onGoingSession, nil
}

func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int64 {
var matchingSlots int64
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
Expand All @@ -239,8 +237,8 @@ func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability,
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int64 {
var matchingSessions int64
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
Expand Down Expand Up @@ -277,7 +275,7 @@ func checkCapabilitiesMatch(capability Capability, requestCapability Capability,
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int64) int64 {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
Expand All @@ -286,7 +284,7 @@ func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availa
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes {
func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int64, maxSession int64) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
Expand All @@ -298,17 +296,15 @@ func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotC
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) {
// The returned count of the number of new Nodes will be scaled up
var count int64
func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int64, logger logr.Logger) (int64, int64, error) {
// Track number of available slots of existing Nodes in the Grid can be reserved for the matched requests
var availableSlots int
var availableSlots int64
// Track number of matched requests in the sessions queue will be served by this scaler
var queueSlots int
var queueSlots int64

var seleniumResponse = SeleniumResponse{}
if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
return 0, 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
Expand All @@ -317,6 +313,7 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
var reservedNodes []ReservedNodes
// Track list of new Nodes will be scaled up with number of available slots following scaler parameter `nodeMaxSessions`
var newRequestNodes []ReservedNodes
var onGoingSessions int64
for requestIndex, sessionQueueRequest := range sessionQueueRequests {
var isRequestMatched bool
var requestCapability = Capability{}
Expand All @@ -335,20 +332,22 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}

var isRequestReserved bool
var sumOfCurrentSessionsMatch int64
// Check if the matched request can be assigned to available slots of existing Nodes in the Grid
for _, node := range nodes {
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
sumOfCurrentSessionsMatch += currentSessionsMatch
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
var availableSlotsMatch int64
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
Expand All @@ -360,6 +359,9 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}
}
if sumOfCurrentSessionsMatch > onGoingSessions {
onGoingSessions = sumOfCurrentSessionsMatch
}
// Check if the matched request can be assigned to available slots of new Nodes will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1
if !isRequestReserved {
for _, newRequestNode := range newRequestNodes {
Expand All @@ -376,11 +378,5 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}

if queueSlots > availableSlots {
count = int64(len(newRequestNodes))
} else {
count = 0
}

return count, nil
return int64(len(newRequestNodes)), onGoingSessions, nil
}
Loading

0 comments on commit 46e4550

Please sign in to comment.