From 3fb1ae60a9fa54b1dbbd1c0652904a332d680833 Mon Sep 17 00:00:00 2001 From: Lucas Bajolet Date: Fri, 14 Jun 2024 13:59:16 -0400 Subject: [PATCH] rpc: move protobuf definitions to common cli/srv Instead of a global variable to define if the components of a plugin/packer should use protobuf/msgpack instead of gob, we define variables that we add to the commonClient and commonServer structures. --- plugin/server.go | 4 ++-- plugin/set.go | 10 +++++----- rpc/client.go | 22 ++++++++++++++++++++++ rpc/client_test.go | 2 +- rpc/common.go | 22 ++++++++++++++-------- rpc/datasource.go | 8 ++++---- rpc/server.go | 13 +++++++++++-- 7 files changed, 59 insertions(+), 22 deletions(-) diff --git a/plugin/server.go b/plugin/server.go index 5c657efab..2f068eff1 100644 --- a/plugin/server.go +++ b/plugin/server.go @@ -62,7 +62,7 @@ var ErrManuallyStartedPlugin = errors.New( // Server waits for a connection to this plugin and returns a Packer // RPC server that you can use to register components and serve them. -func Server() (*packrpc.PluginServer, error) { +func Server(useProto bool) (*packrpc.PluginServer, error) { if os.Getenv(MagicCookieKey) != MagicCookieValue { return nil, ErrManuallyStartedPlugin } @@ -110,7 +110,7 @@ func Server() (*packrpc.PluginServer, error) { // Serve a single connection log.Println("Serving a plugin connection...") - return packrpc.NewServer(conn) + return packrpc.NewServer(conn, useProto) } func serverListener() (net.Listener, error) { diff --git a/plugin/set.go b/plugin/set.go index efb61cca3..fb63ca147 100644 --- a/plugin/set.go +++ b/plugin/set.go @@ -12,7 +12,6 @@ import ( "sort" packersdk "github.com/hashicorp/packer-plugin-sdk/packer" - "github.com/hashicorp/packer-plugin-sdk/rpc" pluginVersion "github.com/hashicorp/packer-plugin-sdk/version" ) @@ -34,6 +33,7 @@ type Set struct { version string sdkVersion string apiVersion string + useProto bool Builders map[string]packersdk.Builder PostProcessors map[string]packersdk.PostProcessor Provisioners map[string]packersdk.Provisioner @@ -116,7 +116,7 @@ func (i *Set) Run() error { return i.RunCommand(args...) } -func useProtobuf(args ...string) []string { +func (i *Set) useProtobuf(args ...string) []string { protobufPos := -1 for i, arg := range args { if arg == "--protobuf" { @@ -129,7 +129,7 @@ func useProtobuf(args ...string) []string { return args } - rpc.UseProto = true + i.useProto = true if protobufPos == 0 { return args[1:] @@ -147,7 +147,7 @@ func (i *Set) RunCommand(args ...string) error { return fmt.Errorf("needs at least one argument") } - args = useProtobuf(args...) + args = i.useProtobuf(args...) switch args[0] { case "describe": @@ -164,7 +164,7 @@ func (i *Set) RunCommand(args ...string) error { } func (i *Set) start(kind, name string) error { - server, err := Server() + server, err := Server(i.useProto) if err != nil { return err } diff --git a/rpc/client.go b/rpc/client.go index 5c660c638..9f233d82d 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -21,6 +21,9 @@ type Client struct { mux *muxBroker client *rpc.Client closeMux bool + // UseProto makes it so that clients started from this will use + // protobuf/msgpack for serialisation instead of gob + UseProto bool } func NewClient(rwc io.ReadWriteCloser) (*Client, error) { @@ -76,6 +79,9 @@ func (c *Client) Artifact() packer.Artifact { commonClient: commonClient{ endpoint: DefaultArtifactEndpoint, client: c.client, + // force to false as the server doesn't embed a commonServer + // and thus is only capable of using gob. + useProto: false, }, } } @@ -86,6 +92,9 @@ func (c *Client) Build() packer.Build { endpoint: DefaultBuildEndpoint, client: c.client, mux: c.mux, + // force to false as the server doesn't embed a commonServer + // and thus is only capable of using gob. + useProto: false, }, } } @@ -96,6 +105,7 @@ func (c *Client) Builder() packer.Builder { endpoint: DefaultBuilderEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -106,6 +116,9 @@ func (c *Client) Communicator() packer.Communicator { endpoint: DefaultCommunicatorEndpoint, client: c.client, mux: c.mux, + // force to false as the server doesn't embed a commonServer + // and thus is only capable of using gob. + useProto: false, }, } } @@ -116,6 +129,9 @@ func (c *Client) Hook() packer.Hook { endpoint: DefaultHookEndpoint, client: c.client, mux: c.mux, + // force to false as the server doesn't embed a commonServer + // and thus is only capable of using gob. + useProto: false, }, } } @@ -126,6 +142,7 @@ func (c *Client) PostProcessor() packer.PostProcessor { endpoint: DefaultPostProcessorEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -136,6 +153,7 @@ func (c *Client) Provisioner() packer.Provisioner { endpoint: DefaultProvisionerEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -146,6 +164,7 @@ func (c *Client) Datasource() packer.Datasource { endpoint: DefaultDatasourceEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -155,6 +174,9 @@ func (c *Client) Ui() packer.Ui { commonClient: commonClient{ endpoint: DefaultUiEndpoint, client: c.client, + // force to false as the server doesn't embed a commonServer + // and thus is only capable of using gob. + useProto: false, }, endpoint: DefaultUiEndpoint, } diff --git a/rpc/client_test.go b/rpc/client_test.go index 52e6f4acd..52eccd16b 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -43,7 +43,7 @@ func testConn(t *testing.T) (net.Conn, net.Conn) { func testClientServer(t *testing.T) (*Client, *PluginServer) { clientConn, serverConn := testConn(t) - server, err := NewServer(serverConn) + server, err := NewServer(serverConn, false) if err != nil { t.Fatalf("err: %v", err) } diff --git a/rpc/common.go b/rpc/common.go index 33bb3f0d6..a97193b50 100644 --- a/rpc/common.go +++ b/rpc/common.go @@ -16,12 +16,6 @@ import ( "google.golang.org/protobuf/proto" ) -// UseProto lets us determine whether or not we should use protobuf for serialising -// data over RPC instead of gob. -// -// This is controlled by Packer using the `--use-proto` flag on plugin commands. -var UseProto bool = false - // commonClient allows to rpc call funcs that can be defined on the different // build blocks of Packer. type commonClient struct { @@ -31,6 +25,12 @@ type commonClient struct { endpoint string client *rpc.Client mux *muxBroker + + // useProto lets us determine whether or not we should use protobuf for serialising + // data over RPC instead of gob. + // + // This is controlled by Packer using the `--use-proto` flag on plugin commands. + useProto bool } type commonServer struct { @@ -39,6 +39,12 @@ type commonServer struct { selfConfigurable interface { ConfigSpec() hcldec.ObjectSpec } + + // useProto lets us determine whether or not we should use protobuf for serialising + // data over RPC instead of gob. + // + // This is controlled by Packer using the `--use-proto` flag on plugin commands. + useProto bool } type ConfigSpecResponse struct { @@ -65,7 +71,7 @@ func (p *commonClient) ConfigSpec() hcldec.ObjectSpec { // // This will be a breaking change, as older plugins won't be able to // communicate with Packer any longer. - if !UseProto { + if !p.useProto { log.Printf("[DEBUG] - common: receiving ConfigSpec as gob") res := hcldec.ObjectSpec{} err := gob.NewDecoder(bytes.NewReader(resp.ConfigSpec)).Decode(&res) @@ -87,7 +93,7 @@ func (p *commonClient) ConfigSpec() hcldec.ObjectSpec { func (s *commonServer) ConfigSpec(_ interface{}, reply *ConfigSpecResponse) error { spec := s.selfConfigurable.ConfigSpec() - if !UseProto { + if !s.useProto { log.Printf("[DEBUG] - common: sending ConfigSpec as gob") b := &bytes.Buffer{} err := gob.NewEncoder(b).Encode(spec) diff --git a/rpc/datasource.go b/rpc/datasource.go index 66b877d75..74b35ee0b 100644 --- a/rpc/datasource.go +++ b/rpc/datasource.go @@ -55,7 +55,7 @@ func (d *datasource) OutputSpec() hcldec.ObjectSpec { panic(err.Error()) } - if !UseProto { + if !d.useProto { log.Printf("[DEBUG] - datasource: receiving OutputSpec as gob") res := hcldec.ObjectSpec{} err := gob.NewDecoder(bytes.NewReader(resp.OutputSpec)).Decode(&res) @@ -85,7 +85,7 @@ func (d *datasource) Execute() (cty.Value, error) { return cty.NilVal, err } - if !UseProto { + if !d.useProto { log.Printf("[DEBUG] - datasource: receiving Execute as gob") res := cty.Value{} err := gob.NewDecoder(bytes.NewReader(resp.Value)).Decode(&res) @@ -132,7 +132,7 @@ func (d *DatasourceServer) Configure(args *DatasourceConfigureArgs, reply *Datas func (d *DatasourceServer) OutputSpec(args *DatasourceConfigureArgs, reply *OutputSpecResponse) error { spec := d.d.OutputSpec() - if !UseProto { + if !d.useProto { log.Printf("[DEBUG] - datasource: sending OutputSpec as gob") b := &bytes.Buffer{} err := gob.NewEncoder(b).Encode(spec) @@ -154,7 +154,7 @@ func (d *DatasourceServer) Execute(args *interface{}, reply *ExecuteResponse) er spec, err := d.d.Execute() reply.Error = NewBasicError(err) - if !UseProto { + if !d.useProto { log.Printf("[DEBUG] - datasource: sending Execute as gob") b := &bytes.Buffer{} err = gob.NewEncoder(b).Encode(spec) diff --git a/rpc/server.go b/rpc/server.go index 759f0594f..4d2546676 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -35,16 +35,20 @@ type PluginServer struct { streamId uint32 server *rpc.Server closeMux bool + // useProto is set by the `Set` type, and determines in the servers + // spawned through this instance use protobuf/msgpack or gob + useProto bool } // NewServer returns a new Packer RPC server. -func NewServer(conn io.ReadWriteCloser) (*PluginServer, error) { +func NewServer(conn io.ReadWriteCloser, useProto bool) (*PluginServer, error) { mux, err := newMuxBrokerServer(conn) if err != nil { return nil, err } result := newServerWithMux(mux, 0) result.closeMux = true + result.useProto = useProto go mux.Run() return result, nil } @@ -85,6 +89,7 @@ func (s *PluginServer) RegisterBuilder(b packer.Builder) error { commonServer: commonServer{ selfConfigurable: b, mux: s.mux, + useProto: s.useProto, }, builder: b, }) @@ -94,7 +99,8 @@ func (s *PluginServer) RegisterCommunicator(c packer.Communicator) error { return s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{ c: c, commonServer: commonServer{ - mux: s.mux, + mux: s.mux, + useProto: s.useProto, }, }) } @@ -111,6 +117,7 @@ func (s *PluginServer) RegisterPostProcessor(p packer.PostProcessor) error { commonServer: commonServer{ selfConfigurable: p, mux: s.mux, + useProto: s.useProto, }, p: p, }) @@ -121,6 +128,7 @@ func (s *PluginServer) RegisterProvisioner(p packer.Provisioner) error { commonServer: commonServer{ selfConfigurable: p, mux: s.mux, + useProto: s.useProto, }, p: p, }) @@ -131,6 +139,7 @@ func (s *PluginServer) RegisterDatasource(d packer.Datasource) error { commonServer: commonServer{ selfConfigurable: d, mux: s.mux, + useProto: s.useProto, }, d: d, })