Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish adding Event Grid validation, and add local testing tools #48

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"path"
"time"

"github.com/go-chi/chi/v5"
Expand All @@ -17,18 +18,30 @@ import (

type Config struct {
Logger *zap.Logger

Subscription string
ResourceGroup string
}

func (c *Config) validate() error {
if c.Logger == nil {
return errors.New("no logger was given")
}
if c.Subscription == "" {
return errors.New("no subscription given")
}
if c.ResourceGroup == "" {
return errors.New("no resource group given")
}
return nil
}

// Server handles both validating the Event Grid subscription and handling incoming events.
type Server struct {
logger *zap.Logger

subscription string
resourceGroup string
}

func NewServer(cfg *Config) (*Server, error) {
Expand All @@ -37,12 +50,25 @@ func NewServer(cfg *Config) (*Server, error) {
}

return &Server{
logger: cfg.Logger,
logger: cfg.Logger,
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
}, nil
}

var pathToTopic = map[string]string{
"/events/processed_portfolio": "processed-portfolios",
}

func (s *Server) verifyWebhook(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
topic, ok := pathToTopic[r.URL.Path]
if !ok {
s.logger.Error("no topic found for path", zap.String("path", r.URL.Path))
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}

if r.Header.Get("aeg-event-type") != "SubscriptionValidation" {
next.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -85,6 +111,21 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {

s.logger.Info("received SubscriptionValidation request", zap.Any("req", req))

// Validate the request event type and topic
if got, want := req.EventType, "Microsoft.EventGrid.SubscriptionValidationEvent"; got != want {
s.logger.Error("invalid topic given for path", zap.String("got_event_type", got), zap.String("expected_event_type", want))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
fullTopic := path.Join("/subscriptions", s.subscription, "resourceGroups", s.resourceGroup, "providers/Microsoft.EventGrid/topics", topic)
if req.Topic != fullTopic {
s.logger.Error("invalid topic given for path", zap.String("got_topic", req.Topic), zap.String("expected_topic", fullTopic))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

s.logger.Info("validated SubscriptionValidation, responding success", zap.String("request_id", req.Id))

resp := struct {
ValidationResponse string `json:"validationResponse"`
}{req.Data.ValidationCode}
Expand Down
3 changes: 3 additions & 0 deletions cmd/server/configs/dev.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
env dev
allowed_cors_origin https://pacta.dev.rmi.siliconally.dev
port 80

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-dev
3 changes: 3 additions & 0 deletions cmd/server/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
env local
allowed_cors_origin http://localhost:3000

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-local

secret_postgres_host UNUSED
# Also unused
secret_postgres_port 1234
Expand Down
9 changes: 7 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/RMI/pacta/task"
"github.com/Silicon-Ally/cryptorand"
"github.com/Silicon-Ally/zaphttplog"
"github.com/go-chi/chi/v5"
chi "github.com/go-chi/chi/v5"
"github.com/go-chi/httprate"
"github.com/go-chi/jwtauth/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -62,6 +62,9 @@ func run(args []string) error {
env = fs.String("env", "", "The environment that we're running in.")
localDSN = fs.String("local_dsn", "", "If set, override the DB addresses retrieved from the secret configuration. Can only be used when running locally.")

azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")

// Only when running locally because the Dockerized runner can't use local `az` CLI credentials
localDockerTenantID = fs.String("local_docker_tenant_id", "", "The Azure Tenant ID the localdocker service principal lives in")
localDockerClientID = fs.String("local_docker_client_id", "", "The client ID of the localdocker service principal")
Expand Down Expand Up @@ -276,7 +279,9 @@ func run(args []string) error {
})

eventSrv, err := azevents.NewServer(&azevents.Config{
Logger: logger,
Logger: logger,
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
})
if err != nil {
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
Expand Down
43 changes: 40 additions & 3 deletions scripts/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cd "$ROOT"

# We keep it around because we'll need it at some point, but it can't be empty.
VALID_FLAGS=(
"unused"
"with_public_endpoint"
)

VALID_FLAGS_NO_ARGS=(
Expand All @@ -23,6 +23,27 @@ OPTS=$(getopt \
-- "$@"
)

if ! [ -x "$(command -v sops)" ]; then
echo 'Error: sops is not installed.' >&2
exit 1
fi
if ! [ -x "$(command -v jq)" ]; then
echo 'Error: jq is not installed.' >&2
exit 1
fi

SOPS_DATA="$(sops -d "${ROOT}/secrets/local.enc.json")"
LOCAL_DOCKER_CREDS="$(echo $SOPS_DATA | jq .localdocker)"

FRPC_PID=""
function cleanup {
if [[ ! -z "${FRPC_PID}" ]]; then
echo "Stopping FRP client/proxy..."
kill $FRPC_PID
fi
}
trap cleanup EXIT

eval set --$OPTS
declare -a FLAGS=()
while [ ! $# -eq 0 ]
Expand All @@ -31,6 +52,24 @@ do
--use_azure_runner)
FLAGS+=("--use_azure_runner")
;;
--with_public_endpoint)
if ! [ -x "$(command -v frpc)" ]; then
echo 'Error: frpc is not installed, cannot run the FRP client/proxy.' >&2
exit 1
fi
FRP="$(echo $SOPS_DATA | jq .frpc)"
FRP_ADDR="$(echo $FRP | jq -r .addr)"
echo "Running FRP proxy at ${FRP_ADDR}..."
frpc http \
--server_addr="$FRP_ADDR" \
--server_port="$(echo $FRP | jq -r .port)" \
--token="$(echo $FRP | jq -r .token)" \
--local_port=8081 \
--proxy_name="webhook-$2" \
--sd="$2" &
FRPC_PID=$!
shift # Extra shift for the subdomain parameter
;;
esac
shift
done
Expand All @@ -53,8 +92,6 @@ FLAGS+=(
"--local_dsn=${LOCAL_DSN}"
)

LOCAL_DOCKER_CREDS="$(sops -d --extract '["localdocker"]' "${ROOT}/secrets/local.enc.json")"

FLAGS+=(
"--local_docker_tenant_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .tenant_id)"
"--local_docker_client_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .client_id)"
Expand Down
9 changes: 7 additions & 2 deletions secrets/local.enc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"client_id": "ENC[AES256_GCM,data:yex2my4EAYV3k4czIZXn4gnANaS/jmpDV8gFVKOgeII5ce+h,iv:6/YnhiTdR1sVeR2QhNPmI/q1jb3oSQDODTqxAbDlESo=,tag:7SNZU0pbTt5QweGzKh3XoA==,type:str]",
"password": "ENC[AES256_GCM,data:rA7aewwH4umPWAnzuOen8oYgLwvQsf19rfGOWARbGfW5lXSuxx4F4g==,iv:KRq/lxZ28JfUkGeBlDvmMfpVLVbPlEmOD8l/aRfG5Zo=,tag:pH0oAueSsrXaXcuXM4LlEQ==,type:str]"
},
"frpc": {
"addr": "ENC[AES256_GCM,data:mke8TeqY6AWnbiWRQa6D0w==,iv:O8tnERYeDK2ACeOzlEiHnhty9ILYoi8cCSa80ngDHg4=,tag:zA/zMc0DIATseyF5BUVymQ==,type:str]",
"port": "ENC[AES256_GCM,data:d1sh9w==,iv:bmzN6duVQylgRXAchkPavtUsdbNdd+S3f7N73ukFV2A=,tag:4nFCulM5yYdMvTY2CwiJ0g==,type:float]",
"token": "ENC[AES256_GCM,data:qe9Hy0tKKrQ7rFzOQuTlDKGDqA2489hvEbjbtYFMuJ3wd5gy,iv:gV+OasDEHtI28TVBFmyxrPjMqEv5J1jpWMAzR1N70MQ=,tag:543oFV21kJj9qBgkyewdIg==,type:str]"
},
"sops": {
"kms": null,
"gcp_kms": null,
Expand All @@ -18,8 +23,8 @@
],
"hc_vault": null,
"age": null,
"lastmodified": "2023-10-28T02:50:38Z",
"mac": "ENC[AES256_GCM,data:16VSdcgdGXVxl6xeCv/2LmwOnHyJE8Y9sOEmY2y/c/CpAgNVLF+yBtQnBSQyMlVUskuBEFQxpcm1TQXjz11ngtlRIaITqJGcPDQoIdQMUvamP8Ku69pxrc0UPZOR5qv5BkAoZapCxH5xWWQPakgaHf74NES4Bn2FUe8tR6T50JM=,iv:gSTP1rTGhui1yuCsREeNR9JFcXfVyGfLcTTInAu27qs=,tag:0rv8y1y4qvTkL6DtH50KEw==,type:str]",
"lastmodified": "2023-10-31T18:08:10Z",
"mac": "ENC[AES256_GCM,data:OtzdhIFbPxagE61i0gA871aw8Pkl/PZrwigtX2S3tisOviuYwK0h1QNZN+eKTDXyEbs7fht4ruo4m6EEmZS2nwvgquAUaoA9A9yXARfYFRPQ3D5a21WH8nlN0JMBGoD70VwGjw22Yy2XfG5FbaOnVN2eQ5QpSHaDw1umEiR0rWE=,iv:YZIof2lisX96LDntHnsyDKzQjtlel5SM+Tvpq4C9Sn0=,tag:gEGFEGNB1Fvf1VMKLokMYg==,type:str]",
"pgp": null,
"unencrypted_suffix": "_unencrypted",
"version": "3.8.1"
Expand Down