Skip to content

Commit

Permalink
feat(go): create a not private version for go
Browse files Browse the repository at this point in the history
Creates another version of go implementation of kafka handling, which is not a private repository
  • Loading branch information
Janek Stoeck authored and c0olix committed Sep 4, 2022
1 parent 9186a5d commit cdb6373
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 16 deletions.
16 changes: 8 additions & 8 deletions cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ var generateCmd = &cobra.Command{
logger.Fatal("Unable to get packageFlag flag: empty package name found: \"\"")
}

if langFlag == "go" && flavorFlag == "mosaic" {
generateMosaicKafkaGoCode(inputFlag, outputFlag, createDirFlag, packageFlag)
} else if langFlag == "java" && flavorFlag == "mosaic" {
generateMosaicKafkaJavaCode(inputFlag, outputFlag, createDirFlag, packageFlag)
if langFlag == "go" {
generateMosaicKafkaGoCode(inputFlag, outputFlag, createDirFlag, packageFlag, flavorFlag)
} else if langFlag == "java" {
generateMosaicKafkaJavaCode(inputFlag, outputFlag, createDirFlag, packageFlag, flavorFlag)
} else {
logger.Fatalf("unsupported flags given")
}
Expand All @@ -67,13 +67,13 @@ var generateCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(generateCmd)
generateCmd.Flags().StringP("lang", "l", "go", "What kind of code should be generated?")
generateCmd.Flags().StringP("flavor", "f", "mosaic", "Which flavor should be used?")
generateCmd.Flags().StringP("flavor", "f", "", "Which (if) flavor should be used?")
generateCmd.Flags().BoolP("createDir", "c", false, "Should directory be created if not present (recursive)?")
generateCmd.Flags().StringP("output", "o", "", "Where should the generated code saved to? Attention: Go=File, Java=Dir!")
generateCmd.Flags().StringP("packageName", "p", "", "Which package name should the generated code have?")
}

