Skip to content

Add JSON and CSV output to change-stream-monitor snippet #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions snippets/change-streams-monitor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
- [Sample Output - Normal Mode](#sample-output---normal-mode)
- [Sample Output - Extended](#sample-output---extended)
- [listChangeStreams.help()](#listchangestreamshelp)
- [listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsastableextended-boolean-allusers-boolean-nsfilter-array)
- [listChangeStreamsAsTable.help()](#listchangestreamsastablehelp)
- [listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsasjsonextended-boolean-allusers-boolean-nsfilter-array)
- [listChangeStreamsAsJSON.help()](#listchangestreamsasjsonhelp)
- [listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array)](#listchangestreamsascsvextended-boolean-delimiter-string-allusers-boolean-nsfilter-array)
- [listChangeStreamsAsCSV.help()](#listchangestreamsascsvhelp)
- [prettyPrintChangeStreamPipeline(connectionId: any)](#prettyprintchangestreampipelineconnectionid-any)
- [Example](#example)
- [prettyPrintChangeStreamPipeline.help()](#prettyprintchangestreampipelinehelp)
Expand Down Expand Up @@ -157,6 +163,24 @@ Found 2 change streams
## listChangeStreams.help()
Provides help on how to use the function.

## listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)
Alias for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)`

## listChangeStreamsAsTable.help()
Provides help on how to use the function. Alias for `listChangeStreams.help()`

## listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)
Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)` for more details about the available parameters.

## listChangeStreamsAsJSON.help()
Provides help on how to use the function.

## listChangeStreamsAsCSV(extended?: boolean, delimiter: string, allUsers?: boolean, nsFilter?: Array<string>)
Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). The delimiter parameter allows overriding the default delimiter. See documentation for `listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: Array<string>)` for more details about the other available parameters.

## listChangeStreamsAsCSV.help()
Provides help on how to use the function.

## prettyPrintChangeStreamPipeline(connectionId: any)

Pretty prints the Change Stream pipeline for a given Connection ID.
Expand Down
166 changes: 154 additions & 12 deletions snippets/change-streams-monitor/changestreammonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,21 @@ const localRequire = require("module").createRequire(__filename);
const { Table } = localRequire("to-tabel");
const { templates } = localRequire("boks");

function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {
const OutputTypeEnum = {
TABLE: 'TABLE',
JSON: 'JSON',
CSV : 'CSV'
};

const PipelineFormatEnum = {
EJSON : 'EJSON',
NONE: 'NONE',
JSON: 'JSON'
};

const DEFAULT_DELIMITER="||||"

function _listChangeStreams (extended = false, allUsers = true, nsFilter = [], outputType = OutputTypeEnum.TABLE, pipelineFormat=PipelineFormatEnum.JSON, delimiter=DEFAULT_DELIMITER) {
tableData = [];
let changeStreamsDataRaw = getChangeStreams(allUsers, nsFilter);

Expand All @@ -16,11 +30,20 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {
} catch (error) {}

//format the pipeline for better rendering
let changeStreamPipeline = EJSON.stringify(
changeStreamOpData.cursor.originatingCommand.pipeline,
null,
1
);
let changeStreamPipeline = ""
switch (pipelineFormat){
case PipelineFormatEnum.EJSON:
changeStreamPipeline = EJSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline, null,1)
break;
case PipelineFormatEnum.JSON:
changeStreamPipeline = JSON.stringify(changeStreamOpData.cursor.originatingCommand.pipeline)
break;
case PipelineFormatEnum.NONE:
changeStreamPipeline = changeStreamOpData.cursor.originatingCommand.pipeline
break;
default:
throw new Error("Internal Error: Unexepected PipelineFormatEnum value " + pipelineFormat)
}

let usersStr = "";
if (changeStreamOpData.effectiveUsers){
Expand Down Expand Up @@ -66,17 +89,30 @@ function _listChangeStreams (extended = false, allUsers = true, nsFilter = []) {

})

customConsoleTable(tableData, extended);
print("Found " + changeStreamsDataRaw.length + " change streams");
switch (outputType){
case OutputTypeEnum.TABLE:
generateTableOutput(tableData, extended);
print("Found " + changeStreamsDataRaw.length + " change streams");
break;
case OutputTypeEnum.JSON:
generateJsonOutput(tableData, extended);
break;
case OutputTypeEnum.CSV:
generateCsvOutput(tableData, extended, delimiter);
break;
default:
throw new Error("Internal Error: Unexepected OutputTypeEnum value " + outputType)
}
};

function _listChangeStreamsHelp(){
print("listChangeStreams(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
print("listChangeStreamsAsTable(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
print("Prints a table with the currently open Change Streams. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).")
print("\t See prettyPrintChangeStreamPipeline.help() to pretty print a change stream pipeline. ")
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defailts to true.")
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
}

Expand All @@ -88,12 +124,70 @@ function _listChangeStreamsHelp(){
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
* If set to true, $currentOp reports operations belonging to all users.
* Defailts to true.
* Defaults to true.
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
*/
globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter);}
globalThis.listChangeStreams = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.TABLE, PipelineFormatEnum.EJSON);}
globalThis.listChangeStreams.help = function () {_listChangeStreamsHelp();}

/**
* Alias for {@link listChangeStreams}
*/
globalThis.listChangeStreamsAsTable = globalThis.listChangeStreams
globalThis.listChangeStreamsAsTable.help = function () {_listChangeStreamsHelp();}


function _listChangeStreamsAsJSONHelp(){
print("listChangeStreamsAsJSON(extended?: boolean, allUsers?: boolean, nsFilter?: any): void")
print("Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).")
print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ")
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
}

/**
* Prints the currently open Change Streams as a JSON string. A JSON string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).
* See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline.
* See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode.
* @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
* If set to true, $currentOp reports operations belonging to all users.
* Defaults to true.
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
*/
globalThis.listChangeStreamsAsJSON = function (extended = false, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.JSON, PipelineFormatEnum.NONE);}
globalThis.listChangeStreamsAsJSON.help = function () {_listChangeStreamsAsJSONHelp();}


function _listChangeStreamsAsCSVHelp(){
print("listChangeStreamsAsJSON(extended?: boolean, delimiter?: string, allUsers?: boolean, nsFilter?: any): void")
print("Prints the currently open Change Streams as a CSV string with \"" + DEFAULT_DELIMITER + "\" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour). ")
print("\t See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline. ")
print("\t See ChangeStreamsData.help() and ExtendedChangeStreamsData.help() for data outputted in extended and non-extended mode.")
print("\t @param extended — Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.")
print("\t @param delimiter — Provide a custom delimeter for the CSV string. Defaults to \"" + DEFAULT_DELIMITER + "\"")
print("\t @param allUsers — Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e. If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command. If set to true, $currentOp reports operations belonging to all users. Defaults to true.")
print("\t @param nsFilter — An optional array of namespace filter. Defaults to [] i.e. to filter.")
}
/**
* Prints the currently open Change Streams as a CSV string with "||||" as the default delimeter. A string is printed separately on a newline for each open Change Stream. The behaviour of the function can be controlled with the available parameters (see parameter defaults for default behaviour).
* See prettyPrintChangeStreamPipeline() to pretty print a change stream pipeline.
* See ChangeStreamsData and ExtendedChangeStreamsData for data outputted in extended and non-extended mode.
* @param {boolean} extended Controls whether a simple or extended output is presented. Refer to ExtendedChangeStreamsData. Defaults to false.
* @param {string} delimiter Provide a custom delimeter for the CSV string
* @param {boolean} allUsers Boolean that correspond's to the allUsers flag of the $currentOp MongoDB Pipeline Stage i.e.
* If set to false, $currentOp only reports on operations/idle connections/idle cursors/idle sessions belonging to the user who ran the command.
* If set to true, $currentOp reports operations belonging to all users.
* Defaults to true.
* @param {Array.<string>} nsFilter An optional array of namespace filter. Defaults to [] i.e. to filter.
*/
globalThis.listChangeStreamsAsCSV = function (extended = false, delimiter=DEFAULT_DELIMITER, allUsers = true, nsFilter = []) {_listChangeStreams(extended, allUsers, nsFilter, OutputTypeEnum.CSV, PipelineFormatEnum.JSON, delimiter);}
globalThis.listChangeStreamsAsCSV.help = function () {_listChangeStreamsAsCSVHelp();}


/**
* @class Contains the data that will be presented in tabular format. This is the basic data set - @see {ExtendedChangeStreamsData} for the extended version.
* @param {*} connId An identifier for the connection where the specific operation originated.
Expand Down Expand Up @@ -161,6 +255,13 @@ class ChangeStreamsData {
let newTbl = new Table(ChangeStreamsData.headers(), options);
newTbl.print();
}

toCsvString(delimiter){
return this.constructor.headers().reduce(
(accumulator, currentValue) => accumulator === "" ? this[currentValue.name] : accumulator + delimiter + this[currentValue.name],
""
)
}
};

globalThis.ChangeStreamsData = ChangeStreamsData;
Expand Down Expand Up @@ -303,7 +404,7 @@ globalThis.getChangeStreams = function (allUsers, nsFilter) {
* @param {*} data The data to be displayed in a table
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
*/
globalThis.customConsoleTable = function (data, extended) {
globalThis.generateTableOutput = function (data, extended) {
if (data && data.length > 0) {
const options = {
maxSize: process.stdout.columns - 10,
Expand All @@ -323,6 +424,47 @@ globalThis.customConsoleTable = function (data, extended) {
}
};

/**
* Generates JSON output for the extracted changestream data
* @param {*} data The data to be displayed in a table
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
*/
globalThis.generateJsonOutput = function (data, extended) {
if (data && data.length > 0) {
data.forEach(changeStreamOpData => {
print(JSON.stringify(changeStreamOpData))
})

} else {
print("No Change Streams found!");
}
};

/**
* Generates CSV output for the extracted changestream data
* @param {*} data The data to be displayed in a table
* @param {boolean} extended Whether the extended output format is being used. This is used to generate the output table headers.
*/
globalThis.generateCsvOutput = function (data, extended, delimiter) {
if (data && data.length > 0) {
let headersSource = extended ? ExtendedChangeStreamsData.headers() : ChangeStreamsData.headers()
let headers = headersSource.map(h => h.name)
let headersStr = headers.reduce(
(accumulator, currentValue) => accumulator === "" ? currentValue : accumulator + delimiter + currentValue,
""
)
print(headersStr)

data.forEach(changeStreamOpData => {
print(changeStreamOpData.toCsvString(delimiter))
})

} else {
print("No Change Streams found!");
}
};



function _prettyPrintChangeStreamPipeline(connectionId) {
let pipeline = [
Expand Down
2 changes: 1 addition & 1 deletion snippets/change-streams-monitor/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@mongosh/snippet-change-stream-monitor",
"snippetName": "change-stream-monitor",
"version": "0.1.0",
"version": "0.2.0",
"description": "Mongosh snippet that allows users to monitor Change Streams on the current server.",
"main": "index.js",
"license": "MIT",
Expand Down