Skip to content

Commit

Permalink
Find and invoke Slurm APIs using reflect to avoid having version-specific code
Browse files Browse the repository at this point in the history
  • Loading branch information
yuva29 committed Oct 10, 2024
1 parent d62000f commit 3380550
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 75 deletions.
13 changes: 13 additions & 0 deletions redfish-exporter/slurm/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package slurm

// Version-independent method-name suffixes of the generated Slurm REST client.
// NewClient matches them against the end of each generated method name and
// also uses them as keys of the Client helper map.
const (
	// methodPing is the suffix of the ping request builder method.
	methodPing = "Ping"
	// methodPostNode is the suffix of the node-update request builder method.
	methodPostNode = "PostNode"
)

// withExecuteSuffix returns method with the literal "Execute" appended,
// e.g. "Ping" -> "PingExecute". The input is not validated or trimmed.
func withExecuteSuffix(method string) string {
	const suffix = "Execute"
	name := method + suffix
	return name
}
205 changes: 130 additions & 75 deletions redfish-exporter/slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"log"
"math"
"net/http"
"reflect"
"regexp"
"strings"
"time"

"github.com/nod-ai/ADA/redfish-exporter/api/generated/slurmrestdapi"
Expand Down Expand Up @@ -50,136 +53,188 @@ type SlurmServerConfig struct {

// Client bundles the generated Slurm REST API client with the reflection
// handles that NewClient resolves for it.
type Client struct {
// apiClient is the generated REST client returned by createRestClient.
apiClient *slurmrestdapi.APIClient
// helper maps the keys "PostNode", "PostNodeExecute", "Ping" and
// "PingExecute" to the matching reflect.Method found on the SlurmAPI type.
helper map[string]reflect.Method
}

// Package-level handle to the client most recently built by NewClient; it is
// what GetClient returns and stays nil until NewClient has assigned it.
// NOTE(review): this listing is taken from a diff view, so both the old name
// (apiCl) and the new name (singletonAPICl) appear — confirm which one the
// current file keeps.
var apiCl *Client // singleton client
var singletonAPICl *Client // singleton client

// NewClient builds a Slurm REST client for slurmControlNode, authenticating
// with defaultSlurmUsername and slurmToken as the bearer token.
//
// It resolves, via reflection on the generated SlurmAPI type, the methods
// whose names end in "PostNode" and "Ping" plus their "...Execute"
// counterparts, and records them in the client's helper map. It then probes
// the server with getConnectionStatus and stores the client in the
// package-level singleton.
//
// Returns (nil, error) when a matching method has no "...Execute" counterpart.
// Otherwise returns the client together with whatever getConnectionStatus
// returned, so the client is non-nil even when the returned error is non-nil.
//
// NOTE(review): this listing is taken from a diff view; the two `c :=` lines
// and the two singleton assignments are the removed and added sides of the
// same statements — confirm against the actual file.
func NewClient(slurmControlNode, slurmToken string) (*Client, error) {
c := &Client{helper: map[string]reflect.Method{}}
slConfig := &SlurmServerConfig{
URL: slurmControlNode,
Username: defaultSlurmUsername,
BearerToken: slurmToken,
}
cl := createRestClient(slConfig)
c := &Client{apiClient: cl}

// Populate the methods required for ping and node update operations.
// Matching is by name suffix so the version prefix of the generated
// method names does not have to be hard-coded.
t := reflect.TypeOf(cl.SlurmAPI)
postNodeRe := regexp.MustCompile(fmt.Sprintf(`%s$`, methodPostNode))
pingRe := regexp.MustCompile(fmt.Sprintf(`%s$`, methodPing))
for i := 0; i < t.NumMethod(); i++ {
method := t.Method(i)
if postNodeRe.MatchString(method.Name) {
// The request built by this method is sent by "<method name>Execute".
postNodeExecuteMethod, found := t.MethodByName(withExecuteSuffix(method.Name))
if !found {
return nil, fmt.Errorf("could not find PostNodeExecute method from Slurm REST APIs")
}

// Keep only the first match; later matches are ignored.
if _, found := c.helper[methodPostNode]; !found {
c.helper[methodPostNode] = method
c.helper[withExecuteSuffix(methodPostNode)] = postNodeExecuteMethod
}
} else if pingRe.MatchString(method.Name) {
// Same lookup as above, for the ping request.
pingExecuteMethod, found := t.MethodByName(withExecuteSuffix(method.Name))
if !found {
return nil, fmt.Errorf("could not find PingExecute method from Slurm REST APIs")
}

// Keep only the first match; later matches are ignored.
if _, found := c.helper[methodPing]; !found {
c.helper[methodPing] = method
c.helper[withExecuteSuffix(methodPing)] = pingExecuteMethod
}
}
}

c.apiClient = cl

log.Printf("[slurm] created slurm client for node: %v\n", slurmControlNode)
// A failed connection check is logged and returned alongside the client
// rather than discarding the client.
err := c.getConnectionStatus()
if err != nil {
log.Printf("[slurm] error in getting the connection status of the slurm node: %v, err: %+v\n", slurmControlNode, err)
}

apiCl = c
singletonAPICl = c
return c, err
}

// GetClient returns the package-level client handle assigned by NewClient.
// It does not create a client itself, so the result is nil until NewClient
// has stored one.
// NOTE(review): this listing is taken from a diff view and shows both the old
// (apiCl) and the new (singletonAPICl) return line — confirm against the file.
func GetClient() *Client {
return apiCl
return singletonAPICl
}

// ResumeNode submits a node update for nodeName with the state "resume" and
// returns the error from that request, or nil on success.
// NOTE(review): this listing is taken from a diff view; the inline apiCall
// body below is the removed implementation and the final
// `return c.updateNodeState(...)` line is its replacement — confirm against
// the actual file.
func (c *Client) ResumeNode(nodeName string) error {
apiCall := func() (interface{}, *http.Response, error) {
// Each attempt gets its own one-second deadline.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
jreq := c.apiClient.SlurmAPI.SlurmV0040PostNode(ctx, nodeName)
req := slurmrestdapi.V0040UpdateNodeMsg{State: []string{"resume"}}
jreq = jreq.V0040UpdateNodeMsg(req)
res, resp, err := c.apiClient.SlurmAPI.SlurmV0040PostNodeExecute(jreq)
cancel()
if err != nil {
return res, resp, err
} else if resp.StatusCode != 200 {
// Any non-200 status is reported as an error to the retry wrapper.
return res, resp, fmt.Errorf("invalid status code: %v", resp.StatusCode)
}
return res, resp, nil
}

_, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay)
if err != nil {
return err
}
defer resp.Body.Close()

return nil
return c.updateNodeState(nodeName, "resume")
}