func generateMosaicKafkaGoCode(path string, out string, createDir bool, packageName string) {
func generateMosaicKafkaGoCode(path string, out string, createDir bool, packageName string, flavor string) {
if createDir {
file := filepath.Dir(out)
err := os.MkdirAll(file, os.ModePerm)
Expand All @@ -85,7 +85,7 @@ func generateMosaicKafkaGoCode(path string, out string, createDir bool, packageN
if err != nil {
logger.WithField("stack", fmt.Sprintf("%+v", err)).Fatalf("unable to generate code: %v", err)
}
output, err := generator.Generate()
output, err := generator.Generate(flavor)
if err != nil {
logger.WithField("stack", fmt.Sprintf("%+v", err)).Fatalf("unable to generate code: %v", err)
}
Expand All @@ -99,7 +99,7 @@ func generateMosaicKafkaGoCode(path string, out string, createDir bool, packageN
}
}

func generateMosaicKafkaJavaCode(path string, out string, createDir bool, packageName string) {
func generateMosaicKafkaJavaCode(path string, out string, createDir bool, packageName string, flavor string) {
if createDir {
err := os.MkdirAll(out, os.ModePerm)
if err != nil {
Expand Down
29 changes: 21 additions & 8 deletions generator/go/mosaicGoKafkaGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ var typeConversionGoMap = map[string]string{
var templateFiles embed.FS

type MosaicKafkaGoCodeGenerator struct {
template *template.Template
data map[string]interface{}
log *logrus.Logger
mosaicTemplate *template.Template
normalTemplate *template.Template
data map[string]interface{}
log *logrus.Logger
}

func (thiz *MosaicKafkaGoCodeGenerator) getImports(data map[string]interface{}) []string {
Expand Down Expand Up @@ -202,16 +203,28 @@ func NewMosaicKafkaGoCodeGenerator(asyncApiSpecPath string, packageName string,
goKafkaGenerator.data = spec

tmpl := template.Must(template.New("mosaic-kafka-go-code.tmpl").Funcs(fns).ParseFS(templateFiles, "templates/mosaic-kafka-go-code.tmpl"))
goKafkaGenerator.template = tmpl
goKafkaGenerator.mosaicTemplate = tmpl

normalTmpl := template.Must(template.New("kafka-go-code.tmpl").Funcs(fns).ParseFS(templateFiles, "templates/kafka-go-code.tmpl"))
goKafkaGenerator.normalTemplate = normalTmpl
return &goKafkaGenerator, nil
}

func (thiz *MosaicKafkaGoCodeGenerator) Generate() ([]byte, error) {
func (thiz *MosaicKafkaGoCodeGenerator) Generate(flavor string) ([]byte, error) {
var tpl bytes.Buffer
err := thiz.template.Execute(&tpl, thiz.data)
if err != nil {
return nil, err
switch flavor {
case "mosaic":
err := thiz.mosaicTemplate.Execute(&tpl, thiz.data)
if err != nil {
return nil, err
}
default:
err := thiz.normalTemplate.Execute(&tpl, thiz.data)
if err != nil {
return nil, err
}
}

p, err := format.Source(tpl.Bytes())
if err != nil {
return nil, err
Expand Down
231 changes: 231 additions & 0 deletions generator/go/templates/kafka-go-code.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Package {{.packageName}} provides primitives to interact with the asyncAPI.
//
// Code generated by github.com/c0olix/asyncApiCodeGen DO NOT EDIT.
package {{.packageName}}
{{ $messages := getMessages .}} {{$objects := getObjects $messages}} {{$items := getItemObjects $messages}}
import (
"context"
"encoding/json"
"github.com/c0olix/goChan"
"github.com/go-playground/validator/v10"
"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
"reflect"
{{- range $imp := getImports .}}
"{{$imp}}"
{{- end}}
)

const (
{{- range $channelName, $channel := .channels }}
{{ lower $channelName | camel}}TopicName = "{{$channelName}}"
{{- end}}
)

var validate *validator.Validate = validator.New()

// KafkaTopicConfig Holds configuration needed fo create a kafka topic
type KafkaTopicConfig struct {
TopicGroup string
NumPartitions int
ReplicationFactor int
}

func UnmarshalEvent(proto interface{}, messageArray []byte) (interface{}, error) {
eventType := reflect.TypeOf(proto)
value := reflect.New(eventType).Interface()
err := json.Unmarshal(messageArray, value)
return value, err
}

{{ range $message := $messages }}
// {{$message.name}} {{$message.description}}
type {{$message.name}} struct {
{{- range $propertyName, $property := $message.payload.properties}} {{$required := checkRequired $propertyName $message.payload.required}}
{{camel $propertyName}} {{if not $required -}}*{{- end}}{{ convertToGoType $property -}} `json:"{{$propertyName}}{{if not $required -}},omitempty{{- end}}"{{validations $property $required }}`
{{- end}}
}

// Validate validates the {{$message.name}}) struct for its requirements
func (thiz {{$message.name}}) Validate() error{
return validate.Struct(thiz)
}

// New{{$message.name}} creates a new struct of the {{$message.name}} type and validates its content
func New{{$message.name}}({{range $propertyName, $property := $message.payload.properties}}{{$required := checkRequired $propertyName $message.payload.required}}{{$propertyName}} {{if not $required}}*{{end}}{{ convertToGoType $property}},{{end}}) (*{{$message.name}},error) {
{{lowerCamel $message.name}} := {{$message.name}}{
{{- range $propertyName, $property := $message.payload.properties}}
{{camel $propertyName}}: {{$propertyName}},
{{- end}}
}
err := {{lowerCamel $message.name}}.Validate()
if err != nil {
return nil, err
}
return &{{lowerCamel $message.name}}, nil
}
{{ end}}

{{ range $object := $objects }}
// {{$object.title}} Nested object for {{$object.parent}} type
type {{$object.title}} struct {
{{- range $objPropertyName, $objProperty := $object.properties}} {{$required := checkRequired $objPropertyName $object.required}}
{{camel $objPropertyName}} {{if not $required -}}*{{- end}}{{ convertToGoType $objProperty -}} `json:"{{$objPropertyName}}{{if not $required -}},omitempty{{- end}}"{{validations $objProperty $required }}`
{{- end}}
}

// Validate validates the {{$object.title}} struct for its requirements
func (thiz {{$object.title}}) Validate() error{
return validate.Struct(thiz)
}

// New{{$object.title}} creates a new struct of the {{$object.title}} type and validates its content
func New{{$object.title}}({{range $propertyName, $property := $object.properties}}{{$required := checkRequired $propertyName $object.required}}{{$propertyName}} {{if not $required}}*{{end}}{{ convertToGoType $property}},{{end}}) (*{{$object.title}},error) {
{{lowerCamel $object.title}} := {{$object.title}}{
{{- range $propertyName, $property := $object.properties}}
{{camel $propertyName}}: {{$propertyName}},
{{- end}}
}
err := {{lowerCamel $object.title}}.Validate()
if err != nil {
return nil, err
}
return &{{lowerCamel $object.title}}, nil
}
{{- end}}
{{ range $item := $items }}

// {{$item.items.title}} Nested object for {{$item.parent}} type
type {{$item.items.title}} struct {
{{- range $objPropertyName, $objProperty := $item.items.properties}} {{$required := checkRequired $objPropertyName $item.items.required}}
{{camel $objPropertyName}} {{if not $required -}}*{{- end}}{{ convertToGoType $objProperty -}} `json:"{{$objPropertyName}}{{if not $required -}},omitempty{{- end}}"{{validations $objProperty $required }}`
{{- end}}
}

// Validate validates the {{$item.items.title}} struct for its requirements
func (thiz {{$item.items.title}}) Validate() error{
return validate.Struct(thiz)
}

// New{{$item.items.title}} creates a new struct of the {{$item.items.title}} type and validates its content
func New{{$item.items.title}}({{range $propertyName, $property := $item.items.properties}}{{$required := checkRequired $propertyName $item.items.required}}{{$propertyName}} {{if not $required}}*{{end}}{{ convertToGoType $property}},{{end}}) (*{{$item.items.title}},error) {
{{lowerCamel $item.items.title}} := {{$item.items.title}}{
{{- range $propertyName, $property := $item.items.properties}}
{{camel $propertyName}}: {{$propertyName}},
{{- end}}
}
err := {{lowerCamel $item.items.title}}.Validate()
if err != nil {
return nil, err
}
return &{{lowerCamel $item.items.title}}, nil
}
{{- end}}


// ConsumerInterface Interface for all events to be consumed by application
type ConsumerInterface interface {
{{- range $channelName, $channel := .channels }}
{{- if $channel.publish}}
{{camel $channel.publish.operationId}}(handler goChan.Handler)
{{- end}}
{{- end}}
}

// ProducerInterface Interface for all events to be produced by application
type ProducerInterface interface {
{{- range $channelName, $channel := .channels }}
{{- if $channel.subscribe}}
{{camel $channel.subscribe.operationId}}(ctx context.Context, event {{$channel.subscribe.message.name}}) error
{{- end}}
{{- end}}
}

// DefaultConsumer implements ConsumerInterface and consumes events with go kafka mosaic style flavor
type DefaultConsumer struct {
{{- range $channelName, $channel := .channels }}
{{- if $channel.publish}}
{{ lower $channelName | camel}}Topic goChan.ChannelInterface
{{- end}}
{{- end}}
}

// DefaultProducer implements ProducerInterface and produces events with go kafka mosaic style flavor
type DefaultProducer struct {
{{- range $channelName, $channel := .channels }}
{{- if $channel.subscribe}}
{{ lower $channelName | camel}}Topic goChan.ChannelInterface
{{- end}}
{{- end}}
}

// NewDefaultConsumer wires all needed dependencies to create a DefaultConsumer
func NewDefaultConsumer(manager goChan.ManagerInterface, errorCallback func(ctx context.Context, err error), config KafkaTopicConfig,mw ...goChan.Middleware) (*DefaultConsumer, error) {
{{- range $channelName, $channel := .channels }}
{{- if $channel.publish}}
{{ lower $channelName | camel}}Topic, err := manager.CreateChannel({{ lower $channelName | camel}}TopicName, errorCallback, config)
if err != nil {
return nil, err
}
{{ lower $channelName | camel}}Topic.SetReaderMiddleWares(mw...)
{{- end}}
{{- end}}
return &DefaultConsumer{
{{- range $channelName, $channel := .channels }}
{{- if $channel.publish}}
{{ lower $channelName | camel}}Topic: {{ lower $channelName | camel}}Topic,
{{- end}}
{{- end}}
}, nil
}

// NewDefaultProducer wires all needed dependencies to create a DefaultProducer
func NewDefaultProducer(manager goChan.ManagerInterface, errorCallback func(ctx context.Context, err error), config KafkaTopicConfig, mw ...goChan.Middleware) (*DefaultProducer, error) {
{{- range $channelName, $channel := .channels }}
{{- if $channel.subscribe}}
{{ lower $channelName | camel}}Topic, err := manager.CreateChannel({{ lower $channelName | camel}}TopicName, errorCallback, config)
if err != nil {
return nil, err
}
{{ lower $channelName | camel}}Topic.SetWriterMiddleWares(mw...)
{{- end}}
{{- end}}
return &DefaultProducer{
{{- range $channelName, $channel := .channels }}
{{- if $channel.subscribe}}
{{ lower $channelName | camel}}Topic: {{ lower $channelName | camel}}Topic,
{{- end}}
{{- end}}
}, nil
}

{{- range $channelName, $channel := .channels }}
{{- if $channel.publish}}
//{{camel $channel.publish.operationId}} is the go kafka mosaic style flavored implementation of the ConsumerInterface registered on DefaultConsumer
// to consume the {{$channel.publish.message.name}} Event.
func (d DefaultConsumer) {{camel $channel.publish.operationId}} (handler goChan.Handler) {
d.{{ lower $channelName | camel}}Topic.Consume(handler)
}
{{- end}}
{{ end}}

{{- range $channelName, $channel := .channels }}
{{- if $channel.subscribe}}
//{{camel $channel.subscribe.operationId}} is the go kafka mosaic style flavored implementation of the ConsumerInterface registered on DefaultConsumer
// to produce the {{$channel.subscribe.message.name}} Event.
func (d DefaultProducer) {{camel $channel.subscribe.operationId}}(ctx context.Context, event {{$channel.subscribe.message.name}}) error {
eventData, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "unable to marshal eventdata")
}
msg := kafka.Message{
Value: eventData,
}
err = d.{{ lower $channelName | camel}}Topic.Produce(ctx, msg)
if err != nil {
return err
}
return nil
}
{{- end}}
{{- end}}

0 comments on commit cdb6373

Please sign in to comment.