Skip to content

Commit

Permalink
Create specified topic if non-existent (#91)
Browse files Browse the repository at this point in the history
* initial websock publisher for real-time

* package defs

* change file name

* cleanup

* tabs

* update ws package to 3.3.1

* tabs, add message when connection closed

* small tweaks

* refactor

* cloudbuild.yaml point to revised function directory namew

* fix dir

* add socketUrl as custom pubsub message attribute

* add bin to package.json

* create specified topic if not there

* template error message and exit with code

* refactor open handler for web socket connection to own routine

* cleanup
  • Loading branch information
salsferrazza authored and mservidio committed Dec 18, 2019
1 parent f5bd2e1 commit 03b7b6f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 31 deletions.
68 changes: 42 additions & 26 deletions ingestion/streaming/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,84 @@

'use strict';

// Usage: ${0} <ws url> <topic>
// Usage: pubsock <ws url> <topic>
//
// Get messages arriving via WebSocket (JSONL-formatted) and
// publish them individually to the specified topic
//
// TODO: Set message attributes from data coming over via websockets
//
//

if (process.argv.length < 4) {
console.error(`Usage: ${process.argv[0]} ${process.argv[1]} <WebSocket URL> <topic-mame>`);
console.error(`Usage: pubsock <WebSocket URL> <topic-mame>`);
process.exit(1);
}

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const socketUrl = process.argv[2];
const topicName = process.argv[3];
const WebSocket = require('ws');
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const ws = new WebSocket(socketUrl);
let topic = undefined;

const publishMessages = function() {
let topic = pubsub.topic(topicName);

ws.on('open', function open() {
console.error('Web socket connection opened');
});
ws.on('open', open);
ws.on('message', inbound);
ws.on('close', close);
}

const open = function() {
console.error('Web socket connection opened');
}

const inbound = function (data) {
try {
topic.publisher.publish(Buffer.from(data), function(err, messageId) {
try {
let payload = Buffer.from(data);
topic.publisher.publish(payload,
{ origin: socketUrl },
function(err, messageId) {
if (err) {
console.error(`could not publish message ${messageId}`);
console.error(`error in publish callback: ${err}`);
}
});
} catch(error) {
console.error(`error publishing message: ${error}`);
console.error(`caught error publishing message: ${error}`);
}
}

const close = function() {
console.error("Web socket connection closed");
console.error('Web socket connection closed');
process.exit(1);
}

let topic = pubsub.topic(topicName);
try {
if (!topic) {
pubsub.createTopic(topicName, function (err) {
if (err) {
console.error('Could not create topic: ' + JSON.stringify(err));
process.exit(1);
topic = pubsub.topic(topicName);
topic.exists(function(err, exists) {
if (err) {
console.error(`Error looking for specified topic ${topicName}: ${error}`);
process.exit(1);
} else {
if (!exists) {
console.error(`Topic ${topicName} not found, creating...`);
topic.create(function (err, topic, apiResponse) {
if (err) {
console.error(`Could not create non-existent topic ${topicName}: ${apiResponse} ${err}`);
process.exit(1);
} else {
console.error(`Created topic ${topicName}`);
publishMessages();
}
});
} else {
publishMessages();
}
});
} else {
publishMessages();
}
}
});
} catch(error) {
console.error("Error: " + error);
console.error(`Error: ${error}`);
process.exit(1);
}


2 changes: 1 addition & 1 deletion ingestion/streaming/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions ingestion/streaming/package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
{
"name": "ws2pubsub",
"name": "pubsock",
"version": "0.0.1",
"description": "BQDS WebSocket publisher",
"description": "Streams WebSocket data to Pub/Sub",
"license": "Apache-2.0",
"repository": {},
"repository": {
"type": "git",
"url": "https://github.com/GoogleCloudPlatform/bq-datashare-toolkit"
},
"dependencies": {
"ws": "~3.3.1",
"@google-cloud/pubsub": "^0.28.1"
}
},
"bin": {
"pubsock": "index.js"
},
"main": "index.js"
}

0 comments on commit 03b7b6f

Please sign in to comment.