Skip to content

Commit

Permalink
Revert add supported task types for agent service by default for task…
Browse files Browse the repository at this point in the history
… types (#4162)


Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Future Outlier <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
  • Loading branch information
3 people authored Oct 4, 2023
1 parent d9586b0 commit a9f5d24
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 15 deletions.
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
},
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
}

configSection = pluginsConfig.MustRegisterSubSection("agent-service", &defaultConfig)
Expand All @@ -65,6 +66,9 @@ type Config struct {

// Maps task types to their agents. {TaskType: AgentId}
AgentForTaskTypes map[string]string `json:"agentForTaskTypes" pflag:"-,"`

// SupportedTaskTypes is a list of task types that are supported by this plugin.
SupportedTaskTypes []string `json:"supportedTaskTypes" pflag:"-,Defines a list of task types that are supported by this plugin."`
}

type Agent struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestEndToEnd(t *testing.T) {
tr.OnRead(context.Background()).Return(nil, fmt.Errorf("read fail"))
tCtx.OnTaskReader().Return(tr)

agentPlugin := newAgentPlugin(SupportedTaskTypes{})
agentPlugin := newAgentPlugin()
pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test3"))
assert.NoError(t, err)
Expand Down
12 changes: 4 additions & 8 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

type GetClientFunc func(ctx context.Context, agent *Agent, connectionCache map[*Agent]*grpc.ClientConn) (service.AsyncAgentServiceClient, error)

type TaskType = string
type SupportedTaskTypes []TaskType
type Plugin struct {
metricScope promutils.Scope
cfg *Config
Expand Down Expand Up @@ -298,10 +296,8 @@ func getFinalContext(ctx context.Context, operation string, agent *Agent) (conte
return context.WithTimeout(ctx, timeout)
}

func newAgentPlugin(supportedTaskTypes SupportedTaskTypes) webapi.PluginEntry {
if len(supportedTaskTypes) == 0 {
supportedTaskTypes = SupportedTaskTypes{"default_supported_task_type"}
}
func newAgentPlugin() webapi.PluginEntry {
supportedTaskTypes := GetConfig().SupportedTaskTypes

return webapi.PluginEntry{
ID: "agent-service",
Expand All @@ -317,9 +313,9 @@ func newAgentPlugin(supportedTaskTypes SupportedTaskTypes) webapi.PluginEntry {
}
}

func RegisterAgentPlugin(supportedTaskTypes SupportedTaskTypes) {
func RegisterAgentPlugin() {
gob.Register(ResourceMetaWrapper{})
gob.Register(ResourceWrapper{})

pluginmachinery.PluginRegistry().RegisterRemotePlugin(newAgentPlugin(supportedTaskTypes))
pluginmachinery.PluginRegistry().RegisterRemotePlugin(newAgentPlugin())
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestPlugin(t *testing.T) {
})

t.Run("test newAgentPlugin", func(t *testing.T) {
p := newAgentPlugin(SupportedTaskTypes{})
p := newAgentPlugin()
assert.NotNil(t, p)
assert.Equal(t, "agent-service", p.ID)
assert.NotNil(t, p.PluginLoader)
Expand Down
4 changes: 1 addition & 3 deletions flytepropeller/pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

const AgentServiceKey = "agent-service"

var once sync.Once

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface,
Expand All @@ -27,8 +25,8 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
}

// Register the GRPC plugin after the config is loaded
once.Do(func() { agent.RegisterAgentPlugin() })
pluginsConfigMeta, err := cfg.GetEnabledPlugins()
once.Do(func() { agent.RegisterAgentPlugin(pluginsConfigMeta.AllDefaultForTaskTypes[AgentServiceKey]) })

if err != nil {
return nil, nil, err
Expand Down
1 change: 0 additions & 1 deletion rsts/deployment/agents/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ Discover the process of setting up Agents for Flyte.

bigquery
mmcloud

2 changes: 1 addition & 1 deletion rsts/deployment/plugins/webapi/databricks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,4 @@ Wait for the upgrade to complete. You can check the status of the deployment pod
Make sure you enable `custom containers
<https://docs.databricks.com/administration-guide/clusters/container-services.html>`__
on your Databricks cluster before you trigger the workflow.
on your Databricks cluster before you trigger the workflow.

0 comments on commit a9f5d24

Please sign in to comment.