diff --git a/flake.nix b/flake.nix index f9f70a972..c0c714183 100644 --- a/flake.nix +++ b/flake.nix @@ -279,6 +279,8 @@ }; }; + nativelink-bridge = pkgs.callPackage ./web/bridge/image.nix {inherit buildImage pullImage pkgs;}; + nativelink-worker-init = pkgs.callPackage ./tools/nativelink-worker-init.nix {inherit buildImage self nativelink-image;}; rbe-autogen = pkgs.callPackage ./local-remote-execution/rbe-autogen.nix { @@ -419,6 +421,7 @@ nativelink-worker-init nativelink-x86_64-linux publish-ghcr + nativelink-bridge ; default = nativelink; diff --git a/nativelink-config/examples/basic_bes.json b/nativelink-config/examples/basic_bes.json new file mode 100644 index 000000000..c4e91f098 --- /dev/null +++ b/nativelink-config/examples/basic_bes.json @@ -0,0 +1,177 @@ +{ + "stores": { + "AC_MAIN_STORE": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-ac", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-ac", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "BEP_STORE": { + "redis_store": { + "addresses": [ + "redis://@localhost:6379/0" + ], + "response_timeout_s": 5, + "connection_timeout_s": 5, + "experimental_pub_sub_channel": "build_event", + "key_prefix": "nativelink:", + "mode": "standard" + } + }, + "WORKER_FAST_SLOW_STORE": { + "fast_slow": { + "fast": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-cas", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "slow": { + "noop": {} + } + } + } + }, + "schedulers": { + "MAIN_SCHEDULER": { + "simple": { + "supported_platform_properties": { + "cpu_count": "minimum", + "memory_kb": "minimum", + "network_kbps": "minimum", + "disk_read_iops": "minimum", + "disk_read_bps": "minimum", + "disk_write_iops": "minimum", + "disk_write_bps": "minimum", + "shm_size": "minimum", + "gpu_count": "minimum", + "gpu_model": "exact", + "cpu_vendor": "exact", + "cpu_arch": "exact", + "cpu_model": "exact", + "kernel_version": "exact", + "OSFamily": "priority", + "container-image": "priority" + } + } + } + }, + "workers": [ + { + "local": { + "worker_api_endpoint": { + "uri": "grpc://127.0.0.1:50062" + }, + "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", + "upload_action_result": { + "ac_store": "AC_MAIN_STORE" + }, + "work_directory": "/tmp/nativelink/work", + "platform_properties": { + "cpu_count": { + "values": [ + "16" + ] + }, + "memory_kb": { + "values": [ + "500000" + ] + }, + "network_kbps": { + "values": [ + "100000" + ] + }, + "cpu_arch": { + "values": [ + "x86_64" + ] + }, + "OSFamily": { + "values": [ + "" + ] + }, + "container-image": { + "values": [ + "" + ] + } + } + } + } + ], + "servers": [ + { + "name": "public", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052" + } + }, + "services": { + "cas": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE" + } + }, + "ac": { + "main": { + "ac_store": "AC_MAIN_STORE" + } + }, + "execution": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE", + "scheduler": "MAIN_SCHEDULER" + } + }, + "capabilities": { + "main": { + "remote_execution": { + "scheduler": "MAIN_SCHEDULER" + } + } + }, + "bytestream": { + "cas_stores": { + "main": "WORKER_FAST_SLOW_STORE" + } + } + } + }, + { + "name": "private_workers_servers", + "listener": { + "http": { + "socket_address": "0.0.0.0:50062" + } + }, + "services": { + "experimental_prometheus": { + "path": "/metrics" + }, + "experimental_bep": { + "store": "BEP_STORE" + }, + "worker_api": { + "scheduler": "MAIN_SCHEDULER" + }, + "admin": {}, + "health": { + "path": "/status" + } + } + } + ], + "global": { + "max_open_files": 512 + } +} diff --git a/tools/pre-commit-hooks.nix b/tools/pre-commit-hooks.nix index e690341ae..e78747383 100644 --- a/tools/pre-commit-hooks.nix +++ b/tools/pre-commit-hooks.nix @@ -65,6 +65,7 @@ in { # Bun binary lockfile "web/platform/bun.lockb" + "web/bridge/bun.lockb" ]; enable = true; types = ["binary"]; diff --git a/web/bridge/.env.example b/web/bridge/.env.example new file mode 100644 index 000000000..0442389be --- /dev/null +++ b/web/bridge/.env.example @@ -0,0 +1,3 @@ +REDIS_URL=redis://localhost:6379 +NATIVELINK_PUB_SUB_CHANNEL=build_event +POSTGRES_URL= diff --git a/web/bridge/.gitignore b/web/bridge/.gitignore new file mode 100644 index 000000000..322378173 --- /dev/null +++ b/web/bridge/.gitignore @@ -0,0 +1,139 @@ +# Logs +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage +*.lcov + +# nyc test coverage +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# Bower dependency directory (https://bower.io/) +bower_components + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) +build/Release + +# Dependency directories +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) +web_modules/ + +# TypeScript cache +*.tsbuildinfo + +# Optional npm cache directory +.npm + +# Optional eslint cache +.eslintcache + +# Optional stylelint cache +.stylelintcache + +# Microbundle cache +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history +.node_repl_history + +# Output of 'npm pack' +*.tgz + +# Yarn Integrity file +.yarn-integrity + +# dotenv environment variable files +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) +.parcel-cache + +# Next.js build output +.next +out + +# Nuxt.js build / generate output +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output +.vuepress/dist + +# vuepress v2.x temp and cache directory +.temp + +# Docusaurus cache and generated files +.docusaurus + +# Serverless directories +.serverless/ + +# FuseBox cache +.fusebox/ + +# DynamoDB Local files +.dynamodb/ + +# TernJS port file +.tern-port + +# Stores VSCode versions used for testing VSCode extensions +.vscode-test + +# yarn v2 +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/web/bridge/README.md b/web/bridge/README.md new file mode 100644 index 000000000..6556be50c --- /dev/null +++ b/web/bridge/README.md @@ -0,0 +1,78 @@ +# NativeLink Bridge (Experimental) + +Make sure you are running an instance of Redis or DragonflyDB in your network. + +For DragonflyDB inside docker run: + +```bash +docker run \ + -d --name some-dragonfly \ + -p 6379:6379 \ + --ulimit memlock=-1 \ + docker.dragonflydb.io/dragonflydb/dragonfly + +``` + +For Redis inside docker run: + +```bash +docker run -d --name some-redis \ + -p 6379:6379 \ + redis +``` + +Set the Redis URL and the NativeLink pub sub channel ENV variables in `.env` as defined in the `.env.example` + + +The Redis URL format: 'redis://alice:foobared@awesome.redis.server:6380' + +The NativeLink pub sub channel ENV variable should match `experimental_pub_sub_channel` inside `nativelink-config/example/basic_bes.json`. + +Make sure you have set also the `key_prefix` in `nativelink-config/example/basic_bes.json` + +## You need 4 Components + Redis + +### 1. NativeLink + +Start an instance of NativeLink with the basic_bes.json inside the `nativelink-config/example/basic_bes.json`. + +### 2. NativeLink Web Bridge + +Install the dependencies and run the bridge: + +```bash +bun i && bun run index.ts +``` + +### 3. NativeLink Web UI + +Inside the web/ui directory run: + +```bash +bun i & bun dev +``` + +Now you can open http://localhost:4321. + + +### 4. Bazel + +Now you can run your Bazel build with NativeLink and see it in real-time going into the web app + +Include this in your .bazelrc +```bash +build --remote_instance_name=main +build --remote_cache=grpc://localhost:50051 +build --remote_executor=grpc://localhost:50051 +build --bes_backend=grpc://localhost:50061 +build --bes_results_url=http://localhost:4321/builds +build --bes_upload_mode=fully_async +build --build_event_publish_all_actions=true +``` + +Make sure to use the right IP, if it's not hosted on `localhost` + + +```bash +bazel build some-target +``` diff --git a/web/bridge/bun.lockb b/web/bridge/bun.lockb new file mode 100755 index 000000000..e2abd5271 Binary files /dev/null and b/web/bridge/bun.lockb differ diff --git a/web/bridge/image.nix b/web/bridge/image.nix new file mode 100644 index 000000000..889376944 --- /dev/null +++ b/web/bridge/image.nix @@ -0,0 +1,53 @@ +{ + pkgs, + buildImage, + ... +}: let + description = "A simple Bun environment image"; + title = "Bun Environment"; + + # Separately build the dependencies with the fixed-output + # to be able to download with bun + bunDeps = pkgs.stdenv.mkDerivation { + name = "bun-deps"; + src = ./.; + nativeBuildInputs = [pkgs.bun]; + buildPhase = '' + bun install + ''; + installPhase = '' + mkdir -p $out + cp -r $src/* $out + ''; + outputHashAlgo = "sha256"; + outputHashMode = "recursive"; + outputHash = "sha256-RHq0SxdXjryEnb7aOEm9Bp7Hcq9S+bgKbkCNU17fURg="; + }; + + # Use the bunDeps for the nativelink-image + nativelink-bridge = pkgs.stdenv.mkDerivation { + name = "nativelink-bridge"; + src = bunDeps; + buildInputs = [pkgs.bun]; + installPhase = '' + mkdir -p $out + cp -r ${bunDeps}/* $out + ''; + }; +in + buildImage { + name = "nativelink-bridge"; + + # Container configuration + config = { + WorkingDir = "${nativelink-bridge}"; + Entrypoint = ["${pkgs.bun}/bin/bun" "run" "index.ts"]; + ExposedPorts = { + "8080/tcp" = {}; + }; + Labels = { + "org.opencontainers.image.description" = description; + "org.opencontainers.image.title" = title; + }; + }; + } diff --git a/web/bridge/index.ts b/web/bridge/index.ts new file mode 100644 index 000000000..a50671e29 --- /dev/null +++ b/web/bridge/index.ts @@ -0,0 +1,3 @@ +import { start } from './src'; + +start().catch(err => console.error(err)); diff --git a/web/bridge/package.json b/web/bridge/package.json new file mode 100644 index 000000000..3fd75869d --- /dev/null +++ b/web/bridge/package.json @@ -0,0 +1,22 @@ +{ + "name": "nativelink-bridge", + "version": "0.5.3", + "module": "index.ts", + "type": "module", + "dependencies": { + "drizzle-orm": "^0.36.0", + "postgres": "^3.4.5", + "protobufjs": "^7.4.0", + "redis": "^4.7.0" + }, + "devDependencies": { + "@types/bun": "^1.1.8", + "drizzle-kit": "^0.27.1" + }, + "peerDependencies": { + "typescript": "^5.0.0" + }, + "trustedDependencies": [ + "protobufjs" + ] +} diff --git a/web/bridge/src/eventHandler.ts b/web/bridge/src/eventHandler.ts new file mode 100644 index 000000000..88d27ba91 --- /dev/null +++ b/web/bridge/src/eventHandler.ts @@ -0,0 +1,75 @@ +import type protobuf from 'protobufjs'; +import type { BuildEvent, Progress } from './types/buildTypes'; +import { commandOptions, type RedisClientType } from 'redis'; +import { constructRedisKey, parseMessage } from './utils'; +import { broadcastProgress } from './websocket'; + + +export async function handleEvent(message: string, commandClient: RedisClientType, types: { PublishBuildToolEventStreamRequest: protobuf.Type, PublishLifecycleEventRequest: protobuf.Type }) { + switch (parseMessage(message).eventType) { + case 'LifecycleEvent': + await fetchAndDecodeBuildData(constructRedisKey(parseMessage(message)), commandClient, types.PublishLifecycleEventRequest); + break; + case 'BuildToolEventStream': + await fetchAndDecodeBuildData(constructRedisKey(parseMessage(message)), commandClient, types.PublishBuildToolEventStreamRequest); + break; + default: + console.log('Unknown event type:', parseMessage(message).eventType); + } +} + +async function fetchAndDecodeBuildData(redisKey: string, commandClient: RedisClientType, messageType: protobuf.Type) { + try { + const buildData = await commandClient.get(commandOptions({ returnBuffers: true }), redisKey); + if (buildData) { + const decodedMessage = messageType.decode(new Uint8Array(Buffer.from(buildData))) as BuildEvent; + if(decodedMessage.orderedBuildEvent) { + const buildId = decodedMessage.orderedBuildEvent.streamId.buildId + const invocationId = decodedMessage.orderedBuildEvent.streamId.invocationId + console.log("Build ID: ", buildId) + console.log("Invocation ID: ", invocationId) + const eventTime = decodedMessage.orderedBuildEvent.event.eventTime; + const milliseconds = eventTime.seconds.low * 1000 + Math.floor(eventTime.nanos / 1000000); + const eventDate = new Date(milliseconds); + console.log("Event time nanos:", eventTime.nanos) + console.log("Event time seconds:", eventTime.seconds.low) + console.log("Event time:", eventDate.toISOString()); + const currentTime = new Date() + const elapsedTime = currentTime.getTime() - eventDate.getTime(); + console.log("Time Now: ", currentTime.toISOString()) + console.log(`Elapsed Time: ${elapsedTime} ms`); + } + if (decodedMessage?.orderedBuildEvent?.event?.bazelEvent) { + console.log("------------------") + decodeBazelEvent(decodedMessage.orderedBuildEvent.event.bazelEvent, messageType.root); + } + } + } catch (err) { + console.error(`Error fetching build data for key ${redisKey}:`, err); + } +} + +// TODO(SchahinRohani): Add Bazel Event Types +// biome-ignore lint/suspicious/noExplicitAny: Bazel Event Types are not known yet +function decodeBazelEvent(bazelEvent: any, root: protobuf.Root): any { + if (!bazelEvent || !bazelEvent.value) return null; + const messageType = root.lookupType(bazelEvent.typeUrl.split('/').pop()); + const decodedMessage = messageType.decode(new Uint8Array(Buffer.from(bazelEvent.value, 'base64'))); + const decodedObject = messageType.toObject(decodedMessage, { + longs: String, + enums: String, + bytes: String, + }); + if (decodedObject.progress) { + console.log("Processing progress information...\n\n"); + processProgress(decodedObject.progress); + } + return decodedObject; +} + +function processProgress(progress: Progress) { + if (progress.stderr) { + console.log(progress.stderr); + broadcastProgress(progress.stderr) + } +} diff --git a/web/bridge/src/http.ts b/web/bridge/src/http.ts new file mode 100644 index 000000000..e2e83bef3 --- /dev/null +++ b/web/bridge/src/http.ts @@ -0,0 +1,37 @@ +import { serve } from "bun"; + +export const startWebServer = () => { + console.log('\nHTTP server is running on http://localhost:3001\n'); + serve({ + port: 3001, + fetch(req) { + const url = new URL(req.url); + const handler = routes.get(url.pathname); + if (handler){ + return handler(); + } + return new Response("Not Found", { status: 404 }); + }, + }); +}; + +const routes = new Map Response>([ + ["/api", () => jsonResponse({ message: "Hello from API" })], + ["/health", () => jsonResponse({ status: "ok" })], + ["/readiness", () => jsonResponse({ status: "ready" })], + ]); + + +function jsonResponse(data: object, status: number = 200): Response { + const responseData = { + ...data, + timestamp: new Date().toISOString(), + }; + return new Response(JSON.stringify(responseData), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" + }, + status, + }); + } diff --git a/web/bridge/src/index.ts b/web/bridge/src/index.ts new file mode 100644 index 000000000..7296d255a --- /dev/null +++ b/web/bridge/src/index.ts @@ -0,0 +1,73 @@ +import { initializeRedisClients } from './redis'; +import { initializeProtobuf } from './protobuf'; +import { handleEvent } from './eventHandler'; +import { startWebSocket } from './websocket'; +import { startWebServer } from './http'; + + +export async function start() { + // Base URL + const github = "https://raw.githubusercontent.com" + + // NativeLink URL + const nativelinkRepo = "TraceMachina/nativelink" + const nativelinkBranch = "main" + const nativelinkProtoPath = `${github}/${nativelinkRepo}/${nativelinkBranch}/nativelink-proto/`; + + // Proto Remote Path + const protoRepo = "protocolbuffers/protobuf" + const protoBranch = "master" + const protoRepoPath = `${github}/${protoRepo}/${protoBranch}/main/src/google/protobuf`; + const protoDevToolsPath = `${github}/${protoRepo}/main/src/google/devtools/build/v1`; + + const googleProto = "googleapis/googleapis" + const googleProtoBranch = "master" + const googleProtoPath = `${github}/${googleProto}/${googleProtoBranch}/google/devtools/build/v1`; + + // Bazel Remote Path + const bazelRepo = "bazelbuild/bazel" + const bazelBranch = "master" + const bazelProtoPath = `${github}/${bazelRepo}/${bazelBranch}/src/main/java/com/google/devtools/build/lib/buildeventstream/proto`; + + // TODO(SchahinRohani): Add Buck2 Protos for future Buck2 support + // Buck2 Protos + // const buck2Repo = "facebook/buck2/main" + // const buck2Branch = "main" + // const buck2ProtoPath = `${github}/${buck2Repo}/${buck2Branch}/app/buck2_data/data.proto`; + + // Actual using Protos. + const PublishBuildEventProto =`${googleProtoPath}/publish_build_event.proto`; + const BazelBuildEventStreamProto = `${bazelProtoPath}/build_event_stream.proto`; + + const protos = [ PublishBuildEventProto, BazelBuildEventStreamProto ] + + console.info("Link to: \n") + console.info("Google Publish Build Events Proto:\n", PublishBuildEventProto, "\n"); + console.info("Bazel Build Event Stream Proto:\n", BazelBuildEventStreamProto, "\n") + + // Load Remote Bazel Proto Files + const protoTypes = await initializeProtobuf(protos) + + const { redisClient, commandClient } = await initializeRedisClients(); + + // Subscribe to the build_event channel + await redisClient.subscribe(process.env.NATIVELINK_PUB_SUB_CHANNEL || "build_event", async (message: string) => { + await handleEvent(message, commandClient, protoTypes); + }); + + const websocketServer = startWebSocket(); + const webServer = startWebServer(); + + process.on('SIGINT', async () => { + await redisClient.disconnect(); + await commandClient.disconnect(); + console.info("Received SIGINT. Shutdown gracefully.") + process.exit(); + }); + process.on('SIGTERM', async () => { + await redisClient.disconnect(); + await commandClient.disconnect(); + console.info("Received SIGTERM. Shutdown gracefully.") + process.exit(); + }); +} diff --git a/web/bridge/src/protobuf.ts b/web/bridge/src/protobuf.ts new file mode 100644 index 000000000..1aaace85e --- /dev/null +++ b/web/bridge/src/protobuf.ts @@ -0,0 +1,98 @@ +import protobuf from 'protobufjs'; + +export async function initializeProtobuf(protos: string[]) { + console.log("Loading Remote Proto Files"); + + // Create a new Root instance + const combinedRoot = new protobuf.Root(); + + // Track loaded files to avoid circular dependencies + const loadedFiles: Record = {}; + + // Track processed imports to avoid duplicates + const processedImports = new Set(); + + // Load all initial proto files + for (const proto of protos) { + await loadProto(loadedFiles, combinedRoot, proto, processedImports); + } + console.log("\nDone parsing all proto files.\n"); + // Now combinedRoot contains your parsed .proto content + // Example: Look up specific message types + const BazelBuildEvent = combinedRoot.lookupType("build_event_stream.BuildEvent"); + const PublishBuildToolEventStreamRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishBuildToolEventStreamRequest"); + const PublishLifecycleEventRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishLifecycleEventRequest"); + + console.log("Loaded Types:\n"); + console.log({ + PublishLifecycleEventRequest: PublishLifecycleEventRequest ? PublishLifecycleEventRequest.fullName : "Not found", + PublishBuildToolEventStreamRequest: PublishBuildToolEventStreamRequest ? PublishBuildToolEventStreamRequest.fullName : "Not found", + BazelBuildEvent: BazelBuildEvent ? BazelBuildEvent.fullName : "Not found" + }); + + return { + PublishLifecycleEventRequest, + PublishBuildToolEventStreamRequest, + BazelBuildEvent + }; +} + +function resolveImportPath(protoUrl: string, importPath: string): string { + // Handle googleapis imports + if (importPath.startsWith("google/api") || importPath.startsWith("google/devtools/build/v1")) { + return `https://raw.githubusercontent.com/googleapis/googleapis/master/${importPath}`; + } + + // Handle protocolbuffers imports + if (importPath.startsWith("google/protobuf")) { + return `https://raw.githubusercontent.com/protocolbuffers/protobuf/master/src/${importPath}`; + } + + // Handle specific case for bazel + if (importPath.includes("com/google/devtools/build/lib/packages/metrics") || importPath.startsWith("src/main/protobuf")) { + return `https://raw.githubusercontent.com/bazelbuild/bazel/master/${importPath}`; + } + + // Default behavior for other imports - resolve relative to protoUrl + return new URL(importPath, protoUrl).toString(); +} + +// Recursive function to fetch, parse, and handle imports +async function loadProto( + loadedFiles: Record, + root: protobuf.Root, + protoUrl: string, + processedImports: Set, + indentLevel = 0, +) { + if (loadedFiles[protoUrl]) { + // If already loaded, skip to prevent circular imports + return; + } + + // Fetch the .proto file content + const response = await fetch(protoUrl); + if (!response.ok) { + throw new Error(`Failed to fetch .proto file from ${protoUrl}: ${response.statusText}`); + } + + // Parse the proto content + const parsedProto = protobuf.parse(await response.text(), root); + // Mark this proto as loaded + loadedFiles[protoUrl] = true; + // Log the imports necessary for this proto file + if (indentLevel < 1) { + console.log(`\n${ ' '.repeat(indentLevel)} ${protoUrl}:`); + } + if (parsedProto.imports && parsedProto.imports.length > 0) { + for (const importPath of parsedProto.imports) { + const resolvedImportUrl = resolveImportPath(protoUrl, importPath); + if (!processedImports.has(resolvedImportUrl)) { + console.log(`${ ' '.repeat(indentLevel)} - ${importPath}`); + processedImports.add(resolvedImportUrl); + // Recursively handle the imports + await loadProto(loadedFiles, root, resolvedImportUrl, processedImports, indentLevel + 1,); + } + } + } +} diff --git a/web/bridge/src/redis.ts b/web/bridge/src/redis.ts new file mode 100644 index 000000000..8154e9512 --- /dev/null +++ b/web/bridge/src/redis.ts @@ -0,0 +1,25 @@ +import { createClient, type RedisClientType } from 'redis'; + +export async function initializeRedisClients() { + try { + const redisClient: RedisClientType = createClient({ + url: process.env.REDIS_URL, + }); + const commandClient = redisClient.duplicate(); + + redisClient.on('error', (err) => { + console.error('Redis Client Error:', err); + throw new Error('Failed to connect to Redis.'); + }); + + await redisClient.connect(); + await commandClient.connect(); + + console.log('\nRedis clients successfully connected.\n'); + + return { redisClient, commandClient }; + } catch (error) { + console.error('Error during Redis client initialization:', error); + throw new Error('Unable to initialize Redis clients. Please check your connection.'); + } +} diff --git a/web/bridge/src/types/buildTypes.ts b/web/bridge/src/types/buildTypes.ts new file mode 100644 index 000000000..fdde7c7fc --- /dev/null +++ b/web/bridge/src/types/buildTypes.ts @@ -0,0 +1,20 @@ +export interface BuildEvent extends protobuf.Message { + orderedBuildEvent: { + streamId: { + buildId: string; + invocationId: string; + }, + event: { + eventTime: { + seconds: protobuf.Long; + nanos: number; + }; + // biome-ignore lint/suspicious/noExplicitAny: Not known yet + bazelEvent?: any; + }; + }; + } + + export type Progress = { + stderr: string; + }; diff --git a/web/bridge/src/utils.ts b/web/bridge/src/utils.ts new file mode 100644 index 000000000..53face425 --- /dev/null +++ b/web/bridge/src/utils.ts @@ -0,0 +1,28 @@ +type ParsedMessage = { + prefix: string; + eventType: string; + eventID: string; + subEventID: string; + sequenceNumber: string; +} + +export function parseMessage(message: string) { + const parts = message.split(':'); + const [prefix, eventType, eventID, subEventID, sequenceNumber] = parts; + return { + prefix, + eventType, + eventID, + subEventID, + sequenceNumber + }; +} + +export function constructRedisKey(parsedMessage: ParsedMessage) { + console.log("\nNew Published Event: ") + console.log(" EventID: ", parsedMessage.eventID) + console.log(" Sequence Number: ", parsedMessage.sequenceNumber) + console.log(" Invocation ID: ", parsedMessage.subEventID) + console.log("------------------") + return `${parsedMessage.prefix}:${parsedMessage.eventType}:${parsedMessage.eventID}:${parsedMessage.subEventID}:${parsedMessage.sequenceNumber}`; +} diff --git a/web/bridge/src/websocket.ts b/web/bridge/src/websocket.ts new file mode 100644 index 000000000..68e9f7c35 --- /dev/null +++ b/web/bridge/src/websocket.ts @@ -0,0 +1,42 @@ +import type { ServerWebSocket } from "bun"; + +const clients = new Set>(); + +export const startWebSocket = () => { + console.log('\nWebSocket server is running on ws://localhost:8080\n'); + Bun.serve({ + port: 8080, + fetch(req, server) { + // Upgrade the request to a WebSocket + // Here we can also do the websocket auth/token auth + if (server.upgrade(req)) { + return; + } + return new Response("Upgrade failed", { status: 500 }); + }, + websocket: { + open(ws) { + console.log('New client connected'); + clients.add(ws); + ws.send("Hello Web Client") + }, + message(ws, message) { + console.log('Received message from web client:', message); + }, + close(ws) { + console.log('Web Client disconnected'); + clients.delete(ws); + }, + drain(ws) { + console.log('Ready to receive more data'); + }, + }, +});} + +export function broadcastProgress(progress: string) { + const buffer = Buffer.from(progress) + console.log(progress) + for (const ws of clients) { + ws.send(new Uint8Array(buffer)); + } +} diff --git a/web/bridge/tsconfig.json b/web/bridge/tsconfig.json new file mode 100644 index 000000000..238655f2c --- /dev/null +++ b/web/bridge/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + // Enable latest features + "lib": ["ESNext", "DOM"], + "target": "ESNext", + "module": "ESNext", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true, + "noEmit": true, + + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false + } +}