diff --git a/package-lock.json b/package-lock.json index b68d919..17aabc8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4272,6 +4272,14 @@ "resolved": "https://registry.npmjs.org/@types/http-errors/-/http-errors-2.0.1.tgz", "integrity": "sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==" }, + "node_modules/@types/http-proxy": { + "version": "1.17.11", + "resolved": "https://registry.npmjs.org/@types/http-proxy/-/http-proxy-1.17.11.tgz", + "integrity": "sha512-HC8G7c1WmaF2ekqpnFq626xd3Zz0uvaqFmBJNRZCGEZCXkvSdJoNFn/8Ygbd9fKNQj8UzLdCETaI0UWPAjK7IA==", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/ioredis4": { "name": "@types/ioredis", "version": "4.28.10", @@ -7118,8 +7126,7 @@ "node_modules/eventemitter3": { "version": "4.0.7", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==", - "dev": true + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" }, "node_modules/execa": { "version": "5.1.1", @@ -8253,6 +8260,19 @@ "node": ">= 0.8" } }, + "node_modules/http-proxy": { + "version": "1.18.1", + "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.18.1.tgz", + "integrity": "sha512-7mz/721AbnJwIVbnaSv1Cz3Am0ZLT/UBwkC92VlxhXv/k/BBQfM2fXElQNC27BVGr0uwUpplYPQM9LnaBMR5NQ==", + "dependencies": { + "eventemitter3": "^4.0.0", + "follow-redirects": "^1.0.0", + "requires-port": "^1.0.0" + }, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/http-proxy-agent": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz", @@ -8267,6 +8287,40 @@ "node": ">= 6" } }, + "node_modules/http-proxy-middleware": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-2.0.6.tgz", + "integrity": "sha512-ya/UeJ6HVBYxrgYotAZo1KvPWlgB48kUJLDePFeneHsVujFaW5WNj2NgWCAE//B1Dl02BIfYlpNgBy8Kf8Rjmw==", + "dependencies": { + "@types/http-proxy": "^1.17.8", + "http-proxy": "^1.18.1", + "is-glob": "^4.0.1", + "is-plain-obj": "^3.0.0", + "micromatch": "^4.0.2" + }, + "engines": { + "node": ">=12.0.0" + }, + "peerDependencies": { + "@types/express": "^4.17.13" + }, + "peerDependenciesMeta": { + "@types/express": { + "optional": true + } + } + }, + "node_modules/http-proxy-middleware/node_modules/is-plain-obj": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/is-plain-obj/-/is-plain-obj-3.0.0.tgz", + "integrity": "sha512-gwsOE28k+23GP1B6vFl1oVh/WOzmawBrKwo5Ev6wMKzPkaXaCDIQKzLnvsA42DRlbVTWorkgTKIviAKCWkfUwA==", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/https-proxy-agent": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", @@ -13632,6 +13686,11 @@ "node": ">=6" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==" + }, "node_modules/resolve": { "version": "1.22.1", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.1.tgz", @@ -15869,6 +15928,7 @@ "@traceloop/instrument-opentelemetry": "^0.5.0", "express": "^4.18.2", "google-protobuf": "^3.0.0", + 
"http-proxy-middleware": "^2.0.6", "pg": "^8.9.0", "typeorm": "^0.3.12", "uuid": "^9.0.0" @@ -18973,6 +19033,7 @@ "@types/uuid": "^9.0.0", "express": "^4.18.2", "google-protobuf": "^3.0.0", + "http-proxy-middleware": "*", "pg": "^8.9.0", "rollup": "^3.20.0", "rollup-plugin-swc3": "^0.8.0", @@ -19233,6 +19294,14 @@ "resolved": "https://registry.npmjs.org/@types/http-errors/-/http-errors-2.0.1.tgz", "integrity": "sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==" }, + "@types/http-proxy": { + "version": "1.17.11", + "resolved": "https://registry.npmjs.org/@types/http-proxy/-/http-proxy-1.17.11.tgz", + "integrity": "sha512-HC8G7c1WmaF2ekqpnFq626xd3Zz0uvaqFmBJNRZCGEZCXkvSdJoNFn/8Ygbd9fKNQj8UzLdCETaI0UWPAjK7IA==", + "requires": { + "@types/node": "*" + } + }, "@types/ioredis4": { "version": "npm:@types/ioredis@4.28.10", "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.28.10.tgz", @@ -21355,8 +21424,7 @@ "eventemitter3": { "version": "4.0.7", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==", - "dev": true + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" }, "execa": { "version": "5.1.1", @@ -22234,6 +22302,16 @@ } } }, + "http-proxy": { + "version": "1.18.1", + "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.18.1.tgz", + "integrity": "sha512-7mz/721AbnJwIVbnaSv1Cz3Am0ZLT/UBwkC92VlxhXv/k/BBQfM2fXElQNC27BVGr0uwUpplYPQM9LnaBMR5NQ==", + "requires": { + "eventemitter3": "^4.0.0", + "follow-redirects": "^1.0.0", + "requires-port": "^1.0.0" + } + }, "http-proxy-agent": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz", @@ -22245,6 +22323,25 @@ "debug": "4" } }, + "http-proxy-middleware": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-2.0.6.tgz", + "integrity": "sha512-ya/UeJ6HVBYxrgYotAZo1KvPWlgB48kUJLDePFeneHsVujFaW5WNj2NgWCAE//B1Dl02BIfYlpNgBy8Kf8Rjmw==", + "requires": { + "@types/http-proxy": "^1.17.8", + "http-proxy": "^1.18.1", + "is-glob": "^4.0.1", + "is-plain-obj": "^3.0.0", + "micromatch": "^4.0.2" + }, + "dependencies": { + "is-plain-obj": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/is-plain-obj/-/is-plain-obj-3.0.0.tgz", + "integrity": "sha512-gwsOE28k+23GP1B6vFl1oVh/WOzmawBrKwo5Ev6wMKzPkaXaCDIQKzLnvsA42DRlbVTWorkgTKIviAKCWkfUwA==" + } + } + }, "https-proxy-agent": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", @@ -26383,6 +26480,11 @@ "resolve": "^1.22.1" } }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==" + }, "resolve": { "version": "1.22.1", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.1.tgz", diff --git a/package.json b/package.json index 82252f0..cb5d0d8 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "start:otel-receiver": "lerna run --scope @traceloop/otel-receiver start", "start:test-servers": "lerna run --scope @traceloop/test-servers start", "docker:instrument-opentelemetry": "docker build -f packages/instrument-opentelemetry/Dockerfile . 
-t instrument-opentelemetry", - "docker:test-servers": "docker build -f packages/test-servers/Dockerfile . -t test-servers", + "docker:test-servers": "docker build --platform linux/amd64 -f packages/test-servers/Dockerfile . -t test-servers", "test": "jest", "test-ci": "concurrently -k --success \"command-1\" --hide 0 \"npm:start:test-servers\" \"npm:test\"", "release": "npm run build && lerna publish --conventional-commits --no-private" diff --git a/packages/instrument-opentelemetry/src/tracing.ts b/packages/instrument-opentelemetry/src/tracing.ts index 77a9ac4..8cf8a0b 100644 --- a/packages/instrument-opentelemetry/src/tracing.ts +++ b/packages/instrument-opentelemetry/src/tracing.ts @@ -14,6 +14,9 @@ if (process.env.OTEL_EXPORTER_OTLP_ENDPOINT) { ? new ProtoExporter({ url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, timeoutMillis: 100, + // headers: { + // authorization: 'testtttt', + // }, }) : new GRPCExporter({ url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, diff --git a/packages/test-servers/Dockerfile b/packages/test-servers/Dockerfile index 6b38d09..b06fce7 100644 --- a/packages/test-servers/Dockerfile +++ b/packages/test-servers/Dockerfile @@ -11,6 +11,7 @@ COPY lerna.json ./ COPY tsconfig.json ./ COPY packages/test-servers/. ./packages/test-servers/ +RUN npm install @traceloop/instrument-opentelemetry RUN npm run build -CMD node ./packages/test-servers/dist/index.js \ No newline at end of file +CMD ["sh", "-c", "cd packages/test-servers && npm run start"] \ No newline at end of file diff --git a/packages/test-servers/README.md b/packages/test-servers/README.md new file mode 100644 index 0000000..e6e759c --- /dev/null +++ b/packages/test-servers/README.md @@ -0,0 +1,17 @@ +# test-servers + +This is a test environment which sends telemetry data (by default to Traceloop servers). +A single container (listening on multiple ports) mimics different microservices that communicate via http and gRPC. 
+ +Services include: + +- gateway +- users service +- gigs service +- orders service +- emails service +- bi grpc service + +## Architecture + +![architecture](./public/architecture.png) diff --git a/packages/test-servers/package.json b/packages/test-servers/package.json index 10abf9e..5c49521 100644 --- a/packages/test-servers/package.json +++ b/packages/test-servers/package.json @@ -8,11 +8,15 @@ }, "scripts": { "prebuild": "rm -rf dist", - "build": "rollup -c && cp ./src/helloworld.proto ./dist/helloworld.proto", - "start:orders": "ORDERS_SERVICE=TRUE SERVICE_NAME=orders-service PORT=3000 OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4123/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", - "start:emails": "EMAILS_SERVICE=TRUE SERVICE_NAME=emails-service PORT=3001 OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4123/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", - "start:grpc": "GRPC_SERVICE=TRUE SERVICE_NAME=grpc-service PORT=50051 OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4123/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", - "start": "concurrently \"npm:start:orders\" \"npm:start:emails\" \"npm:start:grpc\"" + "build": "rollup -c && cp ./src/bi.proto ./dist/bi.proto", + "start:emails": "EMAILS_SERVICE=TRUE SERVICE_NAME=test-servers-emails OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:grpc": "GRPC_SERVICE=TRUE SERVICE_NAME=test-servers-grpc OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:users": "USERS_SERVICE=TRUE SERVICE_NAME=test-servers-users OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:gigs": "GIGS_SERVICE=TRUE SERVICE_NAME=test-servers-gigs OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:orders": "ORDERS_SERVICE=TRUE SERVICE_NAME=test-servers-orders OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:gateway": "GATEWAY_SERVICE=TRUE SERVICE_NAME=test-servers-gateway PORT=3000 OTEL_EXPORTER_OTLP_ENDPOINT=https://api-staging.traceloop.dev/v1/traces OTEL_EXPORTER_TYPE=PROTO node -r @traceloop/instrument-opentelemetry dist/index.js", + "start:uninstrumented": "NOT_INSTRUMENTED=TRUE node dist/index.js", + "start": "concurrently \"npm:start:users\" \"npm:start:gigs\" \"npm:start:orders\" \"npm:start:gateway\" \"npm:start:emails\" \"npm:start:grpc\" \"npm:start:uninstrumented\"" }, "devDependencies": { "@types/express": "^4.17.17", @@ -25,6 +29,7 @@ "@traceloop/instrument-opentelemetry": "^0.5.0", "express": "^4.18.2", "google-protobuf": "^3.0.0", + "http-proxy-middleware": "^2.0.6", "pg": "^8.9.0", "typeorm": "^0.3.12", "uuid": "^9.0.0" diff --git a/packages/test-servers/public/architecture.png b/packages/test-servers/public/architecture.png new file mode 100644 index 0000000..c06bd27 Binary files /dev/null and b/packages/test-servers/public/architecture.png differ diff --git a/packages/test-servers/src/bi-grpc-service.ts b/packages/test-servers/src/bi-grpc-service.ts 
new file mode 100644 index 0000000..a23d6dc --- /dev/null +++ b/packages/test-servers/src/bi-grpc-service.ts @@ -0,0 +1,32 @@ +import grpc from '@grpc/grpc-js'; +import protoLoader from '@grpc/proto-loader'; +import { GRPC_SERVICE_PORT } from './constants'; + +const PROTO_PATH = __dirname + '/bi.proto'; +const packageDefinition = protoLoader.loadSync(PROTO_PATH, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, +}); +const bi_proto = grpc.loadPackageDefinition(packageDefinition).bi; + +export const biGrpcService = new grpc.Server(); + +function reportBi(call: any, callback: any) { + callback(null, { message: 'Received BI event from ' + call.request.name }); +} +biGrpcService.addService((bi_proto as any).Bi.service, { + reportBi: reportBi, +}); + +const client = new (bi_proto as any).Bi( + `localhost:${GRPC_SERVICE_PORT}`, + grpc.credentials.createInsecure(), +); + +// should be called from other services (makes an rpc call to the bi grpc service) +export const sendBiEvent = (name: string, id: string) => { + client.reportBi({ name: name, id: id }, function (_: any, response: any) {}); // eslint-disable-line +}; diff --git a/packages/test-servers/src/bi.proto b/packages/test-servers/src/bi.proto new file mode 100644 index 0000000..b13d86c --- /dev/null +++ b/packages/test-servers/src/bi.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_outer_classname = "ReportBiProto"; +option objc_class_prefix = "HLW"; + +package bi; + +service Bi { + rpc ReportBi (ReportBiEvent) returns (ReportBiReply) {} +} + +// The request message containing the BI data. +message ReportBiEvent { + string name = 1; + string id = 2; +} + +// The response message. +message ReportBiReply { + string message = 1; +} diff --git a/packages/test-servers/src/constants.ts b/packages/test-servers/src/constants.ts new file mode 100644 index 0000000..b061352 --- /dev/null +++ b/packages/test-servers/src/constants.ts @@ -0,0 +1,9 @@ +export const USERS_SERVICE_PORT = 3001, + GIGS_SERVICE_PORT = 3002, + ORDERS_SERVICE_PORT = 3003, + EMAILS_SERVICE_PORT = 3004, + GRPC_SERVICE_PORT = 50051; + +export const GATEWAY_SERVICE_PORT = process.env.PORT + ? 
Number(process.env.PORT)
+  : 3000;
diff --git a/packages/test-servers/src/email-service.ts b/packages/test-servers/src/email-service.ts
new file mode 100644
index 0000000..b41caf9
--- /dev/null
+++ b/packages/test-servers/src/email-service.ts
@@ -0,0 +1,21 @@
+import express from 'express';
+import axios from 'axios';
+import { EMAILS_SERVICE_PORT } from './constants';
+
+export const emailsService = express();
+
+emailsService.post('/emails/send', (req, res) => {
+  res.send('Email sent!');
+});
+
+// should be called from other services (makes an http call to the emails service)
+export const sendEmail = async (body: any) => {
+  try {
+    return await axios.post(
+      `http://localhost:${EMAILS_SERVICE_PORT}/emails/send`,
+      body,
+    );
+  } catch (err) {
+    console.error('Error sending email', err);
+  }
+};
diff --git a/packages/test-servers/src/gateway.ts b/packages/test-servers/src/gateway.ts
new file mode 100644
index 0000000..5c9bd5a
--- /dev/null
+++ b/packages/test-servers/src/gateway.ts
@@ -0,0 +1,35 @@
+import express from 'express';
+import { createProxyMiddleware } from 'http-proxy-middleware';
+import {
+  USERS_SERVICE_PORT,
+  GIGS_SERVICE_PORT,
+  ORDERS_SERVICE_PORT,
+} from './constants';
+
+export const gatewayService = express();
+
+if (process.env.GATEWAY_SERVICE) {
+  gatewayService.use(
+    '/users',
+    createProxyMiddleware({
+      target: `http://localhost:${USERS_SERVICE_PORT}`,
+      changeOrigin: true,
+    }),
+  );
+
+  gatewayService.use(
+    '/gigs',
+    createProxyMiddleware({
+      target: `http://localhost:${GIGS_SERVICE_PORT}`,
+      changeOrigin: true,
+    }),
+  );
+
+  gatewayService.use(
+    '/orders',
+    createProxyMiddleware({
+      target: `http://localhost:${ORDERS_SERVICE_PORT}`,
+      changeOrigin: true,
+    }),
+  );
+}
diff --git a/packages/test-servers/src/gig-service.ts b/packages/test-servers/src/gig-service.ts
new file mode 100644
index 0000000..8899eb6
--- /dev/null
+++ b/packages/test-servers/src/gig-service.ts
@@ -0,0 +1,109 @@
+import express from 'express';
+import { v4 as uuidv4 } from 'uuid';
+import { initializeDbIfNeeded, postgresDb } from './postgres';
+import { sendEmail } from './email-service';
+
+export const gigsService = express();
+gigsService.use(express.json());
+
+gigsService.get('/gigs/:gigId', async (req, res) => {
+  await initializeDbIfNeeded();
+
+  const gigId = req.params.gigId;
+  console.log(`Getting gig ${gigId}`);
+
+  try {
+    const gigRes = await postgresDb.query(
+      `SELECT * FROM gigs WHERE id = '${gigId}'`,
+    );
+    if (!gigRes?.length) {
+      throw new Error(`Gig ${gigId} not found`);
+    }
+
+    res.status(200);
+    res.send(gigRes[0]);
+  } catch (err) {
+    console.error('Error getting gig', err);
+    res.status(500);
+    res.send({ error: 'Error getting gig', message: err.message });
+  }
+});
+
+gigsService.delete('/gigs/:gigId', async (req, res) => {
+  await initializeDbIfNeeded();
+
+  const gigId = req.params.gigId;
+  console.log(`Deleting gig ${gigId}`);
+
+  try {
+    const gigRes = await postgresDb.query(
+      `SELECT * FROM gigs WHERE id = '${gigId}'`,
+    );
+    if (!gigRes?.length) {
+      throw new Error(`Gig ${gigId} not found`);
+    }
+
+    await postgresDb.query(`DELETE FROM gigs WHERE id = '${gigId}'`);
+
+    res.status(200);
+    res.send({ message: `Gig ${gigId} deleted` });
+  } catch (err) {
+    console.error('Error deleting gig', err);
+    res.status(500);
+    res.send({ error: 'Error deleting gig', message: err.message });
+  }
+});
+
+// required body params:
+// - user_id
+// - title
+gigsService.post('/gigs/create', async (req, res) => {
+  await initializeDbIfNeeded();
+
+  const gigId = 
uuidv4(); + console.log( + `Creating gig ${gigId} with title ${req.body.title} for user ${req.body.user_id}`, + ); + + try { + // check user exists + const userRes = await postgresDb.query( + `SELECT * FROM users WHERE id = '${req.body.user_id}'`, + ); + if (!userRes?.length) { + throw new Error(`User ${req.body.user_id} not found`); + } + const userId = userRes[0].id; + + // check gig doesn't already exist with the same title + const existingGigs = await postgresDb.query( + `SELECT * FROM gigs WHERE user_id = '${userId}' AND title = '${req.body.title}'`, + ); + if (existingGigs?.length) { + throw new Error( + `A gig with the title "${req.body.title}" already exists for user ${req.body.user_id}`, + ); + } + + // create gig + await postgresDb.query( + `INSERT INTO gigs (id, title, user_id) VALUES ('${gigId}', '${req.body.title}', '${userId}')`, + ); + + // send email + sendEmail({ + message: 'Gig created!', + title: req.body.title, + user: req.body.user_id, + id: gigId, + }); + + res.status(201); + res.send({ gigId }); + } catch (err) { + console.error('Error creating gig', err); + + res.status(500); + res.send({ error: 'Error creating gig', message: err.message }); + } +}); diff --git a/packages/test-servers/src/helloworld.proto b/packages/test-servers/src/helloworld.proto deleted file mode 100644 index 688974b..0000000 --- a/packages/test-servers/src/helloworld.proto +++ /dev/null @@ -1,24 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "io.grpc.examples.helloworld"; -option java_outer_classname = "HelloWorldProto"; -option objc_class_prefix = "HLW"; - -package helloworld; - -// The greeting service definition. -service Greeter { - // Sends a greeting - rpc SayHello (HelloRequest) returns (HelloReply) {} -} - -// The request message containing the user's name. -message HelloRequest { - string name = 1; -} - -// The response message containing the greetings -message HelloReply { - string message = 1; -} diff --git a/packages/test-servers/src/index.ts b/packages/test-servers/src/index.ts index f3928e8..241a64e 100644 --- a/packages/test-servers/src/index.ts +++ b/packages/test-servers/src/index.ts @@ -1,140 +1,64 @@ -import axios from 'axios'; -import express from 'express'; -import { DataSource } from 'typeorm'; -import { v4 as uuidv4 } from 'uuid'; import grpc from '@grpc/grpc-js'; -import protoLoader from '@grpc/proto-loader'; - -// --- Protos --- -const PROTO_PATH = __dirname + '/helloworld.proto'; - -const packageDefinition = protoLoader.loadSync(PROTO_PATH, { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true, -}); -const hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld; - -// --- Postgres --- -const ordersDataSource = new DataSource({ - type: 'postgres', - host: process.env.POSTGRES_HOST || 'localhost', - port: process.env.POSTGRES_PORT ? 
parseInt(process.env.POSTGRES_PORT) : 5432, - username: process.env.POSTGRES_USERNAME || 'postgres', - password: process.env.POSTGRES_PASSWORD || 'postgres', - database: process.env.POSTGRES_DATABASE || 'postgres', -}); - -const postgresSchema = process.env.POSTGRES_SCHEMA || 'public'; -let ordersDataSourceInitialized = false; - -const initializeOrdersDatasource = async () => - ordersDataSource - .initialize() - .then(async () => { - console.log('Orders data Source has been initialized!'); - await ordersDataSource.query( - `CREATE TABLE IF NOT EXISTS ${postgresSchema}.orders (id varchar(50), price_in_cents int)`, - ); - ordersDataSourceInitialized = true; - console.log('Orders table has been created!'); - }) - .catch((err) => { - console.error('Error during orders data source initialization', err); - }); - -if (process.env.ORDERS_SERVICE) { - initializeOrdersDatasource(); -} - -// --- Orders Service --- -const ordersService = express(); - -ordersService.post('/orders/create', async (req, res) => { - if (!ordersDataSourceInitialized) { - await initializeOrdersDatasource(); - } - - const orderId = uuidv4(); - console.log('Creating order...'); - - try { - ordersDataSource.query( - `INSERT INTO orders (id, price_in_cents) VALUES ('${orderId}', 1000)`, - ); - } catch (err) { - console.error('Error creating order', err); +import { usersService } from './user-service'; +import { gigsService } from './gig-service'; +import { ordersService } from './orders-service'; +import { emailsService } from './email-service'; +import { biGrpcService } from './bi-grpc-service'; +import { gatewayService } from './gateway'; +import { initializeSyntheticTraffic } from './synthetic-traffic'; +import { + USERS_SERVICE_PORT, + GIGS_SERVICE_PORT, + ORDERS_SERVICE_PORT, + EMAILS_SERVICE_PORT, + GRPC_SERVICE_PORT, + GATEWAY_SERVICE_PORT, +} from './constants'; + +// --- Initialize Services --- +process.env.GATEWAY_SERVICE && + gatewayService.listen(GATEWAY_SERVICE_PORT, () => { console.log( - 'Omitting order creation, please make sure the database is running', + `Gateway service listening at http://localhost:${GATEWAY_SERVICE_PORT}`, ); - } - - // make http call - console.log('Order created! 
Sending email...'); - const EMAILS_SERVICE_URL = - process.env.EMAILS_SERVICE_URL || 'http://localhost:3001'; - await axios.post(`${EMAILS_SERVICE_URL}/emails/send`, { - email: 'test', - nestedObject: { test: 'test' }, }); - // make grpc call - console.log('Making gRPC call'); - const GRPC_SERVICE_URL = process.env.GRPC_SERVICE_URL || 'localhost:50051'; - const client = new (hello_proto as any).Greeter( - GRPC_SERVICE_URL, - grpc.credentials.createInsecure(), - ); - - client.sayHello({ name: 'name' }, function (_: any, response: any) { - console.log('Greeting:', response.message); +process.env.USERS_SERVICE && + usersService.listen(USERS_SERVICE_PORT, () => { + console.log( + `Users service listening at http://localhost:${USERS_SERVICE_PORT}`, + ); }); - res.send('Order created!'); -}); - -// --- Emails Service --- -const emailsService = express(); - -emailsService.post('/emails/send', (req, res) => { - console.log('Email sent!'); - res.send('Email sent!'); -}); - -// --- gRPC Service --- -const grpcServer = new grpc.Server(); - -function sayHello(call: any, callback: any) { - callback(null, { message: 'Hello ' + call.request.name }); -} -grpcServer.addService((hello_proto as any).Greeter.service, { - sayHello: sayHello, -}); - -// --- Initialize Service --- -const PORT = process.env.PORT || 3000; +process.env.GIGS_SERVICE && + gigsService.listen(GIGS_SERVICE_PORT, () => { + console.log( + `Gigs service listening at http://localhost:${GIGS_SERVICE_PORT}`, + ); + }); -if (process.env.ORDERS_SERVICE) { - ordersService.listen(PORT, () => { - console.log(`Orders service listening at http://localhost:${PORT}`); +process.env.ORDERS_SERVICE && + ordersService.listen(ORDERS_SERVICE_PORT, () => { + console.log( + `Orders service listening at http://localhost:${ORDERS_SERVICE_PORT}`, + ); }); -} -if (process.env.EMAILS_SERVICE) { - emailsService.listen(PORT, () => { - console.log(`Emails service listening at http://localhost:${PORT}`); +process.env.EMAILS_SERVICE && + emailsService.listen(EMAILS_SERVICE_PORT, () => { + console.log( + `Emails service listening at http://localhost:${EMAILS_SERVICE_PORT}`, + ); }); -} -if (process.env.GRPC_SERVICE) { - grpcServer.bindAsync( - `0.0.0.0:${PORT}`, +process.env.GRPC_SERVICE && + biGrpcService.bindAsync( + `0.0.0.0:${GRPC_SERVICE_PORT}`, grpc.ServerCredentials.createInsecure(), () => { - console.log(`gRPC service listening on port ${PORT}`); - grpcServer.start(); + console.log(`gRPC service listening on port ${GRPC_SERVICE_PORT}`); + biGrpcService.start(); }, ); -} + +process.env.NOT_INSTRUMENTED && initializeSyntheticTraffic(); diff --git a/packages/test-servers/src/orders-service.ts b/packages/test-servers/src/orders-service.ts new file mode 100644 index 0000000..e4a9679 --- /dev/null +++ b/packages/test-servers/src/orders-service.ts @@ -0,0 +1,112 @@ +import express from 'express'; +import { v4 as uuidv4 } from 'uuid'; +import { initializeDbIfNeeded, postgresDb } from './postgres'; +import { sendEmail } from './email-service'; +import { sendBiEvent } from './bi-grpc-service'; + +export const ordersService = express(); +ordersService.use(express.json()); + +ordersService.get('/orders/:orderId', async (req, res) => { + await initializeDbIfNeeded(); + + const orderId = req.params.orderId; + console.log(`Getting order ${orderId}`); + + try { + const orderRes = await postgresDb.query( + `SELECT * FROM orders WHERE id = '${orderId}'`, + ); + if (!orderRes?.length) { + throw new Error(`Order ${orderId} not found`); + } + + res.status(200); + 
res.send(orderRes[0]);
+  } catch (err) {
+    console.error('Error getting order', err);
+    res.status(500);
+    res.send({ error: 'Error getting order', message: err.message });
+  }
+});
+
+ordersService.delete('/orders/:orderId', async (req, res) => {
+  await initializeDbIfNeeded();
+
+  const orderId = req.params.orderId;
+  console.log(`Deleting order ${orderId}`);
+
+  try {
+    const orderRes = await postgresDb.query(
+      `SELECT * FROM orders WHERE id = '${orderId}'`,
+    );
+    if (!orderRes?.length) {
+      throw new Error(`Order ${orderId} not found`);
+    }
+
+    await postgresDb.query(`DELETE FROM orders WHERE id = '${orderId}'`);
+
+    res.status(200);
+    res.send({ message: `Order ${orderId} deleted` });
+  } catch (err) {
+    console.error('Error deleting order', err);
+    res.status(500);
+    res.send({ error: 'Error deleting order', message: err.message });
+  }
+});
+
+// required body params:
+// - gig_id
+// - buyer_id
+ordersService.post('/orders/create', async (req, res) => {
+  await initializeDbIfNeeded();
+
+  const orderId = uuidv4();
+  console.log(
+    `Creating order ${orderId} with gig id ${req.body.gig_id} for buyer ${req.body.buyer_id}`,
+  );
+
+  try {
+    // check buyer exists
+    const userRes = await postgresDb.query(
+      `SELECT * FROM users WHERE id = '${req.body.buyer_id}'`,
+    );
+    if (!userRes?.length) {
+      throw new Error(`User ${req.body.buyer_id} not found`);
+    }
+    const buyerId = userRes[0].id;
+
+    // check gig exists
+    const existingGigs = await postgresDb.query(
+      `SELECT * FROM gigs WHERE id = '${req.body.gig_id}'`,
+    );
+    if (!existingGigs?.length) {
+      throw new Error(`A gig with the id "${req.body.gig_id}" was not found.`);
+    }
+    const gigId = existingGigs[0].id;
+    const sellerId = existingGigs[0].user_id;
+
+    // create order
+    await postgresDb.query(
+      `INSERT INTO orders (id, gig_id, seller_id, buyer_id) VALUES ('${orderId}', '${gigId}', '${sellerId}', '${buyerId}')`,
+    );
+
+    sendEmail({
+      message: 'Order created!',
+      gigId,
+      buyerId,
+      sellerId,
+      id: orderId,
+    });
+
+    sendBiEvent('order_created', orderId);
+
+    res.status(201);
+    res.send({ orderId });
+  } catch (err) {
+    console.error('Error creating order', err);
+
+    res.status(500);
+    res.send({ error: 'Error creating order', message: err.message });
+  }
+});
diff --git a/packages/test-servers/src/postgres.ts b/packages/test-servers/src/postgres.ts
new file mode 100644
index 0000000..3e3a49d
--- /dev/null
+++ b/packages/test-servers/src/postgres.ts
@@ -0,0 +1,52 @@
+import { DataSource } from 'typeorm';
+
+// --- Postgres ---
+export const postgresDb = new DataSource({
+  type: 'postgres',
+  host: process.env.TEST_SERVERS_POSTGRES_HOST || 'localhost',
+  port: process.env.TEST_SERVERS_POSTGRES_PORT
+    ? 
parseInt(process.env.TEST_SERVERS_POSTGRES_PORT) + : 5432, + username: process.env.TEST_SERVERS_POSTGRES_USERNAME || 'postgres', + password: process.env.TEST_SERVERS_POSTGRES_PASSWORD || 'postgres', + database: process.env.TEST_SERVERS_POSTGRES_DATABASE || 'postgres', + ssl: { + rejectUnauthorized: false, + }, +}); + +const postgresSchema = process.env.TEST_SERVER_POSTGRES_SCHEMA || 'public'; +let dbInitialized = false; + +const initializeDb = async () => + postgresDb + .initialize() + .then(async () => { + console.log('Database initialized!'); + try { + await postgresDb.query( + `CREATE TABLE IF NOT EXISTS ${postgresSchema}.users (id varchar(50), name varchar(50))`, + ); + + await postgresDb.query( + `CREATE TABLE IF NOT EXISTS ${postgresSchema}.gigs (id varchar(50), user_id varchar(50), title varchar(50))`, + ); + + await postgresDb.query( + `CREATE TABLE IF NOT EXISTS ${postgresSchema}.orders (id varchar(50), gig_id varchar(50), seller_id varchar(50), buyer_id varchar(50))`, + ); + + dbInitialized = true; + } catch (err) { + console.error('Error creating postgres table', err); + } + }) + .catch((err) => { + console.error('Error during data source initialization', err); + }); + +export const initializeDbIfNeeded = async () => { + if (!dbInitialized) { + await initializeDb(); + } +}; diff --git a/packages/test-servers/src/synthetic-traffic/crud-operations.ts b/packages/test-servers/src/synthetic-traffic/crud-operations.ts new file mode 100644 index 0000000..7a55731 --- /dev/null +++ b/packages/test-servers/src/synthetic-traffic/crud-operations.ts @@ -0,0 +1,57 @@ +import axios from 'axios'; +import { GATEWAY_SERVICE_PORT } from '../constants'; +import { randNumber } from './utils'; +import { postgresDb, initializeDbIfNeeded } from '../postgres'; + +export const createUser = async () => { + const userName = `synthetic_user-${randNumber(1, 1000)}`; + const user = await axios.post<{ userId: string }>( + `http://localhost:${GATEWAY_SERVICE_PORT}/users/create`, + { + name: userName, + }, + ); + + return user.data; +}; + +export const createGig = async (sellerId: string) => { + const gigTitle = `synthetic_gig-${randNumber(1, 1000)}`; + const gig = await axios.post<{ gigId: string }>( + `http://localhost:${GATEWAY_SERVICE_PORT}/gigs/create`, + { + title: gigTitle, + user_id: sellerId, + }, + ); + + return gig.data; +}; + +export const createOrder = async (gigId: string, buyerId: string) => { + const order = await axios.post<{ orderId: string }>( + `http://localhost:${GATEWAY_SERVICE_PORT}/orders/create`, + { + gig_id: gigId, + buyer_id: buyerId, + }, + ); + + return order.data; +}; + +export const deleteOrder = async (orderId: string) => { + console.log(`Deleting order ${orderId}`); + await initializeDbIfNeeded(); + await postgresDb.query(`DELETE FROM orders WHERE id = '${orderId}'`); +}; + +export const deleteUser = async (userId: string) => { + await initializeDbIfNeeded(); + await postgresDb.query(`DELETE FROM users WHERE id = '${userId}'`); +}; + +export const deleteGig = async (gigId: string) => { + await initializeDbIfNeeded(); + await postgresDb.query(`DELETE FROM gigs WHERE id = '${gigId}'`); +}; diff --git a/packages/test-servers/src/synthetic-traffic/index.ts b/packages/test-servers/src/synthetic-traffic/index.ts new file mode 100644 index 0000000..a34e95f --- /dev/null +++ b/packages/test-servers/src/synthetic-traffic/index.ts @@ -0,0 +1,60 @@ +import { wait } from './utils'; +import { + createUser, + createGig, + createOrder, + deleteGig, + deleteOrder, + deleteUser, +} from 
'./crud-operations'; + +const INTERVAL_MINUTES = 5; +const OPERATION_INTERVAL_MS = 5000; + +let sellerId, buyerId, gigId, orderId: string; + +export const initializeSyntheticTraffic = async () => { + await wait(OPERATION_INTERVAL_MS); + + console.log('Synthetic traffic started'); + syntheticTrafficFlow(); + setInterval(syntheticTrafficFlow, INTERVAL_MINUTES * 60 * 1000); +}; + +const syntheticTrafficFlow = async () => { + try { + const seller = await createUser(); + sellerId = seller.userId; + + await wait(OPERATION_INTERVAL_MS); + + const buyer = await createUser(); + buyerId = buyer.userId; + + await wait(OPERATION_INTERVAL_MS); + + const gig = await createGig(sellerId); + gigId = gig.gigId; + + await wait(OPERATION_INTERVAL_MS); + + const order = await createOrder(gigId, buyerId); + orderId = order.orderId; + + await wait(OPERATION_INTERVAL_MS); + + await cleanup(); + } catch (err) { + console.error('Synthetic traffic error', err); + + await cleanup(); + } +}; + +const cleanup = async () => + await Promise.allSettled([ + deleteOrder(orderId), + deleteGig(gigId), + deleteUser(buyerId), + deleteUser(sellerId), + ]); diff --git a/packages/test-servers/src/synthetic-traffic/utils.ts b/packages/test-servers/src/synthetic-traffic/utils.ts new file mode 100644 index 0000000..46efc03 --- /dev/null +++ b/packages/test-servers/src/synthetic-traffic/utils.ts @@ -0,0 +1,5 @@ +export const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +export const randNumber = (min: number, max: number) => { + return Math.floor(Math.random() * (max - min + 1)) + min; +}; diff --git a/packages/test-servers/src/user-service.ts b/packages/test-servers/src/user-service.ts new file mode 100644 index 0000000..f1f7ed5 --- /dev/null +++ b/packages/test-servers/src/user-service.ts @@ -0,0 +1,96 @@ +import express from 'express'; +import { v4 as uuidv4 } from 'uuid'; +import { initializeDbIfNeeded, postgresDb } from './postgres'; +import { sendEmail } from './email-service'; +import { sendBiEvent } from './bi-grpc-service'; + +export const usersService = express(); +usersService.use(express.json()); + +usersService.get('/users/:userId', async (req, res) => { + await initializeDbIfNeeded(); + + const userId = req.params.userId; + console.log(`Getting user ${userId}`); + + try { + const userRes = await postgresDb.query( + `SELECT * FROM users WHERE id = '${userId}'`, + ); + if (!userRes?.length) { + throw new Error(`User ${userId} not found`); + } + + res.status(200); + res.send(userRes[0]); + } catch (err) { + console.error('Error getting user', err); + res.status(500); + res.send({ error: 'Error getting user', message: err.message }); + } +}); + +usersService.delete('/users/:userId', async (req, res) => { + await initializeDbIfNeeded(); + + const userId = req.params.userId; + console.log(`Deleting user ${userId}`); + + try { + const userRes = await postgresDb.query( + `SELECT * FROM users WHERE id = '${userId}'`, + ); + if (!userRes?.length) { + throw new Error(`User ${userId} not found`); + } + + await postgresDb.query(`DELETE FROM users WHERE id = '${userId}'`); + + res.status(200); + res.send({ message: `User ${userId} deleted` }); + } catch (err) { + console.error('Error deleting user', err); + res.status(500); + res.send({ error: 'Error deleting user', message: err.message }); + } +}); + +// required body params: +// - name +usersService.post('/users/create', async (req, res) => { + await initializeDbIfNeeded(); + + const userId = uuidv4(); + console.log(`Creating user ${userId} with name 
${req.body.name}`); + + try { + // check if user exists + const userRes = await postgresDb.query( + `SELECT * FROM users WHERE name = '${req.body.name}'`, + ); + if (userRes?.length) { + throw new Error(`User ${req.body.name} already exists.`); + } + + // create user + await postgresDb.query( + `INSERT INTO users (id, name) VALUES ('${userId}', '${req.body.name}')`, + ); + + sendEmail({ + message: 'User created!', + name: req.body.name, + id: userId, + }); + + sendBiEvent('user_created', userId); + + res.status(201); + res.send({ userId }); + } catch (err) { + console.error('Error creating user', err); + + res.status(500); + res.send({ error: 'Error creating user', message: err.message }); + } +});
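
For reference, the user → gig → order flow that the synthetic-traffic module automates can also be driven by hand through the new gateway. Below is a minimal sketch, not part of this patch: the file name and the seller/buyer/gig names are illustrative, and it assumes the test-servers container (or `npm start`) is running locally with the gateway on its default port 3000 and Postgres reachable.

```typescript
// smoke-test.ts (hypothetical helper, not included in this patch)
// Drives one seller/buyer/gig/order flow through the gateway; each request is
// forwarded by http-proxy-middleware to the users, gigs, or orders service.
import axios from 'axios';

const GATEWAY = 'http://localhost:3000';
const suffix = Date.now(); // user names must be unique, gig titles unique per user

const main = async () => {
  const { data: seller } = await axios.post<{ userId: string }>(
    `${GATEWAY}/users/create`,
    { name: `smoke-seller-${suffix}` },
  );

  const { data: buyer } = await axios.post<{ userId: string }>(
    `${GATEWAY}/users/create`,
    { name: `smoke-buyer-${suffix}` },
  );

  const { data: gig } = await axios.post<{ gigId: string }>(
    `${GATEWAY}/gigs/create`,
    { user_id: seller.userId, title: `smoke-gig-${suffix}` },
  );

  const { data: order } = await axios.post<{ orderId: string }>(
    `${GATEWAY}/orders/create`,
    { gig_id: gig.gigId, buyer_id: buyer.userId },
  );

  console.log('Created order', order.orderId);
};

main().catch((err) => console.error('Smoke test failed', err));
```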