Skip to content

Commit

Permalink
fix write checks
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jan 16, 2024
1 parent c34d274 commit 86f85f5
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 40 deletions.
4 changes: 1 addition & 3 deletions packages/logger/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default class Logger {

err(message) {
if (!['ERROR', 'WARN', 'INFO', 'DEBUG'].includes(this?.log_level))
return
return
this.logger.error(message);
this.write(message)
}
Expand All @@ -41,8 +41,6 @@ export default class Logger {
}

debug(message) {
if (!['DEBUG'].includes(this?.log_level))
return
this.logger.debug(message);
this.write(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class WebsocketAdapterDefault {
* @returns promise<result?>
*/
async check_write(){
this.$.logger.debug(`check_write()`)
const ev = JSON.stringify(['EVENT', this.config?.event_sample || this.$.SAMPLE_EVENT])
console.log(ev)
this.$.ws.send(ev)
}

Expand Down Expand Up @@ -76,6 +78,7 @@ class WebsocketAdapterDefault {
*/
handle_nostr_event(buffer){
const ev = JSON.parse(buffer.toString())
console.log(ev[0])
if(ev[0] === 'EVENT') {
if(this.$.subid('read') === ev[1])
this.$.on_event(ev[1], ev[2])
Expand Down
60 changes: 31 additions & 29 deletions packages/nocap/src/classes/Base.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default class {
//
this.config = new ConfigInterface(config)
this.results = new ResultInterface()
this.session = new SessionHelper()
this.session = new SessionHelper(url)
this.timeouts = new TimeoutHelper(this.session)
this.latency = new LatencyHelper(this.session)
this.promises = new DeferredWrapper(this.session, this.timeouts)
Expand Down Expand Up @@ -102,7 +102,7 @@ export default class {
else {
return this.throw(`check(${keys}) failed. keys must be one (string) or several (array of strings) of: ${this.checks.join(', ')}`)
}
if(this.isConnected()) this.close()
// if(this.isConnected()) this.close()
return headers? result: this.results.cleanResult(keys, result)
}

Expand Down Expand Up @@ -134,12 +134,11 @@ export default class {
* @param {string} key - The key associated with the timeout
* @returns {Function} - The reject function
*/
maybeTimeoutReject(key){
maybeTimeoutReject(key){
return (reject) => {
if(this.isWebsocketKey(key))
return reject({ data: false, duration: -1, status: "error", message: `Websocket connection to relay timed out (after ${this.config.timeout[key]}ms}` })
else
return reject({ data: {}, duration: -1, status: "error", message: `${key} check timed out (after ${this.config.timeout[key]}ms}` })
const message = `${key} check timed out (after ${this.config.timeout[key]}ms}`
this.logger.debug(message)
return reject({ data: {}, duration: -1, status: "error", message})
}
}

Expand Down Expand Up @@ -178,14 +177,14 @@ export default class {
throw new Error('Key must be string')

if( !this?.adapters?.[adapter]?.[`check_${key}`] )
throw new Error(`check_${key} method not found in ${adapter} Adapter, key should be 'connect', 'read', or 'write'`)
throw new Error(`check_${key} method not found in ${adapter} Adapter`)

this.precheck(key)
.then(() => {
.then(async () => {
this.logger.debug(`${key}: precheck resolved`)
this.current = key
this.latency.start(key)
this.adapters[adapter][`check_${key}`]()
await this.adapters[adapter][`check_${key}`]()
})
.catch((precheck) => {
let reason
Expand Down Expand Up @@ -221,15 +220,15 @@ export default class {
this.current = null
this.latency.finish(key)
const result = this.produce_result(key, data)
if(this.ignore_result(key)) return
if(this.ignore_result(key)) return this.logger.debug(`ignoring result ${key}`)
this.results.setMany(result)
this.promises.get(key).resolve(result)
const deferred = await this.promises.resolve(key, result)
this.on_change()
}

/**
* ignore_result
* Determines if the result for a given key should be ignore
* Determines if the result for a given key should be ignored
*
* @private
* @param {string} key - The key to check for ignoring
Expand Down Expand Up @@ -302,7 +301,7 @@ export default class {
const keyIsConnect = key === 'connect'
const resolvePrecheck = precheckDeferred.resolve
const rejectPrecheck = precheckDeferred.reject
const connectAttempted = this.promises.exists('connect') && this.promises.reflect('connect').state.isFulfilled
const connectAttempted = this.promises.exists('connect') && this.promises.reflect('connect').state.isPending

const waitForConnection = async () => {
this.logger.debug(`${key}: waitForConnection()`)
Expand Down Expand Up @@ -345,15 +344,16 @@ export default class {
return rejectPrecheck({ status: "error", message: `Cannot check ${key}, websocket connection could not be established` })
}


//Websocket is open, key is connect, reject precheck and directly resolve check deferred promise with cached result to bypass starting the connect check.
if(keyIsConnect && this.isConnected()) {
this.logger.debug(`${key}: prechecker(): websocket is open, key is connect`)
// this.logger.debug(`precheck(${key}):prechecker():websocket is open, key is connect`)
rejectPrecheck({ status: "error", message: 'Cannot check connect because websocket is already connected, returning cached result'})
}
//Websocket is not connecting, key is not connect
//Websocket is not connected, key is not connect
if( !keyIsConnect && !this.isConnected()) {
this.logger.debug(`${key}: prechecker(): websocket is not connecting, key is not connect`)
this.logger.debug(`${key}: prechecker(): websocket is not connected, key is not connect`)
return rejectPrecheck({ status: "error", message: `Cannot check ${key}, no active websocket connection to relay` })
}

Expand Down Expand Up @@ -399,12 +399,12 @@ export default class {
return
if(this.adapters.websocket?.unsubscribe)
return this.adapters.websocket.unsubscribe()
this.maybeExecuteAdapterMethod(
'websocket',
'close',
() => this.ws.send(['CLOSE', subid]),
subid
)
// this.maybeExecuteAdapterMethod(
// 'websocket',
// 'close',
// () => this.ws.send(['CLOSE', subid]),
// subid
// )
}

/**
Expand All @@ -414,6 +414,7 @@ export default class {
* @private
*/
close(){
this.logger.debug(`close()`)
if( this.isClosing() || !this.isConnected() || this.isClosed())
return
this.maybeExecuteAdapterMethod(
Expand Down Expand Up @@ -515,7 +516,6 @@ export default class {
this.track('relay', 'event', ev.id)
if(this?.adapters?.websocket?.handle_event)
this.adapters.websocket.handle_event(subid, ev)
this.handle_read_check(true)
}

/**
Expand Down Expand Up @@ -557,8 +557,7 @@ export default class {
on_ok(ok){
this.cbcall('ok')
this.handle_ok(ok)
if(this.promises.reflect('write').state.isPending)
this.handle_write_check(true)
// if(this.promises.reflect('write').state.isPending)
}

/**
Expand Down Expand Up @@ -625,6 +624,7 @@ export default class {
* @returns null
*/
handle_write_check(data){
this.logger.debug('handle_write_checked()')
this.finish('write', { data })
}

Expand All @@ -645,8 +645,9 @@ export default class {
* @private
* @returns null
*/
handle_ok(){

handle_ok(ok){
this.logger.debug('handle_ok()', "ok?", ok)
this.handle_write_check(true)
}

/**
Expand Down Expand Up @@ -755,7 +756,7 @@ export default class {
return altFn(...args)
}
catch(err){
throw new Error(`Provided alternative functiiion: Threw error using default method: ${err}, the respective adapter should probably define this method instead` )
throw new Error(`Provided alternative function: Threw error using default method: ${err}, the respective adapter should probably define this method instead` )
}
}
}
Expand Down Expand Up @@ -854,9 +855,10 @@ export default class {
* @returns {Promise<*>} - The promise of the deferred
*/
async addDeferred(key, cb=()=>{}){
this.logger.debug(`addDeferred('${key}')`)
const existingDeferred = this.promises.exists(key)
if(existingDeferred)
await this.promises.get(key).promise
return this.promises.get(key).promise
this.promises.add(key, this.config?.timeout?.[key], cb)
return this.promises.get(key)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/nocap/src/classes/Base.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { fetch } from 'cross-fetch';

import { createMockRelay, faker, MockRelay } from "vitest-nostr";

let url = `wss://history.nostr.watch`
let url = `wss://relay.damus.io`
let nocap

beforeAll(async () => {});
Expand Down Expand Up @@ -309,7 +309,7 @@ describe("Nocap class", () => {

it("clean results should return connect result", async () => {
const response = await nocap.check(method, false)
console.log(response)
console.log("check('dns')", response)
expect(response).toBeTypeOf('object');
expect(response).toHaveProperty(method);
expect(response[method]).toBeTypeOf('object');
Expand Down
16 changes: 11 additions & 5 deletions packages/nocap/src/classes/DeferredWrapper.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import Deferred from 'promise-deferred'
import Logger from "@nostrwatch/logger"


export class DeferredWrapper {
constructor($session, $timeout){
this.promises = {}
this.timeout = $timeout
this.$session = $session
this.logger = new Logger(this.$session.url)
}

add(key, timeout, timeoutCb){
Expand All @@ -24,15 +27,16 @@ export class DeferredWrapper {
return deferred
}

resolve(key, result){
async resolve(key, result){
this.logger.debug(`deferred:resolve("${key}")`, `has timeout: ${this.timeout.has(key)}`)
if(this.timeout.has(key))
clearTimeout(this.timeout.get(key))
this.get(key).resolve(result)
this.timeout.clear(key)
return this.get(key).resolve(result)
}

reject(key, error){
if(this.timeout.has(key))
clearTimeout(this.timeout.get(key))
this.timeout.clear(key)
this.get(key).reject(error)
}

Expand Down Expand Up @@ -62,11 +66,13 @@ export class DeferredWrapper {
}

exists(key){
return this.promises?.[this.session()]?.[key]
this.logger.debug(`deferred:exists("${key}")`)
return typeof this.promises?.[this.session()]?.[key] === 'object'
}

get(key){
const deferred = this.promises[this.session()][key]
this.logger.debug(`deferred:get("${key}"), exists: ${typeof deferred !== 'undefined'}`)
return deferred
}

Expand Down
3 changes: 2 additions & 1 deletion packages/nocap/src/classes/SessionHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import murmurhash from 'murmurhash'
import { random } from '../utils.js'

export class SessionHelper {
constructor(){
constructor(url){
this.url = url
this.init()
this.initial = true
}
Expand Down

0 comments on commit 86f85f5

Please sign in to comment.