Skip to content

Commit

Permalink
rpc: move protobuf definitions to common cli/srv
Browse files Browse the repository at this point in the history
Instead of a global variable to define if the components of a
plugin/packer should use protobuf/msgpack instead of gob, we define
variables that we add to the commonClient and commonServer structures.
  • Loading branch information
lbajolet-hashicorp committed Jun 18, 2024
1 parent 791cfd0 commit 3fb1ae6
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 22 deletions.
4 changes: 2 additions & 2 deletions plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var ErrManuallyStartedPlugin = errors.New(

// Server waits for a connection to this plugin and returns a Packer
// RPC server that you can use to register components and serve them.
func Server() (*packrpc.PluginServer, error) {
func Server(useProto bool) (*packrpc.PluginServer, error) {
if os.Getenv(MagicCookieKey) != MagicCookieValue {
return nil, ErrManuallyStartedPlugin
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func Server() (*packrpc.PluginServer, error) {

// Serve a single connection
log.Println("Serving a plugin connection...")
return packrpc.NewServer(conn)
return packrpc.NewServer(conn, useProto)
}

func serverListener() (net.Listener, error) {
Expand Down
10 changes: 5 additions & 5 deletions plugin/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sort"

packersdk "github.com/hashicorp/packer-plugin-sdk/packer"
"github.com/hashicorp/packer-plugin-sdk/rpc"
pluginVersion "github.com/hashicorp/packer-plugin-sdk/version"
)

Expand All @@ -34,6 +33,7 @@ type Set struct {
version string
sdkVersion string
apiVersion string
useProto bool
Builders map[string]packersdk.Builder
PostProcessors map[string]packersdk.PostProcessor
Provisioners map[string]packersdk.Provisioner
Expand Down Expand Up @@ -116,7 +116,7 @@ func (i *Set) Run() error {
return i.RunCommand(args...)
}

func useProtobuf(args ...string) []string {
func (i *Set) useProtobuf(args ...string) []string {
protobufPos := -1
for i, arg := range args {
if arg == "--protobuf" {
Expand All @@ -129,7 +129,7 @@ func useProtobuf(args ...string) []string {
return args
}

rpc.UseProto = true
i.useProto = true

if protobufPos == 0 {
return args[1:]
Expand All @@ -147,7 +147,7 @@ func (i *Set) RunCommand(args ...string) error {
return fmt.Errorf("needs at least one argument")
}

args = useProtobuf(args...)
args = i.useProtobuf(args...)

switch args[0] {
case "describe":
Expand All @@ -164,7 +164,7 @@ func (i *Set) RunCommand(args ...string) error {
}

func (i *Set) start(kind, name string) error {
server, err := Server()
server, err := Server(i.useProto)
if err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type Client struct {
mux *muxBroker
client *rpc.Client
closeMux bool
// UseProto makes it so that clients started from this will use
// protobuf/msgpack for serialisation instead of gob
UseProto bool
}

func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
Expand Down Expand Up @@ -76,6 +79,9 @@ func (c *Client) Artifact() packer.Artifact {
commonClient: commonClient{
endpoint: DefaultArtifactEndpoint,
client: c.client,
// force to false as the server doesn't embed a commonServer
// and thus is only capable of using gob.
useProto: false,
},
}
}
Expand All @@ -86,6 +92,9 @@ func (c *Client) Build() packer.Build {
endpoint: DefaultBuildEndpoint,
client: c.client,
mux: c.mux,
// force to false as the server doesn't embed a commonServer
// and thus is only capable of using gob.
useProto: false,
},
}
}
Expand All @@ -96,6 +105,7 @@ func (c *Client) Builder() packer.Builder {
endpoint: DefaultBuilderEndpoint,
client: c.client,
mux: c.mux,
useProto: c.UseProto,
},
}
}
Expand All @@ -106,6 +116,9 @@ func (c *Client) Communicator() packer.Communicator {
endpoint: DefaultCommunicatorEndpoint,
client: c.client,
mux: c.mux,
// force to false as the server doesn't embed a commonServer
// and thus is only capable of using gob.
useProto: false,
},
}
}
Expand All @@ -116,6 +129,9 @@ func (c *Client) Hook() packer.Hook {
endpoint: DefaultHookEndpoint,
client: c.client,
mux: c.mux,
// force to false as the server doesn't embed a commonServer
// and thus is only capable of using gob.
useProto: false,
},
}
}
Expand All @@ -126,6 +142,7 @@ func (c *Client) PostProcessor() packer.PostProcessor {
endpoint: DefaultPostProcessorEndpoint,
client: c.client,
mux: c.mux,
useProto: c.UseProto,
},
}
}
Expand All @@ -136,6 +153,7 @@ func (c *Client) Provisioner() packer.Provisioner {
endpoint: DefaultProvisionerEndpoint,
client: c.client,
mux: c.mux,
useProto: c.UseProto,
},
}
}
Expand All @@ -146,6 +164,7 @@ func (c *Client) Datasource() packer.Datasource {
endpoint: DefaultDatasourceEndpoint,
client: c.client,
mux: c.mux,
useProto: c.UseProto,
},
}
}
Expand All @@ -155,6 +174,9 @@ func (c *Client) Ui() packer.Ui {
commonClient: commonClient{
endpoint: DefaultUiEndpoint,
client: c.client,
// force to false as the server doesn't embed a commonServer
// and thus is only capable of using gob.
useProto: false,
},
endpoint: DefaultUiEndpoint,
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func testConn(t *testing.T) (net.Conn, net.Conn) {
func testClientServer(t *testing.T) (*Client, *PluginServer) {
clientConn, serverConn := testConn(t)

server, err := NewServer(serverConn)
server, err := NewServer(serverConn, false)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
22 changes: 14 additions & 8 deletions rpc/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ import (
"google.golang.org/protobuf/proto"
)

// UseProto lets us determine whether or not we should use protobuf for serialising
// data over RPC instead of gob.
//
// This is controlled by Packer using the `--use-proto` flag on plugin commands.
var UseProto bool = false

// commonClient allows to rpc call funcs that can be defined on the different
// build blocks of Packer.
type commonClient struct {
Expand All @@ -31,6 +25,12 @@ type commonClient struct {
endpoint string
client *rpc.Client
mux *muxBroker

// useProto lets us determine whether or not we should use protobuf for serialising
// data over RPC instead of gob.
//
// This is controlled by Packer using the `--use-proto` flag on plugin commands.
useProto bool
}

type commonServer struct {
Expand All @@ -39,6 +39,12 @@ type commonServer struct {
selfConfigurable interface {
ConfigSpec() hcldec.ObjectSpec
}

// useProto lets us determine whether or not we should use protobuf for serialising
// data over RPC instead of gob.
//
// This is controlled by Packer using the `--use-proto` flag on plugin commands.
useProto bool
}

type ConfigSpecResponse struct {
Expand All @@ -65,7 +71,7 @@ func (p *commonClient) ConfigSpec() hcldec.ObjectSpec {
//
// This will be a breaking change, as older plugins won't be able to
// communicate with Packer any longer.
if !UseProto {
if !p.useProto {
log.Printf("[DEBUG] - common: receiving ConfigSpec as gob")
res := hcldec.ObjectSpec{}
err := gob.NewDecoder(bytes.NewReader(resp.ConfigSpec)).Decode(&res)
Expand All @@ -87,7 +93,7 @@ func (p *commonClient) ConfigSpec() hcldec.ObjectSpec {
func (s *commonServer) ConfigSpec(_ interface{}, reply *ConfigSpecResponse) error {
spec := s.selfConfigurable.ConfigSpec()

if !UseProto {
if !s.useProto {
log.Printf("[DEBUG] - common: sending ConfigSpec as gob")
b := &bytes.Buffer{}
err := gob.NewEncoder(b).Encode(spec)
Expand Down
8 changes: 4 additions & 4 deletions rpc/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (d *datasource) OutputSpec() hcldec.ObjectSpec {
panic(err.Error())
}

if !UseProto {
if !d.useProto {
log.Printf("[DEBUG] - datasource: receiving OutputSpec as gob")
res := hcldec.ObjectSpec{}
err := gob.NewDecoder(bytes.NewReader(resp.OutputSpec)).Decode(&res)
Expand Down Expand Up @@ -85,7 +85,7 @@ func (d *datasource) Execute() (cty.Value, error) {
return cty.NilVal, err
}

if !UseProto {
if !d.useProto {
log.Printf("[DEBUG] - datasource: receiving Execute as gob")
res := cty.Value{}
err := gob.NewDecoder(bytes.NewReader(resp.Value)).Decode(&res)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (d *DatasourceServer) Configure(args *DatasourceConfigureArgs, reply *Datas
func (d *DatasourceServer) OutputSpec(args *DatasourceConfigureArgs, reply *OutputSpecResponse) error {
spec := d.d.OutputSpec()

if !UseProto {
if !d.useProto {
log.Printf("[DEBUG] - datasource: sending OutputSpec as gob")
b := &bytes.Buffer{}
err := gob.NewEncoder(b).Encode(spec)
Expand All @@ -154,7 +154,7 @@ func (d *DatasourceServer) Execute(args *interface{}, reply *ExecuteResponse) er
spec, err := d.d.Execute()
reply.Error = NewBasicError(err)

if !UseProto {
if !d.useProto {
log.Printf("[DEBUG] - datasource: sending Execute as gob")
b := &bytes.Buffer{}
err = gob.NewEncoder(b).Encode(spec)
Expand Down
13 changes: 11 additions & 2 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,20 @@ type PluginServer struct {
streamId uint32
server *rpc.Server
closeMux bool
// useProto is set by the `Set` type, and determines in the servers
// spawned through this instance use protobuf/msgpack or gob
useProto bool
}

// NewServer returns a new Packer RPC server.
func NewServer(conn io.ReadWriteCloser) (*PluginServer, error) {
func NewServer(conn io.ReadWriteCloser, useProto bool) (*PluginServer, error) {
mux, err := newMuxBrokerServer(conn)
if err != nil {
return nil, err
}
result := newServerWithMux(mux, 0)
result.closeMux = true
result.useProto = useProto
go mux.Run()
return result, nil
}
Expand Down Expand Up @@ -85,6 +89,7 @@ func (s *PluginServer) RegisterBuilder(b packer.Builder) error {
commonServer: commonServer{
selfConfigurable: b,
mux: s.mux,
useProto: s.useProto,
},
builder: b,
})
Expand All @@ -94,7 +99,8 @@ func (s *PluginServer) RegisterCommunicator(c packer.Communicator) error {
return s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{
c: c,
commonServer: commonServer{
mux: s.mux,
mux: s.mux,
useProto: s.useProto,
},
})
}
Expand All @@ -111,6 +117,7 @@ func (s *PluginServer) RegisterPostProcessor(p packer.PostProcessor) error {
commonServer: commonServer{
selfConfigurable: p,
mux: s.mux,
useProto: s.useProto,
},
p: p,
})
Expand All @@ -121,6 +128,7 @@ func (s *PluginServer) RegisterProvisioner(p packer.Provisioner) error {
commonServer: commonServer{
selfConfigurable: p,
mux: s.mux,
useProto: s.useProto,
},
p: p,
})
Expand All @@ -131,6 +139,7 @@ func (s *PluginServer) RegisterDatasource(d packer.Datasource) error {
commonServer: commonServer{
selfConfigurable: d,
mux: s.mux,
useProto: s.useProto,
},
d: d,
})
Expand Down

0 comments on commit 3fb1ae6

Please sign in to comment.