diff --git a/lib/http-routes/api/create-call.js b/lib/http-routes/api/create-call.js index 381edeb21..7faa7d7a9 100644 --- a/lib/http-routes/api/create-call.js +++ b/lib/http-routes/api/create-call.js @@ -17,6 +17,7 @@ const { mergeSdpMedia, extractSdpMedia, removeVideoSdp } = require('../../utils/ const { createCallSchema, customSanitizeFunction } = require('../schemas/create-call'); const { selectHostPort } = require('../../utils/network'); const { JAMBONES_DIAL_SBC_FOR_REGISTERED_USER } = require('../../config'); +const { createMediaEndpoint } = require('../../utils/media-endpoint'); const removeNullProperties = (obj) => (Object.keys(obj).forEach((key) => obj[key] === null && delete obj[key]), obj); const removeNulls = (req, res, next) => { @@ -67,7 +68,7 @@ router.post('/', const { lookupAppBySid } = srf.locals.dbHelpers; - const {getSBC, getFreeswitch} = srf.locals; + const {getSBC} = srf.locals; let sbcAddress = getSBC(); if (!sbcAddress) throw new Error('no available SBCs for outbound call creation'); const target = restDial.to; @@ -170,9 +171,7 @@ router.post('/', } /* create endpoint for outdial */ - const ms = getFreeswitch(); - if (!ms) throw new Error('no available Freeswitch for outbound call creation'); - const ep = await ms.createEndpoint(); + const ep = await createMediaEndpoint(srf, logger); logger.debug(`createCall: successfully allocated endpoint, sending INVITE to ${sbcAddress}`); /* launch outdial */ @@ -181,7 +180,7 @@ router.post('/', let localSdp = ep.local.sdp; if (req.body.dual_streams) { - dualEp = await ms.createEndpoint(); + dualEp = await createMediaEndpoint(srf, logger); localSdp = mergeSdpMedia(localSdp, dualEp.local.sdp); } if (process.env.JAMBONES_VIDEO_CALLS_ENABLED_IN_FS) { diff --git a/lib/session/call-session.js b/lib/session/call-session.js index bb8d4cd27..15a796dd7 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -29,10 +29,6 @@ const { JAMBONES_INJECT_CONTENT, JAMBONES_EAGERLY_PRE_CACHE_AUDIO, AWS_REGION, - JAMBONES_USE_FREESWITCH_TIMER_FD, - JAMBONES_MEDIA_TIMEOUT_MS, - JAMBONES_MEDIA_HOLD_TIMEOUT_MS, - JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS } = require('../config'); const bent = require('bent'); const BackgroundTaskManager = require('../utils/background-task-manager'); @@ -40,7 +36,7 @@ const dbUtils = require('../utils/db-utils'); const BADPRECONDITIONS = 'preconditions not met'; const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction'; const { NonFatalTaskError} = require('../utils/error'); -const { sleepFor } = require('../utils/helpers'); +const { createMediaEndpoint } = require('../utils/media-endpoint'); const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks WHERE webhook_sid = ( @@ -2310,8 +2306,7 @@ Duration=${duration} ` // need to allocate an endpoint try { - if (!this.ms) this.ms = this.getMS(); - const ep = await this.ms.createEndpoint({ + const ep = await this._createMediaEndpoint({ headers: { 'X-Jambones-Call-ID': this.callId, }, @@ -2320,7 +2315,6 @@ Duration=${duration} ` //ep.cs = this; this.ep = ep; this.logger.info(`allocated endpoint ${ep.uuid}`); - this._configMsEndpoint(); this.ep.on('destroy', () => { this.logger.debug(`endpoint was destroyed!! ${this.ep.uuid}`); @@ -2391,9 +2385,6 @@ Duration=${duration} ` this.logger.error('CallSession:replaceEndpoint cannot be called without stable dlg'); return; } - // When this call kicked out from conference, session need to replace endpoint - // but this.ms might be undefined/null at this case. - this.ms = this.ms || this.getMS(); // Destroy previous ep if it's still running. if (this.ep?.connected) this.ep.destroy(); @@ -2418,8 +2409,7 @@ Duration=${duration} ` * This prevents call failures during media renegotiation. */ - this.ep = await this.ms.createEndpoint(); - this._configMsEndpoint(); + this.ep = await this._createMediaEndpoint(); const sdp = await this.dlg.modify(this.ep.local.sdp); await this.ep.modify(sdp); @@ -2679,15 +2669,8 @@ Duration=${duration} ` async createOrRetrieveEpAndMs() { if (this.ms && this.ep) return {ms: this.ms, ep: this.ep}; - // get a media server - if (!this.ms) { - const ms = this.srf.locals.getFreeswitch(); - if (!ms) throw new Error('no available freeswitch'); - this.ms = ms; - } if (!this.ep) { - this.ep = await this.ms.createEndpoint({remoteSdp: this.req.body}); - this._configMsEndpoint(); + this.ep = await this._createMediaEndpoint({remoteSdp: this.req.body}); } return {ms: this.ms, ep: this.ep}; } @@ -2840,8 +2823,7 @@ Duration=${duration} ` async reAnchorMedia(currentMediaRoute = MediaPath.PartialMedia) { assert(this.dlg && this.dlg.connected && !this.ep); - this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp}); - this._configMsEndpoint(); + this.ep = await this._createMediaEndpoint({remoteSdp: this.dlg.remote.sdp}); await this.dlg.modify(this.ep.local.sdp, { headers: { 'X-Reason': 'anchor-media' @@ -2916,59 +2898,13 @@ Duration=${duration} ` } } - _configMsEndpoint() { - this._enableInbandDtmfIfRequired(this.ep); - this.ep.once('destroy', this._handleMediaTimeout.bind(this)); - const opts = { - ...(this.onHoldMusic && {holdMusic: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`}), - ...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}), - ...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}), - ...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS}) - }; - if (Object.keys(opts).length > 0) { - this.ep.set(opts); - } - - const origDestroy = this.ep.destroy.bind(this.ep); - this.ep.destroy = async() => { - try { - if (this.currentTask?.name === TaskName.Transcribe && JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS) { - // transcribe task is being used, wait for some time before destroy - // if final transcription is received but endpoint is already closed, - // freeswitch module will not be able to send the transcription - - this.logger.debug('callSession:_configMsEndpoint -' + - ' transcribe task, wait for some time before destroy'); - await sleepFor(JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS); - } - await origDestroy(); - } catch (err) { - this.logger.error(err, 'callSession:_configMsEndpoint - error destroying endpoint'); - } - }; - } - async _handleMediaTimeout(evt) { - if (evt.reason === 'MEDIA_TIMEOUT' && !this.callGone) { + if (evt?.reason === 'MEDIA_TIMEOUT' && !this.callGone) { this.logger.info('CallSession:_handleMediaTimeout: received MEDIA_TIMEOUT, hangup the call'); this._jambonzHangup('Media Timeout'); } } - async _enableInbandDtmfIfRequired(ep) { - if (ep.inbandDtmfEnabled) return; - // only enable inband dtmf detection if voip carrier dtmf_type === tones - if (this.inbandDtmfEnabled) { - // https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about - try { - ep.execute('start_dtmf'); - ep.inbandDtmfEnabled = true; - } catch (err) { - this.logger.info(err, 'CallSession:_enableInbandDtmf - error enable inband DTMF'); - } - } - } - /** * notifyTaskError - only used when websocket connection is used instead of webhooks */ @@ -3020,6 +2956,18 @@ Duration=${duration} ` }); } + async _enableInbandDtmfIfRequired(ep) { + if (ep.inbandDtmfEnabled) return; + // only enable inband dtmf detection if voip carrier dtmf_type === tones + if (this.inbandDtmfEnabled) { + // https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about + ep.execute('start_dtmf').catch((err) => { + this.logger.info({err}, 'CallSession:_enableInbandDtmfIfRequired - error starting DTMF'); + }); + ep.inbandDtmfEnabled = true; + } + } + _clearTasks(backgroundGather, evt) { if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) { this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather'); @@ -3127,6 +3075,16 @@ Duration=${duration} ` } } + async _createMediaEndpoint(drachtioFsmrfOptions = {}) { + return await createMediaEndpoint(this.srf, this.logger, { + activeMs: this.getMS(), + drachtioFsmrfOptions, + onHoldMusic: this.onHoldMusic, + inbandDtmfEnabled: this.inbandDtmfEnabled, + mediaTimeoutHandler: this._handleMediaTimeout.bind(this), + }); + } + getFormattedConversation(numTurns) { const turns = this.conversationTurns.slice(-numTurns); if (turns.length === 0) return null; diff --git a/lib/session/siprec-call-session.js b/lib/session/siprec-call-session.js index 8cfed7057..d796bdd4f 100644 --- a/lib/session/siprec-call-session.js +++ b/lib/session/siprec-call-session.js @@ -45,12 +45,11 @@ class SipRecCallSession extends InboundCallSession { async answerSipRecCall() { try { - this.ms = this.getMS(); let remoteSdp = this.sdp1.replace(/sendonly/, 'sendrecv'); - this.ep = await this.ms.createEndpoint({remoteSdp}); + this.ep = await this._createMediaEndpoint({remoteSdp}); //this.logger.debug({remoteSdp, localSdp: this.ep.local.sdp}, 'SipRecCallSession - allocated first endpoint'); remoteSdp = this.sdp2.replace(/sendonly/, 'sendrecv'); - this.ep2 = await this.ms.createEndpoint({remoteSdp}); + this.ep2 = await this._createMediaEndpoint({remoteSdp}); //this.logger.debug({remoteSdp, localSdp: this.ep2.local.sdp}, 'SipRecCallSession - allocated second endpoint'); await this.ep.bridge(this.ep2); const combinedSdp = await createSipRecPayload(this.ep.local.sdp, this.ep2.local.sdp, this.logger); diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index c90a0c5a4..37df89d13 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -844,6 +844,10 @@ class TaskGather extends SttTask { } _onTranscription(cs, ep, evt, fsEvent) { + // check if we are in graceful shutdown mode + if (ep.gracefulShutdownResolver) { + ep.gracefulShutdownResolver(); + } // make sure this is not a transcript from answering machine detection const bugname = fsEvent.getHeader('media-bugname'); const finished = fsEvent.getHeader('transcription-session-finished'); @@ -1092,7 +1096,8 @@ class TaskGather extends SttTask { if (this.canFallback) { ep.stopTranscription({ vendor: this.vendor, - bugname: this.bugname + bugname: this.bugname, + gracefulShutdown: false }) .catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`)); try { diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index 08db9a563..16b68b20c 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -137,13 +137,18 @@ class TaskTranscribe extends SttTask { stopTranscription = true; this.ep.stopTranscription({ vendor: this.vendor, - bugname: this.bugname + bugname: this.bugname, + gracefulShutdown: this.paused ? false : true }) .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); } if (this.transcribing2 && this.ep2?.connected) { stopTranscription = true; - this.ep2.stopTranscription({vendor: this.vendor, bugname: this.bugname}) + this.ep2.stopTranscription({ + vendor: this.vendor, + bugname: this.bugname, + gracefulShutdown: this.paused ? false : true + }) .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); } @@ -421,6 +426,10 @@ class TaskTranscribe extends SttTask { } async _onTranscription(cs, ep, channel, evt, fsEvent) { + // check if we are in graceful shutdown mode + if (ep.gracefulShutdownResolver) { + ep.gracefulShutdownResolver(); + } // make sure this is not a transcript from answering machine detection const bugname = fsEvent.getHeader('media-bugname'); const finished = fsEvent.getHeader('transcription-session-finished'); @@ -703,7 +712,8 @@ class TaskTranscribe extends SttTask { if (this.canFallback) { _ep.stopTranscription({ vendor: this.vendor, - bugname: this.bugname + bugname: this.bugname, + gracefulShutdown: false }) .catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`)); try { diff --git a/lib/utils/amd-utils.js b/lib/utils/amd-utils.js index d1057d741..cd47fc94e 100644 --- a/lib/utils/amd-utils.js +++ b/lib/utils/amd-utils.js @@ -418,7 +418,11 @@ module.exports = (logger) => { } if (ep.connected) { - ep.stopTranscription({vendor, bugname}) + ep.stopTranscription({ + vendor, + bugname, + gracefulShutdown: false + }) .catch((err) => logger.info(err, 'stopAmd: Error stopping transcription')); task.emit('amd', {type: AmdEvents.Stopped}); ep.execute('avmd_stop').catch((err) => this.logger.info(err, 'Error stopping avmd')); diff --git a/lib/utils/media-endpoint.js b/lib/utils/media-endpoint.js new file mode 100644 index 000000000..aedb8628f --- /dev/null +++ b/lib/utils/media-endpoint.js @@ -0,0 +1,115 @@ +const { + JAMBONES_USE_FREESWITCH_TIMER_FD, + JAMBONES_MEDIA_TIMEOUT_MS, + JAMBONES_MEDIA_HOLD_TIMEOUT_MS, + JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS, +} = require('../config'); +const { sleepFor } = require('./helpers'); + +const createMediaEndpoint = async(srf, logger, { + activeMs, + drachtioFsmrfOptions = {}, + onHoldMusic, + inbandDtmfEnabled, + mediaTimeoutHandler, +} = {}) => { + const { getFreeswitch } = srf.locals; + const ms = activeMs || getFreeswitch(); + if (!ms) + throw new Error('no available Freeswitch for creating media endpoint'); + + const ep = await ms.createEndpoint(drachtioFsmrfOptions); + + // Configure the endpoint + const opts = { + ...(onHoldMusic && {holdMusic: `shout://${onHoldMusic.replace(/^https?:\/\//, '')}`}), + ...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}), + ...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}), + ...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS}) + }; + if (Object.keys(opts).length > 0) { + ep.set(opts); + } + // inbandDtmfEnabled + if (inbandDtmfEnabled) { + // https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about + ep.execute('start_dtmf').catch((err) => { + logger.error('Error starting inband DTMF', { error: err }); + }); + ep.inbandDtmfEnabled = true; + } + // Handle Media Timeout + if (mediaTimeoutHandler) { + ep.once('destroy', (evt) => { + mediaTimeoutHandler(evt, ep); + }); + } + // Handle graceful shutdown for endpoint if required + if (JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS > 0) { + const getEpGracefulShutdownPromise = () => { + if (!ep.gracefulShutdownPromise) { + ep.gracefulShutdownPromise = new Promise((resolve) => { + // this resolver will be called when stt task received transcription. + ep.gracefulShutdownResolver = () => { + resolve(); + ep.gracefulShutdownPromise = null; + }; + }); + } + return ep.gracefulShutdownPromise; + }; + + const gracefulShutdownHandler = async() => { + // resolve when one of the following happens: + // 1. stt task received transcription + // 2. JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS passed + await Promise.race([ + getEpGracefulShutdownPromise(), + sleepFor(JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS) + ]); + }; + + const origStartTranscription = ep.startTranscription.bind(ep); + ep.startTranscription = async(...args) => { + try { + const result = await origStartTranscription(...args); + ep.isTranscribeActive = true; + return result; + } catch (err) { + ep.isTranscribeActive = false; + throw err; + } + }; + + const origStopTranscription = ep.stopTranscription.bind(ep); + ep.stopTranscription = async(opts = {}, ...args) => { + const { gracefulShutdown = true, ...others } = opts; + if (ep.isTranscribeActive && gracefulShutdown) { + // only wait for graceful shutdown if transcription is active + await gracefulShutdownHandler(); + } + try { + const result = await origStopTranscription({...others}, ...args); + ep.isTranscribeActive = false; + return result; + } catch (err) { + ep.isTranscribeActive = false; + throw err; + } + }; + + const origDestroy = ep.destroy.bind(ep); + ep.destroy = async() => { + if (ep.isTranscribeActive) { + await gracefulShutdownHandler(); + } + return await origDestroy(); + }; + } + + return ep; +}; + +module.exports = { + createMediaEndpoint, +}; diff --git a/lib/utils/place-outdial.js b/lib/utils/place-outdial.js index b4649b90a..d205b2582 100644 --- a/lib/utils/place-outdial.js +++ b/lib/utils/place-outdial.js @@ -16,13 +16,7 @@ const crypto = require('crypto'); const HttpRequestor = require('./http-requestor'); const WsRequestor = require('./ws-requestor'); const {makeOpusFirst, removeVideoSdp} = require('./sdp-utils'); -const { - JAMBONES_USE_FREESWITCH_TIMER_FD, - JAMBONES_MEDIA_TIMEOUT_MS, - JAMBONES_MEDIA_HOLD_TIMEOUT_MS, - JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS -} = require('../config'); -const { sleepFor } = require('./helpers'); +const { createMediaEndpoint } = require('./media-endpoint'); class SingleDialer extends Emitter { constructor({logger, sbcAddress, target, opts, application, callInfo, accountInfo, rootSpan, startSpan, dialTask, @@ -98,6 +92,7 @@ class SingleDialer extends Emitter { }; } this.ms = ms; + this.srf = srf; let uri, to, inviteSpan; try { switch (this.target.type) { @@ -139,8 +134,7 @@ class SingleDialer extends Emitter { this.updateCallStatus = srf.locals.dbHelpers.updateCallStatus; this.serviceUrl = srf.locals.serviceUrl; - this.ep = await ms.createEndpoint(); - this._configMsEndpoint(); + this.ep = await this._createMediaEndpoint(); this.logger.debug(`SingleDialer:exec - created endpoint ${this.ep.uuid}`); /** @@ -352,43 +346,19 @@ class SingleDialer extends Emitter { } } - _configMsEndpoint() { - const opts = { - ...(this.onHoldMusic && {holdMusic: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`}), - ...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}), - ...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}), - ...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS}) - }; - if (Object.keys(opts).length > 0) { - this.ep.set(opts); - } - if (this.dialTask?.inbandDtmfEnabled && !this.ep.inbandDtmfEnabled) { - // https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about - try { - this.ep.execute('start_dtmf'); - this.ep.inbandDtmfEnabled = true; - } catch (err) { - this.logger.info(err, 'place-outdial:_configMsEndpoint - error enable inband DTMF'); - } - } + async _handleMediaTimeout(evt, ep) { + this.logger.info({evt}, 'SingleDialer:_handleMediaTimeout - media timeout event received'); + this.dialTask.kill(this.dialTask.cs, 'media-timeout'); + } - const origDestroy = this.ep.destroy.bind(this.ep); - this.ep.destroy = async() => { - try { - if (this.dialTask.transcribeTask && JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS) { - // transcribe task is being used, wait for some time before destroy - // if final transcription is received but endpoint is already closed, - // freeswitch module will not be able to send the transcription - - this.logger.info('SingleDialer:_configMsEndpoint -' + - ' Dial with transcribe task, wait for some time before destroy'); - await sleepFor(JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS); - } - await origDestroy(); - } catch (err) { - this.logger.error(err, 'SingleDialer:_configMsEndpoint - error destroying endpoint'); - } - }; + async _createMediaEndpoint(drachtioFsmrfOptions = {}) { + return await createMediaEndpoint(this.srf, this.logger, { + acactiveMs: this.ms, + drachtioFsmrfOptions, + onHoldMusic: this.onHoldMusic, + inbandDtmfEnabled: this.dialTask?.inbandDtmfEnabled, + mediaTimeoutHandler: this._handleMediaTimeout.bind(this), + }); } /** @@ -528,8 +498,7 @@ class SingleDialer extends Emitter { assert(this.dlg && this.dlg.connected && !this.ep); this.logger.debug('SingleDialer:reAnchorMedia: re-anchoring media after partial media'); - this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp}); - this._configMsEndpoint(); + this.ep = await this._createMediaEndpoint({remoteSdp: this.dlg.remote.sdp}); await this.dlg.modify(this.ep.local.sdp, { headers: { 'X-Reason': 'anchor-media'