From d6aa8471cd91755cd24b1d335cc162d9c379d3a7 Mon Sep 17 00:00:00 2001 From: Lucas Bajolet Date: Fri, 16 Feb 2024 10:41:09 -0500 Subject: [PATCH] rpc: allow using both gob/protobuf for structs As follow-up to the introduction of the protobufs for HCLSpec, we introduce a new environment variable and code to use those structures, so we don't use gob for serialising HCLSpecs. This should make the plugins and packer able to transmit data over-the-wire without using gob for the most part (the communicators still use it, and will probably need some work to replace). --- .github/workflows/go-test.yml | 3 + Makefile | 2 +- buf.gen.yaml | 7 +++ plugin/set.go | 33 +++++++++++ rpc/client.go | 42 ++++++++++++++ rpc/common.go | 101 +++++++++++++++++++++++++++++++--- rpc/datasource.go | 81 ++++++++++++++++++++++----- rpc/server.go | 18 ++++++ 8 files changed, 264 insertions(+), 23 deletions(-) create mode 100644 buf.gen.yaml diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml index 08188e34d..541941d66 100644 --- a/.github/workflows/go-test.yml +++ b/.github/workflows/go-test.yml @@ -41,6 +41,9 @@ jobs: run: | mkdir -p ${{ env.TEST_RESULTS_PATH }}/packer-plugin-sdk + - name: Install buf + uses: bufbuild/buf-setup-action@v1.33.0 + - name: Run gofmt run: | make fmt-check diff --git a/Makefile b/Makefile index 47ca7ee29..030088a43 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,7 @@ generate: install-gen-deps ## Generate dynamically generated code @find ./ -type f | xargs grep -l '^// Code generated' | xargs rm -f PROJECT_ROOT="$(CURDIR)" go generate ./... go fmt bootcommand/boot_command.go -# go run ./cmd/generate-fixer-deprecations + buf generate generate-check: generate ## Check go code generation is on par @echo "==> Checking that auto-generated code is not changed..." diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 000000000..13dca9e9a --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,7 @@ +version: v2 +plugins: +- remote: buf.build/protocolbuffers/go:v1.28.1 + out: rpc + opt: paths=source_relative +inputs: +- proto_file: rpc/hcl_spec.proto diff --git a/plugin/set.go b/plugin/set.go index 68e6650bc..198e88a44 100644 --- a/plugin/set.go +++ b/plugin/set.go @@ -33,6 +33,7 @@ type Set struct { version string sdkVersion string apiVersion string + useProto bool Builders map[string]packersdk.Builder PostProcessors map[string]packersdk.PostProcessor Provisioners map[string]packersdk.Provisioner @@ -115,11 +116,42 @@ func (i *Set) Run() error { return i.RunCommand(args...) } +// parseProtobufFlag walks over the args to find if `--protobuf` is set. +// +// It then returns the args without it for the commands to process them. +func (i *Set) parseProtobufFlag(args ...string) []string { + protobufPos := -1 + for i, arg := range args { + if arg == "--protobuf" { + protobufPos = i + break + } + } + + if protobufPos == -1 { + return args + } + + i.useProto = true + + if protobufPos == 0 { + return args[1:] + } + + if protobufPos == len(args)-1 { + return args[:len(args)-1] + } + + return append(args[:protobufPos], args[protobufPos+1:]...) +} + func (i *Set) RunCommand(args ...string) error { if len(args) < 1 { return fmt.Errorf("needs at least one argument") } + args = i.parseProtobufFlag(args...) + switch args[0] { case "describe": return i.jsonDescribe(os.Stdout) @@ -139,6 +171,7 @@ func (i *Set) start(kind, name string) error { if err != nil { return err } + server.UseProto = i.useProto log.Printf("[TRACE] starting %s %s", kind, name) diff --git a/rpc/client.go b/rpc/client.go index 5c660c638..f4bba8cd4 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -21,6 +21,9 @@ type Client struct { mux *muxBroker client *rpc.Client closeMux bool + // UseProto makes it so that clients started from this will use + // protobuf/msgpack for serialisation instead of gob + UseProto bool } func NewClient(rwc io.ReadWriteCloser) (*Client, error) { @@ -76,6 +79,13 @@ func (c *Client) Artifact() packer.Artifact { commonClient: commonClient{ endpoint: DefaultArtifactEndpoint, client: c.client, + // Setting useProto to false is essentially a noop for + // this type of client since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, } } @@ -86,6 +96,13 @@ func (c *Client) Build() packer.Build { endpoint: DefaultBuildEndpoint, client: c.client, mux: c.mux, + // Setting useProto to false is essentially a noop for + // this type of client since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, } } @@ -96,6 +113,7 @@ func (c *Client) Builder() packer.Builder { endpoint: DefaultBuilderEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -106,6 +124,13 @@ func (c *Client) Communicator() packer.Communicator { endpoint: DefaultCommunicatorEndpoint, client: c.client, mux: c.mux, + // Setting useProto to false is essentially a noop for + // this type of client since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, } } @@ -116,6 +141,13 @@ func (c *Client) Hook() packer.Hook { endpoint: DefaultHookEndpoint, client: c.client, mux: c.mux, + // Setting useProto to false is essentially a noop for + // this type of client since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, } } @@ -126,6 +158,7 @@ func (c *Client) PostProcessor() packer.PostProcessor { endpoint: DefaultPostProcessorEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -136,6 +169,7 @@ func (c *Client) Provisioner() packer.Provisioner { endpoint: DefaultProvisionerEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -146,6 +180,7 @@ func (c *Client) Datasource() packer.Datasource { endpoint: DefaultDatasourceEndpoint, client: c.client, mux: c.mux, + useProto: c.UseProto, }, } } @@ -155,6 +190,13 @@ func (c *Client) Ui() packer.Ui { commonClient: commonClient{ endpoint: DefaultUiEndpoint, client: c.client, + // Setting useProto to false is essentially a noop for + // this type of client since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, endpoint: DefaultUiEndpoint, } diff --git a/rpc/common.go b/rpc/common.go index 2a0dadc34..a97193b50 100644 --- a/rpc/common.go +++ b/rpc/common.go @@ -7,10 +7,13 @@ import ( "bytes" "encoding/gob" "fmt" + "log" "net/rpc" + "reflect" "github.com/hashicorp/hcl/v2/hcldec" "github.com/zclconf/go-cty/cty" + "google.golang.org/protobuf/proto" ) // commonClient allows to rpc call funcs that can be defined on the different @@ -22,6 +25,12 @@ type commonClient struct { endpoint string client *rpc.Client mux *muxBroker + + // useProto lets us determine whether or not we should use protobuf for serialising + // data over RPC instead of gob. + // + // This is controlled by Packer using the `--use-proto` flag on plugin commands. + useProto bool } type commonServer struct { @@ -30,6 +39,12 @@ type commonServer struct { selfConfigurable interface { ConfigSpec() hcldec.ObjectSpec } + + // useProto lets us determine whether or not we should use protobuf for serialising + // data over RPC instead of gob. + // + // This is controlled by Packer using the `--use-proto` flag on plugin commands. + useProto bool } type ConfigSpecResponse struct { @@ -48,21 +63,91 @@ func (p *commonClient) ConfigSpec() hcldec.ObjectSpec { panic(err.Error()) } - res := hcldec.ObjectSpec{} - err := gob.NewDecoder(bytes.NewReader(resp.ConfigSpec)).Decode(&res) + // Legacy: this will need to be removed when we discontinue gob-encoding + // + // This is required for backwards compatibility for now, but using + // gob to encode the spec objects will fail against the upstream cty + // library, since they removed support for it. + // + // This will be a breaking change, as older plugins won't be able to + // communicate with Packer any longer. + if !p.useProto { + log.Printf("[DEBUG] - common: receiving ConfigSpec as gob") + res := hcldec.ObjectSpec{} + err := gob.NewDecoder(bytes.NewReader(resp.ConfigSpec)).Decode(&res) + if err != nil { + panic(fmt.Errorf("failed to decode HCL spec from gob: %s", err)) + } + return res + } + + log.Printf("[DEBUG] - common: receiving ConfigSpec as protobuf") + spec, err := protobufToHCL2Spec(resp.ConfigSpec) if err != nil { - panic("ici:" + err.Error()) + panic(err) } - return res + + return spec } func (s *commonServer) ConfigSpec(_ interface{}, reply *ConfigSpecResponse) error { spec := s.selfConfigurable.ConfigSpec() - b := bytes.NewBuffer(nil) - err := gob.NewEncoder(b).Encode(spec) - reply.ConfigSpec = b.Bytes() - return err + if !s.useProto { + log.Printf("[DEBUG] - common: sending ConfigSpec as gob") + b := &bytes.Buffer{} + err := gob.NewEncoder(b).Encode(spec) + if err != nil { + return fmt.Errorf("failed to encode spec from gob: %s", err) + } + reply.ConfigSpec = b.Bytes() + + return nil + } + + log.Printf("[DEBUG] - common: sending ConfigSpec as protobuf") + rawBytes, err := hcl2SpecToProtobuf(spec) + if err != nil { + return fmt.Errorf("failed to encode HCL spec from protobuf: %s", err) + } + reply.ConfigSpec = rawBytes + + return nil +} + +// hcl2SpecToProtobuf converts a hcldec.ObjectSpec to a protobuf-serialised +// byte array so it can then be used to send to a Plugin/Packer. +func hcl2SpecToProtobuf(spec hcldec.ObjectSpec) ([]byte, error) { + ret, err := ToProto(spec) + if err != nil { + return nil, fmt.Errorf("failed to convert hcldec.Spec to hclspec.Spec: %s", err) + } + rawBytes, err := proto.Marshal(ret) + if err != nil { + return nil, fmt.Errorf("failed to serialise hclspec.Spec to protobuf: %s", err) + } + + return rawBytes, nil +} + +// protobufToHCL2Spec converts a protobuf-encoded spec to a usable hcldec.Spec. +func protobufToHCL2Spec(serData []byte) (hcldec.ObjectSpec, error) { + confSpec := &Spec{} + err := proto.Unmarshal(serData, confSpec) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal hclspec.Spec from raw protobuf: %q", err) + } + spec, err := confSpec.FromProto() + if err != nil { + return nil, fmt.Errorf("failed to decode HCL spec: %q", err) + } + + obj, ok := spec.(*hcldec.ObjectSpec) + if !ok { + return nil, fmt.Errorf("decoded HCL spec is not an object spec: %s", reflect.TypeOf(spec).String()) + } + + return *obj, nil } func init() { diff --git a/rpc/datasource.go b/rpc/datasource.go index 443eeee85..74b35ee0b 100644 --- a/rpc/datasource.go +++ b/rpc/datasource.go @@ -7,10 +7,12 @@ import ( "bytes" "encoding/gob" "fmt" + "log" "github.com/hashicorp/hcl/v2/hcldec" "github.com/hashicorp/packer-plugin-sdk/packer" "github.com/zclconf/go-cty/cty" + "github.com/zclconf/go-cty/cty/msgpack" ) // An implementation of packer.Datasource where the data source is actually @@ -52,10 +54,21 @@ func (d *datasource) OutputSpec() hcldec.ObjectSpec { err := fmt.Errorf("Datasource.OutputSpec failed: %v", err) panic(err.Error()) } - res := hcldec.ObjectSpec{} - err := gob.NewDecoder(bytes.NewReader(resp.OutputSpec)).Decode(&res) + + if !d.useProto { + log.Printf("[DEBUG] - datasource: receiving OutputSpec as gob") + res := hcldec.ObjectSpec{} + err := gob.NewDecoder(bytes.NewReader(resp.OutputSpec)).Decode(&res) + if err != nil { + panic(fmt.Sprintf("datasource: failed to deserialise HCL spec from gob: %s", err)) + } + return res + } + + log.Printf("[DEBUG] - datasource: receiving OutputSpec as gob") + res, err := protobufToHCL2Spec(resp.OutputSpec) if err != nil { - panic("ici:" + err.Error()) + panic(fmt.Sprintf("datasource: failed to deserialise HCL spec from protobuf: %s", err)) } return res } @@ -66,20 +79,35 @@ type ExecuteResponse struct { } func (d *datasource) Execute() (cty.Value, error) { - res := new(cty.Value) resp := new(ExecuteResponse) if err := d.client.Call(d.endpoint+".Execute", new(interface{}), resp); err != nil { err := fmt.Errorf("Datasource.Execute failed: %v", err) - return *res, err + return cty.NilVal, err + } + + if !d.useProto { + log.Printf("[DEBUG] - datasource: receiving Execute as gob") + res := cty.Value{} + err := gob.NewDecoder(bytes.NewReader(resp.Value)).Decode(&res) + if err != nil { + return res, fmt.Errorf("failed to unmarshal cty.Value from gob blob: %s", err) + } + if resp.Error != nil { + err = resp.Error + } + return res, err } - err := gob.NewDecoder(bytes.NewReader(resp.Value)).Decode(&res) + + log.Printf("[DEBUG] - datasource: receiving Execute as msgpack") + res, err := msgpack.Unmarshal(resp.Value, cty.DynamicPseudoType) if err != nil { - return *res, err + return cty.NilVal, fmt.Errorf("failed to unmarshal cty.Value from msgpack blob: %s", err) } + if resp.Error != nil { err = resp.Error } - return *res, err + return res, err } // DatasourceServer wraps a packer.Datasource implementation and makes it @@ -103,18 +131,43 @@ func (d *DatasourceServer) Configure(args *DatasourceConfigureArgs, reply *Datas func (d *DatasourceServer) OutputSpec(args *DatasourceConfigureArgs, reply *OutputSpecResponse) error { spec := d.d.OutputSpec() - b := bytes.NewBuffer(nil) - err := gob.NewEncoder(b).Encode(spec) - reply.OutputSpec = b.Bytes() + + if !d.useProto { + log.Printf("[DEBUG] - datasource: sending OutputSpec as gob") + b := &bytes.Buffer{} + err := gob.NewEncoder(b).Encode(spec) + reply.OutputSpec = b.Bytes() + return err + } + + log.Printf("[DEBUG] - datasource: sending OutputSpec as protobuf") + ret, err := hcl2SpecToProtobuf(spec) + if err != nil { + return err + } + reply.OutputSpec = ret + return err } func (d *DatasourceServer) Execute(args *interface{}, reply *ExecuteResponse) error { spec, err := d.d.Execute() reply.Error = NewBasicError(err) - b := bytes.NewBuffer(nil) - err = gob.NewEncoder(b).Encode(spec) - reply.Value = b.Bytes() + + if !d.useProto { + log.Printf("[DEBUG] - datasource: sending Execute as gob") + b := &bytes.Buffer{} + err = gob.NewEncoder(b).Encode(spec) + reply.Value = b.Bytes() + if reply.Error != nil { + err = reply.Error + } + return err + } + + log.Printf("[DEBUG] - datasource: sending Execute as msgpack") + raw, err := msgpack.Marshal(spec, cty.DynamicPseudoType) + reply.Value = raw if reply.Error != nil { err = reply.Error } diff --git a/rpc/server.go b/rpc/server.go index 759f0594f..d3bd11ed2 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -35,6 +35,13 @@ type PluginServer struct { streamId uint32 server *rpc.Server closeMux bool + // UseProto forces the server to use protobuf/msgpack for serialization + // instead of gob. + // Setting UseProto on a server endpoint that only supports gob is + // treated as a noop. + // This field is set by the plugin `Set` type for plugins who support + // protocol version v2. + UseProto bool } // NewServer returns a new Packer RPC server. @@ -85,6 +92,7 @@ func (s *PluginServer) RegisterBuilder(b packer.Builder) error { commonServer: commonServer{ selfConfigurable: b, mux: s.mux, + useProto: s.UseProto, }, builder: b, }) @@ -95,6 +103,13 @@ func (s *PluginServer) RegisterCommunicator(c packer.Communicator) error { c: c, commonServer: commonServer{ mux: s.mux, + // Setting useProto to false is essentially a noop for + // this type of server since they don't exchange cty + // values, and there's no HCLSpec object tied to this. + // + // For documentation purposes though, we keep it visible + // in order to change this later if it becomes relevant. + useProto: false, }, }) } @@ -111,6 +126,7 @@ func (s *PluginServer) RegisterPostProcessor(p packer.PostProcessor) error { commonServer: commonServer{ selfConfigurable: p, mux: s.mux, + useProto: s.UseProto, }, p: p, }) @@ -121,6 +137,7 @@ func (s *PluginServer) RegisterProvisioner(p packer.Provisioner) error { commonServer: commonServer{ selfConfigurable: p, mux: s.mux, + useProto: s.UseProto, }, p: p, }) @@ -131,6 +148,7 @@ func (s *PluginServer) RegisterDatasource(d packer.Datasource) error { commonServer: commonServer{ selfConfigurable: d, mux: s.mux, + useProto: s.UseProto, }, d: d, })