From 03fb531f5c5063f29049274f753b9912a7adf28c Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Thu, 3 Apr 2025 11:44:43 +0400 Subject: [PATCH 1/7] Update changestreammonitor.js Adds capability to generate the monitor's output in CSV (with custom delimiter) and in JSON format. Still pending: update documentation. --- .../changestreammonitor.js | 117 ++++++++++++++++-- 1 file changed, 107 insertions(+), 10 deletions(-) diff --git a/snippets/change-streams-monitor/changestreammonitor.js b/snippets/change-streams-monitor/changestreammonitor.js index 76ebebf..f29c3a4 100644 --- a/snippets/change-streams-monitor/changestreammonitor.js +++ b/snippets/change-streams-monitor/changestreammonitor.js @@ -2,7 +2,21 @@ const localRequire = require("module").createRequire(__filename); const { Table } = localRequire("to-tabel"); const { templates } = localRequire("boks"); -function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) { +const OutputTypeEnum = { + TABLE: 'TABLE', + JSON: 'JSON', + CSV : 'CSV' +}; + +const PipelineFormatEnum = { + EJSON : 'EJSON', + NONE: 'NONE', + JSON: 'JSON' +}; + +const DEFAULT_DELIMITER="||||" + +function _listChangeStreams (extended = false, allUsers = true, nsFilter = [], outputType = OutputTypeEnum.TABLE, pipelineFormat=PipelineFormatEnum.JSON, delimiter=DEFAULT_DELIMITER) { tableData = []; let changeStreamsDataRaw = getChangeStreams(allUsers, nsFilter); @@ -16,11 +30,20 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) { } catch (error) {} //format the pipeline for better rendering - let changeStreamPipeline = EJSON.stringify( - changeStreamOpData.cursor.originatingCommand.pipeline, - null, - 1 - ); + let changeStreamPipeline = "" + switch (pipelineFormat){ + case PipelineFormatEnum.EJSON: + changeStreamPipeline = EJSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline, null,1) + break; + case PipelineFormatEnum.JSON: + changeStreamPipeline = JSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline) + break; + case PipelineFormatEnum.NONE: + changeStreamPipeline = changeStreamOpData.cursor.originatingCommand.pipeline + break; + default: + throw new Error("Internal Error: Unexepected PipelineFormatEnum value " + pipelineFormat) + } let usersStr = ""; if (changeStreamOpData.effectiveUsers){ @@ -66,8 +89,21 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) { }) - customConsoleTable(tableData, extended); - print("Found " + changeStreamsDataRaw.length + " change streams"); + switch (outputType){ + case OutputTypeEnum.TABLE: + generateTableOutput(tableData, extended); + print("Found " + changeStreamsDataRaw.length + " change streams"); + break; + case OutputTypeEnum.JSON: + print("JSON"); + generateJsonOutput(tableData, extended); + break; + case OutputTypeEnum.CSV: + generateCsvOutput(tableData, extended, delimiter); + break; + default: + throw new Error("Internal Error: Unexepected OutputTypeEnum value " + outputType) + } }; function _listChangeStreamsHelp(){ @@ -91,9 +127,22 @@ function _listChangeStreamsHelp(){ * Defailts to true. * @param {Array.} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter. */ -globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter);} +globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.TABLE, PipelineFormatEnum.EJSON);} globalThis.listChangeStreams.help = function () {_listChangeStreamsHelp();} +globalThis.listChangeStreamsAsTable = globalThis.listChangeStreams +//TODO add help +globalThis.listChangeStreamsAsTable.help = function () {_listChangeStreamsHelp();} + +globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.JSON, PipelineFormatEnum.NONE);} +//TODO add help +globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsHelp();} + +globalThis.listChangeStreamsAsCSV = function (extended = false, allUsers = true, nsFilter = [], delimiter=DEFAULT_DELIMITER) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON);} +//TODO add help +globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsHelp();} + + /** * @class Contains the data that will be presented in tabular format. This is the basic data set - @see {ExtendedChangeStreamsData} for the extended version. * @param {*} connId An identifier for the connection where the specific operation originated. @@ -161,6 +210,13 @@ class ChangeStreamsData { let newTbl = new Table(ChangeStreamsData.headers(), options); newTbl.print(); } + + toCsvString(delimiter){ + return this.constructor.headers().reduce( + (accumulator, currentValue) => accumulator === "" ? this[currentValue.name] : accumulator + delimiter + this[currentValue.name], + "" + ) + } }; globalThis.ChangeStreamsData = ChangeStreamsData; @@ -303,7 +359,7 @@ globalThis.getChangeStreams = function (allUsers, nsFilter) { * @param {*} data The data to be displayed in a table * @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers. */ -globalThis.customConsoleTable = function (data, extended) { +globalThis.generateTableOutput = function (data, extended) { if (data && data.length > 0) { const options = { maxSize: process.stdout.columns - 10, @@ -323,6 +379,47 @@ globalThis.customConsoleTable = function (data, extended) { } }; +/** + * Generates JSON output for the extracted changestream data + * @param {*} data The data to be displayed in a table + * @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers. + */ +globalThis.generateJsonOutput = function (data, extended) { + if (data && data.length > 0) { + data.forEach(changeStreamOpData => { + print(JSON.stringify(changeStreamOpData)) + }) + + } else { + print("No Change Streams found!"); + } +}; + +/** + * Generates CSV output for the extracted changestream data + * @param {*} data The data to be displayed in a table + * @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers. + */ +globalThis.generateCsvOutput = function (data, extended, delimiter) { + if (data && data.length > 0) { + let headersSource = extended ? ExtendedChangeStreamsData.headers() : ChangeStreamsData.headers() + let headers = headersSource.map(h => h.name) + let headersStr = headers.reduce( + (accumulator, currentValue) => accumulator === "" ? currentValue : accumulator + delimiter + currentValue, + "" + ) + print(headersStr) + + data.forEach(changeStreamOpData => { + print(changeStreamOpData.toCsvString(delimiter)) + }) + + } else { + print("No Change Streams found!"); + } +}; + + function _prettyPrintChangeStreamPipeline(connectionId) { let pipeline = [ From ee23e3ca411ab1d3ba4e22a679387d8c3737a0b9 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Thu, 3 Apr 2025 11:53:05 +0400 Subject: [PATCH 2/7] Update changestreammonitor.js Remove unnecessary output --- snippets/change-streams-monitor/changestreammonitor.js | 1 - 1 file changed, 1 deletion(-) diff --git a/snippets/change-streams-monitor/changestreammonitor.js b/snippets/change-streams-monitor/changestreammonitor.js index f29c3a4..3ac8d14 100644 --- a/snippets/change-streams-monitor/changestreammonitor.js +++ b/snippets/change-streams-monitor/changestreammonitor.js @@ -95,7 +95,6 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = [], o print("Found " + changeStreamsDataRaw.length + " change streams"); break; case OutputTypeEnum.JSON: - print("JSON"); generateJsonOutput(tableData, extended); break; case OutputTypeEnum.CSV: From 8fb6a1e1ed6386f3c50295b650d31fd07936b930 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Thu, 3 Apr 2025 11:56:16 +0400 Subject: [PATCH 3/7] Update changestreammonitor.js Changes order of arguments in listChangeStreamsAsCSV --- snippets/change-streams-monitor/changestreammonitor.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snippets/change-streams-monitor/changestreammonitor.js b/snippets/change-streams-monitor/changestreammonitor.js index 3ac8d14..c86ef4c 100644 --- a/snippets/change-streams-monitor/changestreammonitor.js +++ b/snippets/change-streams-monitor/changestreammonitor.js @@ -137,7 +137,7 @@ globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true //TODO add help globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsHelp();} -globalThis.listChangeStreamsAsCSV = function (extended = false, allUsers = true, nsFilter = [], delimiter=DEFAULT_DELIMITER) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON);} +globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON);} //TODO add help globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsHelp();} From 6335a59b353c78bba5282a3551573e2c35f1c783 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Thu, 3 Apr 2025 12:00:42 +0400 Subject: [PATCH 4/7] Fixes missing delimiter parameter --- snippets/change-streams-monitor/changestreammonitor.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snippets/change-streams-monitor/changestreammonitor.js b/snippets/change-streams-monitor/changestreammonitor.js index c86ef4c..e6a89b5 100644 --- a/snippets/change-streams-monitor/changestreammonitor.js +++ b/snippets/change-streams-monitor/changestreammonitor.js @@ -137,7 +137,7 @@ globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true //TODO add help globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsHelp();} -globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON);} +globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON, delimiter);} //TODO add help globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsHelp();} From c729826f021f9aa2235751c5854ee627e095dbe0 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Wed, 9 Apr 2025 23:40:48 +0400 Subject: [PATCH 5/7] Adds documentation --- .../changestreammonitor.js | 60 ++++++++++++++++--- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/snippets/change-streams-monitor/changestreammonitor.js b/snippets/change-streams-monitor/changestreammonitor.js index e6a89b5..5417d94 100644 --- a/snippets/change-streams-monitor/changestreammonitor.js +++ b/snippets/change-streams-monitor/changestreammonitor.js @@ -107,11 +107,12 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = [], o function _listChangeStreamsHelp(){ print("listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: any): void") + print("listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: any): void") print("Prints a table with the currently open Change Streams. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).") print("\t See prettyPrintChangeStreamPipeline.help() to pretty print a change stream pipeline. ") print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.") print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.") - print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defailts to true.") + print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.") print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.") } @@ -123,23 +124,68 @@ function _listChangeStreamsHelp(){ * @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. * If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. * If set to true, $currentOp reports operations belonging to all users. - * Defailts to true. + * Defaults to true. * @param {Array.} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter. */ globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.TABLE, PipelineFormatEnum.EJSON);} globalThis.listChangeStreams.help = function () {_listChangeStreamsHelp();} +/** + * Alias for {@link listChangeStreams} + */ globalThis.listChangeStreamsAsTable = globalThis.listChangeStreams -//TODO add help globalThis.listChangeStreamsAsTable.help = function () {_listChangeStreamsHelp();} + +function _listChangeStreamsAsJSONHelp(){ + print("listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: any): void") + print("Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).") + print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ") + print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.") + print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.") + print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.") + print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.") +} + +/** + * Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). + * See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. + * See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode. + * @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false. + * @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. + * If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. + * If set to true, $currentOp reports operations belonging to all users. + * Defaults to true. + * @param {Array.} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter. + */ globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.JSON, PipelineFormatEnum.NONE);} -//TODO add help -globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsHelp();} +globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsAsJSONHelp();} + +function _listChangeStreamsAsCSVHelp(){ + print("listChangeStreamsAsJSON(extended?: boolean, delimiter?: string, allUsers?: boolean, nsFilter?: any): void") + print("Prints the currently open Change Streams as a CSV string with \"" + DEFAULT_DELIMITER + "\" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). ") + print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ") + print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.") + print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.") + print("\t @param delimiter — Provide a custom delimeter for the CSV string. Defaults to \"" + DEFAULT_DELIMITER + "\"") + print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.") + print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.") +} +/** + * Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). + * See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. + * See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode. + * @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false. + * @param {string} delimiter Provide a custom delimeter for the CSV string + * @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. + * If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. + * If set to true, $currentOp reports operations belonging to all users. + * Defaults to true. + * @param {Array.} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter. + */ globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON, delimiter);} -//TODO add help -globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsHelp();} +globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsAsCSVHelp();} /** From 71fc7a5509634367a7c1eb46915b5ca4619ce707 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Wed, 9 Apr 2025 23:53:27 +0400 Subject: [PATCH 6/7] Update README.md --- snippets/change-streams-monitor/README.md | 24 +++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/snippets/change-streams-monitor/README.md b/snippets/change-streams-monitor/README.md index 69dbbf8..53e61d6 100644 --- a/snippets/change-streams-monitor/README.md +++ b/snippets/change-streams-monitor/README.md @@ -6,6 +6,12 @@ - [Sample Output - Normal Mode](#sample-output---normal-mode) - [Sample Output - Extended](#sample-output---extended) - [listChangeStreams.help()](#listchangestreamshelp) + - [listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsastableextended-boolean-allusers-boolean-nsfilter-array) + - [listChangeStreamsAsTable.help()](#listchangestreamsastablehelp) + - [listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsasjsonextended-boolean-allusers-boolean-nsfilter-array) + - [listChangeStreamsAsJSON.help()](#listchangestreamsasjsonhelp) + - [listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsascsvextended-boolean-delimiter-string-allusers-boolean-nsfilter-array) + - [listChangeStreamsAsCSV.help()](#listchangestreamsascsvhelp) - [prettyPrintChangeStreamPipeline(connectionId: any)](#prettyprintchangestreampipelineconnectionid-any) - [Example](#example) - [prettyPrintChangeStreamPipeline.help()](#prettyprintchangestreampipelinehelp) @@ -157,6 +163,24 @@ Found 2 change streams ## listChangeStreams.help() Provides help on how to use the function. +## listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array) +Alias for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array)` + +## listChangeStreamsAsTable.help() +Provides help on how to use the function. Alias for `listChangeStreams.help()` + +## listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array) +Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array)` for more details about the available parameters. + +## listChangeStreamsAsJSON.help() +Provides help on how to use the function. + +## listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array) +Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). The delimiter parameter allows overriding the default delimiter. See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array)` for more details about the other available parameters. + +## listChangeStreamsAsCSV.help() +Provides help on how to use the function. + ## prettyPrintChangeStreamPipeline(connectionId: any) Pretty prints the Change Stream pipeline for a given Connection ID. From ec677002a3c5b74945f6008a0635a19872699e79 Mon Sep 17 00:00:00 2001 From: Edward Mallia Date: Wed, 9 Apr 2025 23:53:46 +0400 Subject: [PATCH 7/7] Updates snippet's version --- snippets/change-streams-monitor/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snippets/change-streams-monitor/package.json b/snippets/change-streams-monitor/package.json index 705652f..d5e22c2 100644 --- a/snippets/change-streams-monitor/package.json +++ b/snippets/change-streams-monitor/package.json @@ -1,7 +1,7 @@ { "name": "@mongosh/snippet-change-stream-monitor", "snippetName": "change-stream-monitor", - "version": "0.1.0", + "version": "0.2.0", "description": "Mongosh snippet that allows users to monitor Change Streams on the current server.", "main": "index.js", "license": "MIT",