From 03b7b6fd5b5d3445eca3926b6023b0c3d4c04ada Mon Sep 17 00:00:00 2001 From: Salvatore Sferrazza Date: Tue, 17 Dec 2019 21:13:41 -0500 Subject: [PATCH] Create specified topic if non-existent (#91) * initial websock publisher for real-time * package defs * change file name * cleanup * tabs * update ws package to 3.3.1 * tabs, add message when connection closed * small tweaks * refactor * cloudbuild.yaml point to revised function directory namew * fix dir * add socketUrl as custom pubsub message attribute * add bin to package.json * create specified topic if not there * template error message and exit with code * refactor open handler for web socket connection to own routine * cleanup --- ingestion/streaming/index.js | 68 +++++++++++++++++---------- ingestion/streaming/package-lock.json | 2 +- ingestion/streaming/package.json | 15 ++++-- 3 files changed, 54 insertions(+), 31 deletions(-) diff --git a/ingestion/streaming/index.js b/ingestion/streaming/index.js index 0abe4313b..8d379d071 100755 --- a/ingestion/streaming/index.js +++ b/ingestion/streaming/index.js @@ -17,68 +17,84 @@ 'use strict'; -// Usage: ${0} +// Usage: pubsock // // Get messages arriving via WebSocket (JSONL-formatted) and // publish them individually to the specified topic +// +// TODO: Set message attributes from data coming over via websockets +// +// if (process.argv.length < 4) { - console.error(`Usage: ${process.argv[0]} ${process.argv[1]} `); + console.error(`Usage: pubsock `); process.exit(1); } - -const {PubSub} = require('@google-cloud/pubsub'); -const pubsub = new PubSub(); - const socketUrl = process.argv[2]; const topicName = process.argv[3]; const WebSocket = require('ws'); +const {PubSub} = require('@google-cloud/pubsub'); +const pubsub = new PubSub(); const ws = new WebSocket(socketUrl); +let topic = undefined; const publishMessages = function() { let topic = pubsub.topic(topicName); - - ws.on('open', function open() { - console.error('Web socket connection opened'); - }); + ws.on('open', open); ws.on('message', inbound); ws.on('close', close); +} +const open = function() { + console.error('Web socket connection opened'); } const inbound = function (data) { - try { - topic.publisher.publish(Buffer.from(data), function(err, messageId) { + try { + let payload = Buffer.from(data); + topic.publisher.publish(payload, + { origin: socketUrl }, + function(err, messageId) { if (err) { - console.error(`could not publish message ${messageId}`); + console.error(`error in publish callback: ${err}`); } }); } catch(error) { - console.error(`error publishing message: ${error}`); + console.error(`caught error publishing message: ${error}`); } } const close = function() { - console.error("Web socket connection closed"); + console.error('Web socket connection closed'); process.exit(1); } -let topic = pubsub.topic(topicName); try { - if (!topic) { - pubsub.createTopic(topicName, function (err) { - if (err) { - console.error('Could not create topic: ' + JSON.stringify(err)); - process.exit(1); + topic = pubsub.topic(topicName); + topic.exists(function(err, exists) { + if (err) { + console.error(`Error looking for specified topic ${topicName}: ${error}`); + process.exit(1); + } else { + if (!exists) { + console.error(`Topic ${topicName} not found, creating...`); + topic.create(function (err, topic, apiResponse) { + if (err) { + console.error(`Could not create non-existent topic ${topicName}: ${apiResponse} ${err}`); + process.exit(1); + } else { + console.error(`Created topic ${topicName}`); + publishMessages(); + } + }); } else { publishMessages(); } - }); - } else { - publishMessages(); - } + } + }); } catch(error) { - console.error("Error: " + error); + console.error(`Error: ${error}`); + process.exit(1); } diff --git a/ingestion/streaming/package-lock.json b/ingestion/streaming/package-lock.json index 9d926814a..50227a7d7 100644 --- a/ingestion/streaming/package-lock.json +++ b/ingestion/streaming/package-lock.json @@ -1,5 +1,5 @@ { - "name": "ws2pubsub", + "name": "pubsock", "version": "0.0.1", "lockfileVersion": 1, "requires": true, diff --git a/ingestion/streaming/package.json b/ingestion/streaming/package.json index 4f6be9ba9..76d06ab8f 100644 --- a/ingestion/streaming/package.json +++ b/ingestion/streaming/package.json @@ -1,11 +1,18 @@ { - "name": "ws2pubsub", + "name": "pubsock", "version": "0.0.1", - "description": "BQDS WebSocket publisher", + "description": "Streams WebSocket data to Pub/Sub", "license": "Apache-2.0", - "repository": {}, + "repository": { + "type": "git", + "url": "https://github.com/GoogleCloudPlatform/bq-datashare-toolkit" + }, "dependencies": { "ws": "~3.3.1", "@google-cloud/pubsub": "^0.28.1" - } + }, + "bin": { + "pubsock": "index.js" + }, + "main": "index.js" }