Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions lib/http-routes/api/create-call.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const { mergeSdpMedia, extractSdpMedia, removeVideoSdp } = require('../../utils/
const { createCallSchema, customSanitizeFunction } = require('../schemas/create-call');
const { selectHostPort } = require('../../utils/network');
const { JAMBONES_DIAL_SBC_FOR_REGISTERED_USER } = require('../../config');
const { createMediaEndpoint } = require('../../utils/media-endpoint');

const removeNullProperties = (obj) => (Object.keys(obj).forEach((key) => obj[key] === null && delete obj[key]), obj);
const removeNulls = (req, res, next) => {
Expand Down Expand Up @@ -67,7 +68,7 @@ router.post('/',
const {
lookupAppBySid
} = srf.locals.dbHelpers;
const {getSBC, getFreeswitch} = srf.locals;
const {getSBC} = srf.locals;
let sbcAddress = getSBC();
if (!sbcAddress) throw new Error('no available SBCs for outbound call creation');
const target = restDial.to;
Expand Down Expand Up @@ -170,9 +171,7 @@ router.post('/',
}

/* create endpoint for outdial */
const ms = getFreeswitch();
if (!ms) throw new Error('no available Freeswitch for outbound call creation');
const ep = await ms.createEndpoint();
const ep = await createMediaEndpoint(srf, logger);
logger.debug(`createCall: successfully allocated endpoint, sending INVITE to ${sbcAddress}`);

/* launch outdial */
Expand All @@ -181,7 +180,7 @@ router.post('/',
let localSdp = ep.local.sdp;

if (req.body.dual_streams) {
dualEp = await ms.createEndpoint();
dualEp = await createMediaEndpoint(srf, logger);
localSdp = mergeSdpMedia(localSdp, dualEp.local.sdp);
}
if (process.env.JAMBONES_VIDEO_CALLS_ENABLED_IN_FS) {
Expand Down
98 changes: 28 additions & 70 deletions lib/session/call-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,14 @@ const {
JAMBONES_INJECT_CONTENT,
JAMBONES_EAGERLY_PRE_CACHE_AUDIO,
AWS_REGION,
JAMBONES_USE_FREESWITCH_TIMER_FD,
JAMBONES_MEDIA_TIMEOUT_MS,
JAMBONES_MEDIA_HOLD_TIMEOUT_MS,
JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS
} = require('../config');
const bent = require('bent');
const BackgroundTaskManager = require('../utils/background-task-manager');
const dbUtils = require('../utils/db-utils');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
const { NonFatalTaskError} = require('../utils/error');
const { sleepFor } = require('../utils/helpers');
const { createMediaEndpoint } = require('../utils/media-endpoint');
const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks
WHERE webhook_sid =
(
Expand Down Expand Up @@ -2310,8 +2306,7 @@ Duration=${duration} `

// need to allocate an endpoint
try {
if (!this.ms) this.ms = this.getMS();
const ep = await this.ms.createEndpoint({
const ep = await this._createMediaEndpoint({
headers: {
'X-Jambones-Call-ID': this.callId,
},
Expand All @@ -2320,7 +2315,6 @@ Duration=${duration} `
//ep.cs = this;
this.ep = ep;
this.logger.info(`allocated endpoint ${ep.uuid}`);
this._configMsEndpoint();

this.ep.on('destroy', () => {
this.logger.debug(`endpoint was destroyed!! ${this.ep.uuid}`);
Expand Down Expand Up @@ -2391,9 +2385,6 @@ Duration=${duration} `
this.logger.error('CallSession:replaceEndpoint cannot be called without stable dlg');
return;
}
// When this call kicked out from conference, session need to replace endpoint
// but this.ms might be undefined/null at this case.
this.ms = this.ms || this.getMS();
// Destroy previous ep if it's still running.
if (this.ep?.connected) this.ep.destroy();

Expand All @@ -2418,8 +2409,7 @@ Duration=${duration} `
* This prevents call failures during media renegotiation.
*/

this.ep = await this.ms.createEndpoint();
this._configMsEndpoint();
this.ep = await this._createMediaEndpoint();

const sdp = await this.dlg.modify(this.ep.local.sdp);
await this.ep.modify(sdp);
Expand Down Expand Up @@ -2679,15 +2669,8 @@ Duration=${duration} `
async createOrRetrieveEpAndMs() {
if (this.ms && this.ep) return {ms: this.ms, ep: this.ep};

// get a media server
if (!this.ms) {
Comment thread
davehorton marked this conversation as resolved.
const ms = this.srf.locals.getFreeswitch();
if (!ms) throw new Error('no available freeswitch');
this.ms = ms;
}
if (!this.ep) {
this.ep = await this.ms.createEndpoint({remoteSdp: this.req.body});
this._configMsEndpoint();
this.ep = await this._createMediaEndpoint({remoteSdp: this.req.body});
}
return {ms: this.ms, ep: this.ep};
}
Expand Down Expand Up @@ -2840,8 +2823,7 @@ Duration=${duration} `
async reAnchorMedia(currentMediaRoute = MediaPath.PartialMedia) {
assert(this.dlg && this.dlg.connected && !this.ep);

this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
this._configMsEndpoint();
this.ep = await this._createMediaEndpoint({remoteSdp: this.dlg.remote.sdp});
await this.dlg.modify(this.ep.local.sdp, {
headers: {
'X-Reason': 'anchor-media'
Expand Down Expand Up @@ -2916,59 +2898,13 @@ Duration=${duration} `
}
}

_configMsEndpoint() {
this._enableInbandDtmfIfRequired(this.ep);
this.ep.once('destroy', this._handleMediaTimeout.bind(this));
const opts = {
...(this.onHoldMusic && {holdMusic: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`}),
...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}),
...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}),
...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS})
};
if (Object.keys(opts).length > 0) {
this.ep.set(opts);
}

const origDestroy = this.ep.destroy.bind(this.ep);
this.ep.destroy = async() => {
try {
if (this.currentTask?.name === TaskName.Transcribe && JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS) {
// transcribe task is being used, wait for some time before destroy
// if final transcription is received but endpoint is already closed,
// freeswitch module will not be able to send the transcription

this.logger.debug('callSession:_configMsEndpoint -' +
' transcribe task, wait for some time before destroy');
await sleepFor(JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS);
}
await origDestroy();
} catch (err) {
this.logger.error(err, 'callSession:_configMsEndpoint - error destroying endpoint');
}
};
}

async _handleMediaTimeout(evt) {
if (evt.reason === 'MEDIA_TIMEOUT' && !this.callGone) {
if (evt?.reason === 'MEDIA_TIMEOUT' && !this.callGone) {
this.logger.info('CallSession:_handleMediaTimeout: received MEDIA_TIMEOUT, hangup the call');
this._jambonzHangup('Media Timeout');
}
}

async _enableInbandDtmfIfRequired(ep) {
if (ep.inbandDtmfEnabled) return;
// only enable inband dtmf detection if voip carrier dtmf_type === tones
if (this.inbandDtmfEnabled) {
// https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about
try {
ep.execute('start_dtmf');
ep.inbandDtmfEnabled = true;
} catch (err) {
this.logger.info(err, 'CallSession:_enableInbandDtmf - error enable inband DTMF');
}
}
}

/**
* notifyTaskError - only used when websocket connection is used instead of webhooks
*/
Expand Down Expand Up @@ -3020,6 +2956,18 @@ Duration=${duration} `
});
}

async _enableInbandDtmfIfRequired(ep) {
if (ep.inbandDtmfEnabled) return;
// only enable inband dtmf detection if voip carrier dtmf_type === tones
if (this.inbandDtmfEnabled) {
// https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about
ep.execute('start_dtmf').catch((err) => {
this.logger.info({err}, 'CallSession:_enableInbandDtmfIfRequired - error starting DTMF');
});
ep.inbandDtmfEnabled = true;
}
}

_clearTasks(backgroundGather, evt) {
if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) {
this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather');
Expand Down Expand Up @@ -3127,6 +3075,16 @@ Duration=${duration} `
}
}

async _createMediaEndpoint(drachtioFsmrfOptions = {}) {
return await createMediaEndpoint(this.srf, this.logger, {
activeMs: this.getMS(),
drachtioFsmrfOptions,
onHoldMusic: this.onHoldMusic,
inbandDtmfEnabled: this.inbandDtmfEnabled,
mediaTimeoutHandler: this._handleMediaTimeout.bind(this),
});
}

getFormattedConversation(numTurns) {
const turns = this.conversationTurns.slice(-numTurns);
if (turns.length === 0) return null;
Expand Down
5 changes: 2 additions & 3 deletions lib/session/siprec-call-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ class SipRecCallSession extends InboundCallSession {

async answerSipRecCall() {
try {
this.ms = this.getMS();
let remoteSdp = this.sdp1.replace(/sendonly/, 'sendrecv');
this.ep = await this.ms.createEndpoint({remoteSdp});
this.ep = await this._createMediaEndpoint({remoteSdp});
//this.logger.debug({remoteSdp, localSdp: this.ep.local.sdp}, 'SipRecCallSession - allocated first endpoint');
remoteSdp = this.sdp2.replace(/sendonly/, 'sendrecv');
this.ep2 = await this.ms.createEndpoint({remoteSdp});
this.ep2 = await this._createMediaEndpoint({remoteSdp});
//this.logger.debug({remoteSdp, localSdp: this.ep2.local.sdp}, 'SipRecCallSession - allocated second endpoint');
await this.ep.bridge(this.ep2);
const combinedSdp = await createSipRecPayload(this.ep.local.sdp, this.ep2.local.sdp, this.logger);
Expand Down
7 changes: 6 additions & 1 deletion lib/tasks/gather.js
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,10 @@ class TaskGather extends SttTask {
}

_onTranscription(cs, ep, evt, fsEvent) {
// check if we are in graceful shutdown mode
if (ep.gracefulShutdownResolver) {
ep.gracefulShutdownResolver();
}
// make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname');
const finished = fsEvent.getHeader('transcription-session-finished');
Expand Down Expand Up @@ -1092,7 +1096,8 @@ class TaskGather extends SttTask {
if (this.canFallback) {
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
bugname: this.bugname,
gracefulShutdown: false
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
try {
Expand Down
16 changes: 13 additions & 3 deletions lib/tasks/transcribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,18 @@ class TaskTranscribe extends SttTask {
stopTranscription = true;
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
bugname: this.bugname,
gracefulShutdown: this.paused ? false : true
})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
}
if (this.transcribing2 && this.ep2?.connected) {
stopTranscription = true;
this.ep2.stopTranscription({vendor: this.vendor, bugname: this.bugname})
this.ep2.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
gracefulShutdown: this.paused ? false : true
})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
}

Expand Down Expand Up @@ -421,6 +426,10 @@ class TaskTranscribe extends SttTask {
}

async _onTranscription(cs, ep, channel, evt, fsEvent) {
// check if we are in graceful shutdown mode
if (ep.gracefulShutdownResolver) {
ep.gracefulShutdownResolver();
}
// make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname');
const finished = fsEvent.getHeader('transcription-session-finished');
Expand Down Expand Up @@ -703,7 +712,8 @@ class TaskTranscribe extends SttTask {
if (this.canFallback) {
_ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
bugname: this.bugname,
gracefulShutdown: false
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
try {
Expand Down
6 changes: 5 additions & 1 deletion lib/utils/amd-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,11 @@ module.exports = (logger) => {
}

if (ep.connected) {
ep.stopTranscription({vendor, bugname})
ep.stopTranscription({
vendor,
bugname,
gracefulShutdown: false
})
.catch((err) => logger.info(err, 'stopAmd: Error stopping transcription'));
task.emit('amd', {type: AmdEvents.Stopped});
ep.execute('avmd_stop').catch((err) => this.logger.info(err, 'Error stopping avmd'));
Expand Down
Loading