From 47ccc473dab78591f8a8ea26fefa028562faa3d8 Mon Sep 17 00:00:00 2001 From: Brandon Sprague Date: Tue, 31 Oct 2023 12:50:07 -0700 Subject: [PATCH] Finish adding Event Grid validation, and add local testing tools This PR finishes up the work of validating requests from Event Grid, and adds support for running the server with a public-facing endpoint via the `--with_public_endpoint=` flag, which is proxied via [frp](https://github.com/fatedier/frp). I didn't use our [frpembed](https://github.com/Silicon-Ally/frpembed) library because there's an issue with `quic-go` not supporting Go 1.20 (may be fixed, didn't want to go down the rabbithole) --- azure/azevents/azevents.go | 43 ++++++++++++++++++++++++++++++++++- cmd/server/configs/dev.conf | 3 +++ cmd/server/configs/local.conf | 3 +++ cmd/server/main.go | 9 ++++++-- scripts/run_server.sh | 43 ++++++++++++++++++++++++++++++++--- secrets/local.enc.json | 9 ++++++-- 6 files changed, 102 insertions(+), 8 deletions(-) diff --git a/azure/azevents/azevents.go b/azure/azevents/azevents.go index 3b0d897..b91b121 100644 --- a/azure/azevents/azevents.go +++ b/azure/azevents/azevents.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "path" "time" "github.com/go-chi/chi/v5" @@ -17,18 +18,30 @@ import ( type Config struct { Logger *zap.Logger + + Subscription string + ResourceGroup string } func (c *Config) validate() error { if c.Logger == nil { return errors.New("no logger was given") } + if c.Subscription == "" { + return errors.New("no subscription given") + } + if c.ResourceGroup == "" { + return errors.New("no resource group given") + } return nil } // Server handles both validating the Event Grid subscription and handling incoming events. type Server struct { logger *zap.Logger + + subscription string + resourceGroup string } func NewServer(cfg *Config) (*Server, error) { @@ -37,12 +50,25 @@ func NewServer(cfg *Config) (*Server, error) { } return &Server{ - logger: cfg.Logger, + logger: cfg.Logger, + subscription: cfg.Subscription, + resourceGroup: cfg.ResourceGroup, }, nil } +var pathToTopic = map[string]string{ + "/events/processed_portfolio": "processed-portfolios", +} + func (s *Server) verifyWebhook(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + topic, ok := pathToTopic[r.URL.Path] + if !ok { + s.logger.Error("no topic found for path", zap.String("path", r.URL.Path)) + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + return + } + if r.Header.Get("aeg-event-type") != "SubscriptionValidation" { next.ServeHTTP(w, r) return @@ -85,6 +111,21 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler { s.logger.Info("received SubscriptionValidation request", zap.Any("req", req)) + // Validate the request event type and topic + if got, want := req.EventType, "Microsoft.EventGrid.SubscriptionValidationEvent"; got != want { + s.logger.Error("invalid topic given for path", zap.String("got_event_type", got), zap.String("expected_event_type", want)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + fullTopic := path.Join("/subscriptions", s.subscription, "resourceGroups", s.resourceGroup, "providers/Microsoft.EventGrid/topics", topic) + if req.Topic != fullTopic { + s.logger.Error("invalid topic given for path", zap.String("got_topic", req.Topic), zap.String("expected_topic", fullTopic)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + s.logger.Info("validated SubscriptionValidation, responding success", zap.String("request_id", req.Id)) + resp := struct { ValidationResponse string `json:"validationResponse"` }{req.Data.ValidationCode} diff --git a/cmd/server/configs/dev.conf b/cmd/server/configs/dev.conf index f41f323..b972132 100644 --- a/cmd/server/configs/dev.conf +++ b/cmd/server/configs/dev.conf @@ -1,3 +1,6 @@ env dev allowed_cors_origin https://pacta.dev.rmi.siliconally.dev port 80 + +azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9 +azure_event_resource_group rmi-pacta-dev diff --git a/cmd/server/configs/local.conf b/cmd/server/configs/local.conf index c5fced3..35e210d 100644 --- a/cmd/server/configs/local.conf +++ b/cmd/server/configs/local.conf @@ -1,6 +1,9 @@ env local allowed_cors_origin http://localhost:3000 +azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9 +azure_event_resource_group rmi-pacta-local + secret_postgres_host UNUSED # Also unused secret_postgres_port 1234 diff --git a/cmd/server/main.go b/cmd/server/main.go index 15075f9..7102290 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -26,7 +26,7 @@ import ( "github.com/RMI/pacta/task" "github.com/Silicon-Ally/cryptorand" "github.com/Silicon-Ally/zaphttplog" - "github.com/go-chi/chi/v5" + chi "github.com/go-chi/chi/v5" "github.com/go-chi/httprate" "github.com/go-chi/jwtauth/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -62,6 +62,9 @@ func run(args []string) error { env = fs.String("env", "", "The environment that we're running in.") localDSN = fs.String("local_dsn", "", "If set, override the DB addresses retrieved from the secret configuration. Can only be used when running locally.") + azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from") + azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from") + // Only when running locally because the Dockerized runner can't use local `az` CLI credentials localDockerTenantID = fs.String("local_docker_tenant_id", "", "The Azure Tenant ID the localdocker service principal lives in") localDockerClientID = fs.String("local_docker_client_id", "", "The client ID of the localdocker service principal") @@ -276,7 +279,9 @@ func run(args []string) error { }) eventSrv, err := azevents.NewServer(&azevents.Config{ - Logger: logger, + Logger: logger, + Subscription: *azEventSubscription, + ResourceGroup: *azEventResourceGroup, }) if err != nil { return fmt.Errorf("failed to init Azure Event Grid handler: %w", err) diff --git a/scripts/run_server.sh b/scripts/run_server.sh index 8704113..58f6336 100755 --- a/scripts/run_server.sh +++ b/scripts/run_server.sh @@ -6,7 +6,7 @@ cd "$ROOT" # We keep it around because we'll need it at some point, but it can't be empty. VALID_FLAGS=( - "unused" + "with_public_endpoint" ) VALID_FLAGS_NO_ARGS=( @@ -23,6 +23,27 @@ OPTS=$(getopt \ -- "$@" ) +if ! [ -x "$(command -v sops)" ]; then + echo 'Error: sops is not installed.' >&2 + exit 1 +fi +if ! [ -x "$(command -v jq)" ]; then + echo 'Error: jq is not installed.' >&2 + exit 1 +fi + +SOPS_DATA="$(sops -d "${ROOT}/secrets/local.enc.json")" +LOCAL_DOCKER_CREDS="$(echo $SOPS_DATA | jq .localdocker)" + +FRPC_PID="" +function cleanup { + if [[ ! -z "${FRPC_PID}" ]]; then + echo "Stopping FRP client/proxy..." + kill $FRPC_PID + fi +} +trap cleanup EXIT + eval set --$OPTS declare -a FLAGS=() while [ ! $# -eq 0 ] @@ -31,6 +52,24 @@ do --use_azure_runner) FLAGS+=("--use_azure_runner") ;; + --with_public_endpoint) + if ! [ -x "$(command -v frpc)" ]; then + echo 'Error: frpc is not installed, cannot run the FRP client/proxy.' >&2 + exit 1 + fi + FRP="$(echo $SOPS_DATA | jq .frpc)" + FRP_ADDR="$(echo $FRP | jq -r .addr)" + echo "Running FRP proxy at ${FRP_ADDR}..." + frpc http \ + --server_addr="$FRP_ADDR" \ + --server_port="$(echo $FRP | jq -r .port)" \ + --token="$(echo $FRP | jq -r .token)" \ + --local_port=8081 \ + --proxy_name="webhook-$2" \ + --sd="$2" & + FRPC_PID=$! + shift # Extra shift for the subdomain parameter + ;; esac shift done @@ -53,8 +92,6 @@ FLAGS+=( "--local_dsn=${LOCAL_DSN}" ) -LOCAL_DOCKER_CREDS="$(sops -d --extract '["localdocker"]' "${ROOT}/secrets/local.enc.json")" - FLAGS+=( "--local_docker_tenant_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .tenant_id)" "--local_docker_client_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .client_id)" diff --git a/secrets/local.enc.json b/secrets/local.enc.json index 8ba7e9b..0c3f1f1 100644 --- a/secrets/local.enc.json +++ b/secrets/local.enc.json @@ -4,6 +4,11 @@ "client_id": "ENC[AES256_GCM,data:yex2my4EAYV3k4czIZXn4gnANaS/jmpDV8gFVKOgeII5ce+h,iv:6/YnhiTdR1sVeR2QhNPmI/q1jb3oSQDODTqxAbDlESo=,tag:7SNZU0pbTt5QweGzKh3XoA==,type:str]", "password": "ENC[AES256_GCM,data:rA7aewwH4umPWAnzuOen8oYgLwvQsf19rfGOWARbGfW5lXSuxx4F4g==,iv:KRq/lxZ28JfUkGeBlDvmMfpVLVbPlEmOD8l/aRfG5Zo=,tag:pH0oAueSsrXaXcuXM4LlEQ==,type:str]" }, + "frpc": { + "addr": "ENC[AES256_GCM,data:mke8TeqY6AWnbiWRQa6D0w==,iv:O8tnERYeDK2ACeOzlEiHnhty9ILYoi8cCSa80ngDHg4=,tag:zA/zMc0DIATseyF5BUVymQ==,type:str]", + "port": "ENC[AES256_GCM,data:d1sh9w==,iv:bmzN6duVQylgRXAchkPavtUsdbNdd+S3f7N73ukFV2A=,tag:4nFCulM5yYdMvTY2CwiJ0g==,type:float]", + "token": "ENC[AES256_GCM,data:qe9Hy0tKKrQ7rFzOQuTlDKGDqA2489hvEbjbtYFMuJ3wd5gy,iv:gV+OasDEHtI28TVBFmyxrPjMqEv5J1jpWMAzR1N70MQ=,tag:543oFV21kJj9qBgkyewdIg==,type:str]" + }, "sops": { "kms": null, "gcp_kms": null, @@ -18,8 +23,8 @@ ], "hc_vault": null, "age": null, - "lastmodified": "2023-10-28T02:50:38Z", - "mac": "ENC[AES256_GCM,data:16VSdcgdGXVxl6xeCv/2LmwOnHyJE8Y9sOEmY2y/c/CpAgNVLF+yBtQnBSQyMlVUskuBEFQxpcm1TQXjz11ngtlRIaITqJGcPDQoIdQMUvamP8Ku69pxrc0UPZOR5qv5BkAoZapCxH5xWWQPakgaHf74NES4Bn2FUe8tR6T50JM=,iv:gSTP1rTGhui1yuCsREeNR9JFcXfVyGfLcTTInAu27qs=,tag:0rv8y1y4qvTkL6DtH50KEw==,type:str]", + "lastmodified": "2023-10-31T18:08:10Z", + "mac": "ENC[AES256_GCM,data:OtzdhIFbPxagE61i0gA871aw8Pkl/PZrwigtX2S3tisOviuYwK0h1QNZN+eKTDXyEbs7fht4ruo4m6EEmZS2nwvgquAUaoA9A9yXARfYFRPQ3D5a21WH8nlN0JMBGoD70VwGjw22Yy2XfG5FbaOnVN2eQ5QpSHaDw1umEiR0rWE=,iv:YZIof2lisX96LDntHnsyDKzQjtlel5SM+Tvpq4C9Sn0=,tag:gEGFEGNB1Fvf1VMKLokMYg==,type:str]", "pgp": null, "unencrypted_suffix": "_unencrypted", "version": "3.8.1"