// DrainNode passes nodeName and the state string "drain" to updateNodeState
// and returns that call's error unchanged.
func (c *Client) DrainNode(nodeName string) error {
	const drainState = "drain"
	return c.updateNodeState(nodeName, drainState)
}

func (c *Client) getConnectionStatus() error {
apiCall := func() (interface{}, *http.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
jreq := c.apiClient.SlurmAPI.SlurmV0040PostNode(ctx, nodeName)
req := slurmrestdapi.V0040UpdateNodeMsg{State: []string{"drain"}}
jreq = jreq.V0040UpdateNodeMsg(req)
res, resp, err := c.apiClient.SlurmAPI.SlurmV0040PostNodeExecute(jreq)
cancel()
if err != nil {
return res, resp, err
} else if resp.StatusCode != 200 {
return res, resp, fmt.Errorf("invalid status code: %v", resp.StatusCode)
defer cancel()

// Step 1: Call the Ping method using reflection
pingVals := c.helper["Ping"].Func.Call([]reflect.Value{
reflect.ValueOf(c.apiClient.SlurmAPI),
reflect.ValueOf(ctx),
})

// Check if the call produced results
if len(pingVals) == 0 {
return nil, nil, fmt.Errorf("Ping call returned no values")
}
return res, resp, nil

// Step 2: Execute the Ping method with the request
pingResp := c.helper["PingExecute"].Func.Call([]reflect.Value{
reflect.ValueOf(c.apiClient.SlurmAPI),
pingVals[0],
})

// Extract and return the response and error
resp, _ := pingResp[1].Interface().(*http.Response)
err, _ := pingResp[2].Interface().(error)
return pingResp[0].Interface(), resp, err
}

_, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay)
if err != nil {
return err
return nil
}
defer resp.Body.Close()

log.Printf("[slurm] ping success: %v\n", resp.StatusCode)
return nil
}

