Skip to content

Commit

Permalink
Revert "feat(condo): DOMA-8441 Reuse opened connection to send a batc…
Browse files Browse the repository at this point in the history
…h of VOIP pushes & use RedisGuard to prevent opening multiple connection with APNS (#4391)" (#4503)

This reverts commit 5a160b2.
  • Loading branch information
VKislov authored Mar 22, 2024
1 parent 6748e31 commit fe2e23e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 68 deletions.
37 changes: 13 additions & 24 deletions apps/condo/domains/notification/adapters/apple/AppleMessaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@ const logger = getLogger('AppleMessaging')
class AppleMessaging {
#token = null
#session = null
#url = null

/**
* @param config
*/
constructor (config) {
if (typeof config.url !== 'string') throw new Error('config.url not provided to AppleMessaging instance')
this.#token = new AppleJSONWebToken(config)
this.#url = config.url
this.#session = new AppleSession(config.url)
this.getResponseHandler = this.getResponseHandler.bind(this)
this.sendPush = this.sendPush.bind(this)
}

getResponseHandler (stream, resolve, reject) {
const reqErrorHandler = error => {
stream.close()
this.#session.errorHandler(error)
}

return (headers, flags) => {
Expand Down Expand Up @@ -102,7 +101,7 @@ class AppleMessaging {
* @return {Promise} A promise that resolves if the request is successful or rejects
* with an error
*/
async sendPush (pushToken, payload, options = {}) {
sendPush (pushToken, payload, options = {}) {
return new Promise((resolve, reject) => {
if (!payload) return reject(Error('Parameter `payload` is required'))
if (!pushToken) return reject(Error('Parameter `pushToken` is required'))
Expand All @@ -129,23 +128,19 @@ class AppleMessaging {

logger.info({ msg: 'sendPush before request', headers, options, payload })

this.#session.request(headers)
.then(stream => {
stream.on('response', this.getResponseHandler(stream, resolve, reject))
stream.on('error', err => {
logger.error({ msg: 'sendPush errored', headers, options, payload, err })
return resolve(err)
})
stream.write(buffer)
stream.end()
})
.catch(err => reject(err))
const stream = this.#session.request(headers)

stream.on('response', this.getResponseHandler(stream, resolve, reject))
stream.on('error', (err) => {
logger.error({ msg: 'sendPush errored', headers, options, payload, err })
return resolve(err)
})
stream.write(buffer)
stream.end()
})
}

async sendAll (notifications, isVoIP = false ) {
this.#session = new AppleSession(this.#url)
await this.#session.connect()
const responses = []
let successCount = 0, failureCount = 0

Expand All @@ -169,12 +164,7 @@ class AppleMessaging {

let response
for (let retryCounter = 0; retryCounter < RETRY_RESTRICTION; retryCounter++) {

try {
response = await this.sendPush(token, payload, options)
} catch (err) {
logger.error({ msg: 'http request failed', err })
}
response = await this.sendPush(token, payload, options)
if (response instanceof Error) {
logger.warn({ msg: `sendPush not successful on ${retryCounter + 1} try`, err: response })
continue
Expand All @@ -197,7 +187,6 @@ class AppleMessaging {
}
}

await this.#session.disconnect()
return { responses, successCount, failureCount }
}
}
Expand Down
60 changes: 21 additions & 39 deletions apps/condo/domains/notification/adapters/apple/AppleSession.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ const http2 = require('http2')
const { isEmpty, get } = require('lodash')

const { getLogger } = require('@open-condo/keystone/logging')
const { getRedisClient } = require('@open-condo/keystone/redis')

const { getCurrTimeStamp } = require('@condo/domains/common/utils/date')
const { RedisGuard } = require('@condo/domains/user/utils/serverSchema/guards')

const SESSION_LIFE_TIME = 60 * 60 * 24
const SESSION_PING_INTERVAL = 60
Expand All @@ -28,9 +26,6 @@ class AppleSession {
#session = null
#expires = null
#timerId = null
#redisGuard = new RedisGuard()
#redisClient = getRedisClient()
#channel = 'guard_lock:apple_session'

constructor (url = APPLE_API_ENDPOINT) {
this.disconnect = this.disconnect.bind(this)
Expand All @@ -51,10 +46,9 @@ class AppleSession {
/**
* Closes session if it is still alive, cleans session data and ping interval
*/
async disconnect () {
disconnect () {
if (this.#timerId) clearInterval(this.#timerId)
if (this.#validateSession()) this.#session.close()
await this.#redisGuard.unlock('apple_session', 'connect')

this.#session = null
this.#expires = null
Expand All @@ -72,59 +66,47 @@ class AppleSession {
* Handles session errors, closes session, logs errors
* @param error
*/
async errorHandler (error) {
await this.disconnect()
errorHandler (error) {
this.disconnect()

logger.error({ msg: 'sessionErrorHandler', error })
}

async #setupConnection (currTime) {
this.#expires = currTime + SESSION_LIFE_TIME
await this.#redisGuard.lock('apple_session', 'connect')
this.#session = http2.connect(this.#ENDPOINT)

this.#session.on('error', this.errorHandler)
this.#session.on('socketError', this.errorHandler)
this.#session.on('goaway', this.errorHandler)

this.#timerId = setInterval(this.pingHandler, SESSION_PING_INTERVAL * 1000)
}

/**
* Created new session if not alive yet, forced or previous session is expired. Does nothing in other case.
* Sets error handler on some session events. Sets ping interval to check session health.
* @param force
*/
async connect (force = false) {
#connect (force = false) {
const currTime = getCurrTimeStamp()
const isExpired = !isEmpty(this.#expires) && currTime >= this.#expires

if (this.#validateSession() && !force && !isExpired) return
if (isExpired) await this.disconnect()

const isLocked = await this.#redisGuard.isLocked('apple_session', 'connect')
if (isLocked) {
this.#redisClient.subscribe(this.#channel)
this.#redisClient.on('message', async (channel, message) => {
if (message === 'key_deleted') {
await this.#setupConnection()
return
}
})
} else {
await this.#setupConnection()
}
if (isExpired) this.disconnect()

this.#expires = currTime + SESSION_LIFE_TIME
this.#session = http2.connect(this.#ENDPOINT)

this.#session.on('error', this.errorHandler)
this.#session.on('socketError', this.errorHandler)
this.#session.on('goaway', this.errorHandler)

this.#timerId = setInterval(this.pingHandler, SESSION_PING_INTERVAL * 1000)
}

async request (...args) {
return await this.#session.request(...args)
request (...args) {
this.#connect()

return this.#session.request(...args)
}

/**
* Creates session if no alive session available then returns current session object
* @returns {session}
*/
async get () {
get () {
this.#connect()

return this.#session
}
}
Expand Down
5 changes: 0 additions & 5 deletions apps/condo/domains/user/utils/serverSchema/guards.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ class RedisGuard {
await this.redis.expire(`${this.lockPrefix}${actionFolder}${variable}`, ttl )
}

async unlock (variable, action = '') {
const actionFolder = action ? `${action}:` : ''
await this.redis.del(`${this.lockPrefix}${actionFolder}${variable}`)
}

}

module.exports = {
Expand Down

0 comments on commit fe2e23e

Please sign in to comment.