func (c *Client) GetNodes() ([]string, error) {
var nodes []string
func (c *Client) updateNodeState(nodeName, state string) error {
apiCall := func() (interface{}, *http.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
jreq := c.apiClient.SlurmAPI.SlurmV0039GetNodes(ctx)
res, resp, err := c.apiClient.SlurmAPI.SlurmV0039GetNodesExecute(jreq)
cancel()
if err != nil {
return res, resp, err
} else if resp.StatusCode != 200 {
return res, resp, fmt.Errorf("invalid status code: %v", resp.StatusCode)
defer cancel()

// Step 1: Call the PostNode method using reflection
postNodeVals := c.helper["PostNode"].Func.Call([]reflect.Value{
reflect.ValueOf(c.apiClient.SlurmAPI),
reflect.ValueOf(ctx),
reflect.ValueOf(nodeName),
})

// Check if the call produced results
if len(postNodeVals) == 0 {
return nil, nil, fmt.Errorf("PostNode call returned no values")
}
return res, resp, nil
}

res, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay)
if err != nil {
return nodes, err
}
defer resp.Body.Close()

log.Printf("[slurm] get nodes: %+v\n", nodes)
temp := res.(*slurmrestdapi.V0039NodesResponse)
for _, node := range temp.GetNodes() {
nodes = append(nodes, *node.Name)
}
return nodes, nil
}

func (c *Client) getConnectionStatus() error {
apiCall := func() (interface{}, *http.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
jreq := c.apiClient.SlurmAPI.SlurmV0039Ping(ctx)
res, resp, err := c.apiClient.SlurmAPI.SlurmV0039PingExecute(jreq)
cancel()
if err != nil {
return res, resp, err
} else if resp.StatusCode != 200 {
return res, resp, fmt.Errorf("invalid status code: %v", resp.StatusCode)
// Step 2: Find and call the UpdateNodeMsg method on the request object
newInstance := reflect.New(postNodeVals[0].Type()).Elem()
instanceType := newInstance.Type()

for i := 0; i < instanceType.NumMethod(); i++ {
method := instanceType.Method(i)
if strings.Contains(method.Name, "UpdateNodeMsg") {
// Create a new UpdateNodeMsg request
updateNodeMsgReq := createUpdateNodeMsgRequest(method.Type)
if updateNodeMsgReq.IsValid() {
updateNodeMsgReq.FieldByName("State").Set(reflect.ValueOf([]string{state}))
}

// Step 3: Call UpdateNodeMsg with the request
updatedNodeVals := method.Func.Call([]reflect.Value{postNodeVals[0], updateNodeMsgReq})
if len(updatedNodeVals) == 0 {
return nil, nil, fmt.Errorf("UpdateNodeMsg call returned no values")
}

// Step 4: Execute the PostNode method with the updated request
postNodeResp := c.helper["PostNodeExecute"].Func.Call([]reflect.Value{
reflect.ValueOf(c.apiClient.SlurmAPI),
updatedNodeVals[0],
})

if len(postNodeResp) < 3 {
return nil, nil, fmt.Errorf("PostNodeExecute call returned insufficient values")
}

// Extract and return the response and error
resp, _ := postNodeResp[1].Interface().(*http.Response)
err, _ := postNodeResp[2].Interface().(error)
return postNodeResp[0].Interface(), resp, err
}
}
return res, resp, nil

return nil, nil, fmt.Errorf("no suitable UpdateNodeMsg method found")
}

// Retry the API call
_, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay)
if err != nil {
return nil
return err
}
defer resp.Body.Close()

log.Printf("[slurm] ping success: %v\n", resp.StatusCode)
return nil
}

// createUpdateNodeMsgRequest scans the input parameters of the function type
// methodType and returns a new, addressable zero value of the first parameter
// type whose name contains "UpdateNodeMsg".
//
// Input 0 is skipped: the caller passes a method type obtained from
// reflect.Type.Method, where the first input is the receiver. When no
// parameter matches, the zero reflect.Value is returned, for which IsValid
// reports false.
func createUpdateNodeMsgRequest(methodType reflect.Type) reflect.Value {
	var none reflect.Value

	idx := 1 // input 0 is the receiver
	for idx < methodType.NumIn() {
		candidate := methodType.In(idx)
		idx++
		if !strings.Contains(candidate.Name(), "UpdateNodeMsg") {
			continue
		}
		// reflect.New yields a pointer; Elem makes the value settable.
		return reflect.New(candidate).Elem()
	}
	return none
}

func createRestClient(c *SlurmServerConfig) *slurmrestdapi.APIClient {
cfg := slurmrestdapi.NewConfiguration()
cfg.HTTPClient = &http.Client{Timeout: slurmRestClientTimeout}
Expand Down

0 comments on commit 3380550

Please sign in to comment.