From 1d57c397446bcc418bc7b8a657f8b5c3c8b5f82e Mon Sep 17 00:00:00 2001 From: Arvind Shyamsundar Date: Tue, 6 Jul 2021 20:05:19 -0700 Subject: [PATCH 1/8] Support bulk insert into SQL Graph tables If user-configurable option `hideGraphColumns` is True (default) treat SQL graph internal columns as computed columns; else treat them as normal columns. --- .../jdbc/spark/SQLServerBulkJdbcOptions.scala | 4 ++- .../jdbc/spark/utils/BulkCopyUtils.scala | 33 ++++++++++++++++--- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala index 710f570f..e9e03a88 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala @@ -68,10 +68,12 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) val allowEncryptedValueModifications = params.getOrElse("allowEncryptedValueModifications", "false").toBoolean - val schemaCheckEnabled = params.getOrElse("schemaCheckEnabled", "true").toBoolean + val hideGraphColumns = + params.getOrElse("hideGraphColumns", "true").toBoolean + // Not a feature // Only used for internally testing data idempotency val testDataIdempotency = diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index 347694a0..86cd0035 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -186,8 +186,29 @@ object BulkCopyUtils extends Logging { */ private[spark] def getComputedCols( conn: Connection, - table: String): List[String] = { - val queryStr = s"SELECT name FROM sys.computed_columns WHERE object_id = OBJECT_ID('${table}');" + table: String, + hideGraphColumns: Boolean): List[String] = { + // TODO can optimize this, also evaluate SQLi issues + val queryStr = if (hideGraphColumns) s"""IF (SERVERPROPERTY('EngineEdition') = 5 OR SERVERPROPERTY('ProductMajorVersion') >= 14) +exec sp_executesql N'SELECT name + FROM sys.computed_columns + WHERE object_id = OBJECT_ID(''${table}'') + UNION ALL + SELECT C.name + FROM sys.tables AS T + JOIN sys.columns AS C + ON T.object_id = C.object_id + WHERE T.object_id = OBJECT_ID(''${table}'') + AND (T.is_edge = 1 OR T.is_node = 1) + AND C.is_hidden = 0 + AND C.graph_type = 2' +ELSE +SELECT name + FROM sys.computed_columns + WHERE object_id = OBJECT_ID('${table}') + """ + else s"SELECT name FROM sys.computed_columns WHERE object_id = OBJECT_ID('${table}');" + val computedColRs = conn.createStatement.executeQuery(queryStr) val computedCols = ListBuffer[String]() while (computedColRs.next()) { @@ -263,7 +284,7 @@ object BulkCopyUtils extends Logging { val colMetaData = { if(checkSchema) { checkExTableType(conn, options) - matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled) + matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled, options.hideGraphColumns) } else { defaultColMetadataMap(rs.getMetaData()) } @@ -289,6 +310,7 @@ object BulkCopyUtils extends Logging { * @param url: String, * @param isCaseSensitive: Boolean * @param strictSchemaCheck: Boolean + * @param hideGraphColumns - Whether to hide the $node_id, $from_id, $to_id, $edge_id 
columns in SQL graph tables */ private[spark] def matchSchemas( conn: Connection, @@ -297,13 +319,14 @@ object BulkCopyUtils extends Logging { rs: ResultSet, url: String, isCaseSensitive: Boolean, - strictSchemaCheck: Boolean): Array[ColumnMetadata]= { + strictSchemaCheck: Boolean, + hideGraphColumns: Boolean): Array[ColumnMetadata]= { val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase) zip df.schema.fieldNames.toList).toMap val dfCols = df.schema val tableCols = getSchema(rs, JdbcDialects.get(url)) - val computedCols = getComputedCols(conn, dbtable) + val computedCols = getComputedCols(conn, dbtable, hideGraphColumns) val prefix = "Spark Dataframe and SQL Server table have differing" From 07fee04de9f10b97584435ed5fa0b7bf8a021db7 Mon Sep 17 00:00:00 2001 From: luxu1-ms Date: Fri, 15 Oct 2021 09:14:09 -0700 Subject: [PATCH 2/8] add support for graph table / temporal table --- .../jdbc/spark/SQLServerBulkJdbcOptions.scala | 4 +- .../jdbc/spark/utils/BulkCopyUtils.scala | 83 ++++++++----------- 2 files changed, 35 insertions(+), 52 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala index e9e03a88..710f570f 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala @@ -68,12 +68,10 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) val allowEncryptedValueModifications = params.getOrElse("allowEncryptedValueModifications", "false").toBoolean + val schemaCheckEnabled = params.getOrElse("schemaCheckEnabled", "true").toBoolean - val hideGraphColumns = - params.getOrElse("hideGraphColumns", "true").toBoolean - // Not a feature // Only used for internally testing data idempotency val testDataIdempotency = diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index 86cd0035..91da7a24 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -180,60 +180,47 @@ object BulkCopyUtils extends Logging { } /** - * getComputedCols - * utility function to get computed columns. - * Use computed column names to exclude computed column when matching schema. + * getAutoCols + * utility function to get auto generated columns. + * Use auto generated column names to exclude them when matching schema. 
*/ - private[spark] def getComputedCols( + private[spark] def getAutoCols( conn: Connection, - table: String, - hideGraphColumns: Boolean): List[String] = { - // TODO can optimize this, also evaluate SQLi issues - val queryStr = if (hideGraphColumns) s"""IF (SERVERPROPERTY('EngineEdition') = 5 OR SERVERPROPERTY('ProductMajorVersion') >= 14) -exec sp_executesql N'SELECT name - FROM sys.computed_columns - WHERE object_id = OBJECT_ID(''${table}'') - UNION ALL - SELECT C.name - FROM sys.tables AS T - JOIN sys.columns AS C - ON T.object_id = C.object_id - WHERE T.object_id = OBJECT_ID(''${table}'') - AND (T.is_edge = 1 OR T.is_node = 1) - AND C.is_hidden = 0 - AND C.graph_type = 2' -ELSE -SELECT name - FROM sys.computed_columns + table: String): List[String] = { + // auto cols union computed cols, generated always cols, and node / edge table auto cols + val queryStr = s"""SELECT name + FROM sys.columns WHERE object_id = OBJECT_ID('${table}') + AND (is_computed = 1 -- computed column + OR generated_always_type > 0 -- generated always / temporal table + OR (is_hidden = 0 AND graph_type = 2)) -- graph table """ - else s"SELECT name FROM sys.computed_columns WHERE object_id = OBJECT_ID('${table}');" - val computedColRs = conn.createStatement.executeQuery(queryStr) - val computedCols = ListBuffer[String]() - while (computedColRs.next()) { - val colName = computedColRs.getString("name") - computedCols.append(colName) + val autoColRs = conn.createStatement.executeQuery(queryStr) + val autoCols = ListBuffer[String]() + while (autoColRs.next()) { + val colName = autoColRs.getString("name") + autoCols.append(colName) } - computedCols.toList + autoCols.toList } /** - * dfComputedColCount + * dfAutoColCount * utility function to get number of computed columns in dataframe. 
* Use number of computed columns in dataframe to get number of non computed column in df, * and compare with the number of non computed column in sql table */ - private[spark] def dfComputedColCount( + private[spark] def dfAutoColCount( dfColNames: List[String], - computedCols: List[String], + autoCols: List[String], dfColCaseMap: Map[String, String], isCaseSensitive: Boolean): Int ={ var dfComputedColCt = 0 - for (j <- 0 to computedCols.length-1){ - if (isCaseSensitive && dfColNames.contains(computedCols(j)) || - !isCaseSensitive && dfColCaseMap.contains(computedCols(j).toLowerCase()) - && dfColCaseMap(computedCols(j).toLowerCase()) == computedCols(j)) { + for (j <- 0 to autoCols.length-1){ + if (isCaseSensitive && dfColNames.contains(autoCols(j)) || + !isCaseSensitive && dfColCaseMap.contains(autoCols(j).toLowerCase()) + && dfColCaseMap(autoCols(j).toLowerCase()) == autoCols(j)) { dfComputedColCt += 1 } } @@ -284,7 +271,7 @@ SELECT name val colMetaData = { if(checkSchema) { checkExTableType(conn, options) - matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled, options.hideGraphColumns) + matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled) } else { defaultColMetadataMap(rs.getMetaData()) } @@ -310,7 +297,6 @@ SELECT name * @param url: String, * @param isCaseSensitive: Boolean * @param strictSchemaCheck: Boolean - * @param hideGraphColumns - Whether to hide the $node_id, $from_id, $to_id, $edge_id columns in SQL graph tables */ private[spark] def matchSchemas( conn: Connection, @@ -319,40 +305,39 @@ SELECT name rs: ResultSet, url: String, isCaseSensitive: Boolean, - strictSchemaCheck: Boolean, - hideGraphColumns: Boolean): Array[ColumnMetadata]= { + strictSchemaCheck: Boolean): Array[ColumnMetadata]= { val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase) zip df.schema.fieldNames.toList).toMap val dfCols = df.schema val tableCols = getSchema(rs, JdbcDialects.get(url)) - val computedCols = getComputedCols(conn, dbtable, hideGraphColumns) + val autoCols = getAutoCols(conn, dbtable) val prefix = "Spark Dataframe and SQL Server table have differing" - if (computedCols.length == 0) { + if (autoCols.length == 0) { assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck, s"${prefix} numbers of columns") } else if (strictSchemaCheck) { val dfColNames = df.schema.fieldNames.toList - val dfComputedColCt = dfComputedColCount(dfColNames, computedCols, dfColCaseMap, isCaseSensitive) + val dfComputedColCt = dfAutoColCount(dfColNames, autoCols, dfColCaseMap, isCaseSensitive) // if df has computed column(s), check column length using non computed column in df and table. 
// non computed column number in df: dfCols.length - dfComputedColCt - // non computed column number in table: tableCols.length - computedCols.length - assertIfCheckEnabled(dfCols.length-dfComputedColCt == tableCols.length-computedCols.length, strictSchemaCheck, + // non computed column number in table: tableCols.length - autoCols.length + assertIfCheckEnabled(dfCols.length-dfComputedColCt == tableCols.length-autoCols.length, strictSchemaCheck, s"${prefix} numbers of columns") } - val result = new Array[ColumnMetadata](tableCols.length - computedCols.length) + val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) var nonAutoColIndex = 0 for (i <- 0 to tableCols.length-1) { val tableColName = tableCols(i).name var dfFieldIndex = -1 // set dfFieldIndex = -1 for all computed columns to skip ColumnMetadata - if (computedCols.contains(tableColName)) { - logDebug(s"skipping computed col index $i col name $tableColName dfFieldIndex $dfFieldIndex") + if (autoCols.contains(tableColName)) { + logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex") }else{ var dfColName:String = "" if (isCaseSensitive) { From 8b7aa570f35fb41e1906d5cb20436ba76f2e5f66 Mon Sep 17 00:00:00 2001 From: luxu1-ms Date: Tue, 16 Nov 2021 11:00:25 -0800 Subject: [PATCH 3/8] add column mapping option --- .../jdbc/spark/SQLServerBulkJdbcOptions.scala | 4 ++ .../jdbc/spark/utils/BulkCopyUtils.scala | 40 +++++++++++-------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala index 710f570f..4b4c835b 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala @@ -72,6 +72,10 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) val schemaCheckEnabled = params.getOrElse("schemaCheckEnabled", "true").toBoolean + // user input column names array to match dataframe + val columnsToWrite = + params.getOrElse("columnsToWrite", Array[String]()) + // Not a feature // Only used for internally testing data idempotency val testDataIdempotency = diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index a6061733..da7dcccb 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -207,24 +207,24 @@ object BulkCopyUtils extends Logging { /** * dfAutoColCount - * utility function to get number of computed columns in dataframe. - * Use number of computed columns in dataframe to get number of non computed column in df, - * and compare with the number of non computed column in sql table + * utility function to get number of auto columns in dataframe. 
+ * Use number of auto columns in dataframe to get number of non auto columns in df, + * and compare with the number of non auto columns in sql table */ private[spark] def dfAutoColCount( dfColNames: List[String], autoCols: List[String], dfColCaseMap: Map[String, String], isCaseSensitive: Boolean): Int ={ - var dfComputedColCt = 0 + var dfAutoColCt = 0 for (j <- 0 to autoCols.length-1){ if (isCaseSensitive && dfColNames.contains(autoCols(j)) || !isCaseSensitive && dfColCaseMap.contains(autoCols(j).toLowerCase()) && dfColCaseMap(autoCols(j).toLowerCase()) == autoCols(j)) { - dfComputedColCt += 1 + dfAutoColCt += 1 } } - dfComputedColCt + dfAutoColCt } @@ -271,7 +271,7 @@ object BulkCopyUtils extends Logging { val colMetaData = { if(checkSchema) { checkExTableType(conn, options) - matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled) + matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled, options.columnsToWrite) } else { defaultColMetadataMap(rs.getMetaData()) } @@ -297,6 +297,7 @@ object BulkCopyUtils extends Logging { * @param url: String, * @param isCaseSensitive: Boolean * @param strictSchemaCheck: Boolean + * @param columnsToWrite: Array[String] */ private[spark] def matchSchemas( conn: Connection, @@ -305,7 +306,8 @@ object BulkCopyUtils extends Logging { rs: ResultSet, url: String, isCaseSensitive: Boolean, - strictSchemaCheck: Boolean): Array[ColumnMetadata]= { + strictSchemaCheck: Boolean, + columnsToWrite: Array[String]): Array[ColumnMetadata]= { val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase) zip df.schema.fieldNames.toList).toMap val dfCols = df.schema @@ -320,23 +322,29 @@ object BulkCopyUtils extends Logging { s"${prefix} numbers of columns") } else if (strictSchemaCheck) { val dfColNames = df.schema.fieldNames.toList - val dfComputedColCt = dfAutoColCount(dfColNames, autoCols, dfColCaseMap, isCaseSensitive) - // if df has computed column(s), check column length using non computed column in df and table. - // non computed column number in df: dfCols.length - dfComputedColCt - // non computed column number in table: tableCols.length - autoCols.length - assertIfCheckEnabled(dfCols.length-dfComputedColCt == tableCols.length-autoCols.length, strictSchemaCheck, + val dfAutoColCt = dfAutoColCount(dfColNames, autoCols, dfColCaseMap, isCaseSensitive) + // if df has auto column(s), check column length using non auto column in df and table. 
+ // non auto column number in df: dfCols.length - dfAutoColCt + // non auto column number in table: tableCols.length - autoCols.length + assertIfCheckEnabled(dfCols.length-dfAutoColCt == tableCols.length-autoCols.length, strictSchemaCheck, s"${prefix} numbers of columns") } + if (columnsToWrite.isEmpty()) { + val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) + } else { + val result = new Array[ColumnMetadata](columnsToWrite.size) + } - val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) var nonAutoColIndex = 0 for (i <- 0 to tableCols.length-1) { val tableColName = tableCols(i).name var dfFieldIndex = -1 - // set dfFieldIndex = -1 for all computed columns to skip ColumnMetadata - if (autoCols.contains(tableColName)) { + // set dfFieldIndex = -1 for all auto columns to skip ColumnMetadata + if (!columnsToWrite.isEmpty() && !columnsToWrite.contain(tableColName)) { + logDebug(s"skipping col index $i col name $tableColName, not provided in columnsToWrite list") + } else if (autoCols.contains(tableColName)) { logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex") }else{ var dfColName:String = "" From a22c30a2e1f30c09f6bdfb2181b9675ff4fe521b Mon Sep 17 00:00:00 2001 From: luxu1-ms Date: Tue, 16 Nov 2021 13:53:16 -0800 Subject: [PATCH 4/8] remove dfAutoCount() and make columns set --- .../jdbc/spark/SQLServerBulkJdbcOptions.scala | 2 +- .../jdbc/spark/utils/BulkCopyUtils.scala | 49 ++++--------------- 2 files changed, 10 insertions(+), 41 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala index 4b4c835b..6c0b3b31 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala @@ -74,7 +74,7 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) // user input column names array to match dataframe val columnsToWrite = - params.getOrElse("columnsToWrite", Array[String]()) + params.getOrElse("columnsToWrite", Array[String]()).toSet // Not a feature // Only used for internally testing data idempotency diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index da7dcccb..e894d3cb 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -205,29 +205,6 @@ object BulkCopyUtils extends Logging { autoCols.toList } - /** - * dfAutoColCount - * utility function to get number of auto columns in dataframe. - * Use number of auto columns in dataframe to get number of non auto columns in df, - * and compare with the number of non auto columns in sql table - */ - private[spark] def dfAutoColCount( - dfColNames: List[String], - autoCols: List[String], - dfColCaseMap: Map[String, String], - isCaseSensitive: Boolean): Int ={ - var dfAutoColCt = 0 - for (j <- 0 to autoCols.length-1){ - if (isCaseSensitive && dfColNames.contains(autoCols(j)) || - !isCaseSensitive && dfColCaseMap.contains(autoCols(j).toLowerCase()) - && dfColCaseMap(autoCols(j).toLowerCase()) == autoCols(j)) { - dfAutoColCt += 1 - } - } - dfAutoColCt - } - - /** * getColMetadataMap * Utility function convert result set meta data to array. 
@@ -297,7 +274,7 @@ object BulkCopyUtils extends Logging { * @param url: String, * @param isCaseSensitive: Boolean * @param strictSchemaCheck: Boolean - * @param columnsToWrite: Array[String] + * @param columnsToWrite: Set[String] */ private[spark] def matchSchemas( conn: Connection, @@ -307,7 +284,7 @@ object BulkCopyUtils extends Logging { url: String, isCaseSensitive: Boolean, strictSchemaCheck: Boolean, - columnsToWrite: Array[String]): Array[ColumnMetadata]= { + columnsToWrite: Set[String]): Array[ColumnMetadata]= { val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase) zip df.schema.fieldNames.toList).toMap val dfCols = df.schema @@ -317,18 +294,9 @@ object BulkCopyUtils extends Logging { val prefix = "Spark Dataframe and SQL Server table have differing" - if (autoCols.length == 0) { - assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck, - s"${prefix} numbers of columns") - } else if (strictSchemaCheck) { - val dfColNames = df.schema.fieldNames.toList - val dfAutoColCt = dfAutoColCount(dfColNames, autoCols, dfColCaseMap, isCaseSensitive) - // if df has auto column(s), check column length using non auto column in df and table. - // non auto column number in df: dfCols.length - dfAutoColCt - // non auto column number in table: tableCols.length - autoCols.length - assertIfCheckEnabled(dfCols.length-dfAutoColCt == tableCols.length-autoCols.length, strictSchemaCheck, - s"${prefix} numbers of columns") - } + // auto columns should not exist in df + assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck, + s"${prefix} numbers of columns") if (columnsToWrite.isEmpty()) { val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) @@ -341,10 +309,11 @@ object BulkCopyUtils extends Logging { for (i <- 0 to tableCols.length-1) { val tableColName = tableCols(i).name var dfFieldIndex = -1 - // set dfFieldIndex = -1 for all auto columns to skip ColumnMetadata - if (!columnsToWrite.isEmpty() && !columnsToWrite.contain(tableColName)) { - logDebug(s"skipping col index $i col name $tableColName, not provided in columnsToWrite list") + if (!columnsToWrite.isEmpty() && !columnsToWrite.contains(tableColName)) { + // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata + logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list") } else if (autoCols.contains(tableColName)) { + // if auto columns, skip column mapping and ColumnMetadata logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex") }else{ var dfColName:String = "" From ef94937a173e436e7e72abd6736e3358e76eb9f8 Mon Sep 17 00:00:00 2001 From: luxu1-ms Date: Tue, 16 Nov 2021 14:23:32 -0800 Subject: [PATCH 5/8] use string then convert to set --- .../jdbc/spark/SQLServerBulkJdbcOptions.scala | 2 +- .../sqlserver/jdbc/spark/utils/BulkCopyUtils.scala | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala index 6c0b3b31..4a2d095c 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala @@ -74,7 +74,7 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) // user input column names array to match dataframe val 
columnsToWrite = - params.getOrElse("columnsToWrite", Array[String]()).toSet + params.getOrElse("columnsToWrite", "").toString // Not a feature // Only used for internally testing data idempotency diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index e894d3cb..85b57d54 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -274,7 +274,7 @@ object BulkCopyUtils extends Logging { * @param url: String, * @param isCaseSensitive: Boolean * @param strictSchemaCheck: Boolean - * @param columnsToWrite: Set[String] + * @param columnsToWrite: String */ private[spark] def matchSchemas( conn: Connection, @@ -284,7 +284,7 @@ object BulkCopyUtils extends Logging { url: String, isCaseSensitive: Boolean, strictSchemaCheck: Boolean, - columnsToWrite: Set[String]): Array[ColumnMetadata]= { + columnsToWrite: String): Array[ColumnMetadata]= { val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase) zip df.schema.fieldNames.toList).toMap val dfCols = df.schema @@ -292,16 +292,18 @@ object BulkCopyUtils extends Logging { val tableCols = getSchema(rs, JdbcDialects.get(url)) val autoCols = getAutoCols(conn, dbtable) + val columnsToWriteSet = columnsToWrite.split(",").toSet + val prefix = "Spark Dataframe and SQL Server table have differing" // auto columns should not exist in df assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck, s"${prefix} numbers of columns") - if (columnsToWrite.isEmpty()) { + if (columnsToWriteSet.isEmpty()) { val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) } else { - val result = new Array[ColumnMetadata](columnsToWrite.size) + val result = new Array[ColumnMetadata](columnsToWriteSet.size) } var nonAutoColIndex = 0 @@ -309,7 +311,7 @@ object BulkCopyUtils extends Logging { for (i <- 0 to tableCols.length-1) { val tableColName = tableCols(i).name var dfFieldIndex = -1 - if (!columnsToWrite.isEmpty() && !columnsToWrite.contains(tableColName)) { + if (!columnsToWriteSet.isEmpty() && !columnsToWriteSet.contains(tableColName)) { // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list") } else if (autoCols.contains(tableColName)) { From 6d9e49fdd4edaf46cf7245c361709b07f8831074 Mon Sep 17 00:00:00 2001 From: luxu1-ms Date: Tue, 16 Nov 2021 23:10:53 -0800 Subject: [PATCH 6/8] minor fix --- .../sqlserver/jdbc/spark/utils/BulkCopyUtils.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala index 85b57d54..45ed3cfc 100644 --- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala +++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala @@ -293,6 +293,7 @@ object BulkCopyUtils extends Logging { val autoCols = getAutoCols(conn, dbtable) val columnsToWriteSet = columnsToWrite.split(",").toSet + logDebug(s"columnsToWrite: $columnsToWriteSet") val prefix = "Spark Dataframe and SQL Server table have differing" @@ -300,10 +301,11 @@ object BulkCopyUtils extends Logging { assertIfCheckEnabled(dfCols.length + autoCols.length == 
tableCols.length, strictSchemaCheck,
             s"${prefix} numbers of columns")
 
-        if (columnsToWriteSet.isEmpty()) {
-            val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
-        } else {
+        // if columnsToWrite provided by user, use it for metadata mapping. If not, use sql table.
+        if (columnsToWrite == "") {
             val result = new Array[ColumnMetadata](columnsToWriteSet.size)
+        } else {
+            val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
         }
 
         var nonAutoColIndex = 0
@@ -311,7 +313,7 @@ object BulkCopyUtils extends Logging {
         for (i <- 0 to tableCols.length-1) {
             val tableColName = tableCols(i).name
             var dfFieldIndex = -1
-            if (!columnsToWriteSet.isEmpty() && !columnsToWriteSet.contains(tableColName)) {
+            if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) {
                 // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
                 logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
             } else if (autoCols.contains(tableColName)) {

From 2a1506ec910fb2bc0f8af84707a1f36b3fc0bbc1 Mon Sep 17 00:00:00 2001
From: luxu1
Date: Thu, 9 Dec 2021 12:21:03 -0800
Subject: [PATCH 7/8] leave out auto col and use user provided col mapping

---
 .../jdbc/spark/utils/BulkCopyUtils.scala      | 65 +++++--------------
 1 file changed, 16 insertions(+), 49 deletions(-)

diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
index 45ed3cfc..ffdcb0d5 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
@@ -22,8 +22,6 @@ import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createConnectionFactory, getSchema, schemaString}
 import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
 
-import scala.collection.mutable.ListBuffer
-
 /**
  * BulkCopyUtils Object implements common utility function used by both datapool and
  */
@@ -35,7 +33,7 @@ object BulkCopyUtils extends Logging {
      * a connection, sets connection properties and does a BulkWrite. Called when writing data to
      * master instance and data pools both. URL in options is used to create the relevant connection.
      *
-     * @param itertor - iterator for row of the partition.
+     * @param iterator - iterator for row of the partition.
      * @param dfColMetadata - array of ColumnMetadata type
      * @param options - SQLServerBulkJdbcOptions with url for the connection
      */
@@ -179,32 +177,6 @@ object BulkCopyUtils extends Logging {
         conn.createStatement.executeQuery(queryStr)
     }
 
-    /**
-     * getAutoCols
-     * utility function to get auto generated columns.
-     * Use auto generated column names to exclude them when matching schema.
- */ - private[spark] def getAutoCols( - conn: Connection, - table: String): List[String] = { - // auto cols union computed cols, generated always cols, and node / edge table auto cols - val queryStr = s"""SELECT name - FROM sys.columns - WHERE object_id = OBJECT_ID('${table}') - AND (is_computed = 1 -- computed column - OR generated_always_type > 0 -- generated always / temporal table - OR (is_hidden = 0 AND graph_type = 2)) -- graph table - """ - - val autoColRs = conn.createStatement.executeQuery(queryStr) - val autoCols = ListBuffer[String]() - while (autoColRs.next()) { - val colName = autoColRs.getString("name") - autoCols.append(colName) - } - autoCols.toList - } - /** * getColMetadataMap * Utility function convert result set meta data to array. @@ -290,35 +262,30 @@ object BulkCopyUtils extends Logging { val dfCols = df.schema val tableCols = getSchema(rs, JdbcDialects.get(url)) - val autoCols = getAutoCols(conn, dbtable) - - val columnsToWriteSet = columnsToWrite.split(",").toSet - logDebug(s"columnsToWrite: $columnsToWriteSet") val prefix = "Spark Dataframe and SQL Server table have differing" - // auto columns should not exist in df - assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck, - s"${prefix} numbers of columns") - // if columnsToWrite provided by user, use it for metadata mapping. If not, use sql table. - if (columnsToWrite == "") { - val result = new Array[ColumnMetadata](columnsToWriteSet.size) + var metadataLen = tableCols.length + var columnsToWriteSet: Set[String] = Set() + if (columnsToWrite.isEmpty) { + assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck, + s"${prefix} numbers of columns") } else { - val result = new Array[ColumnMetadata](tableCols.length - autoCols.length) + columnsToWriteSet = columnsToWrite.split(",").map(_.trim).toSet + logDebug(s"columnsToWrite: $columnsToWriteSet") + metadataLen = columnsToWriteSet.size } - var nonAutoColIndex = 0 + var colMappingIndex = 0 + val result = new Array[ColumnMetadata](metadataLen) for (i <- 0 to tableCols.length-1) { val tableColName = tableCols(i).name var dfFieldIndex = -1 - if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) { - // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata + // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata + if (!columnsToWrite.isEmpty && !columnsToWriteSet.contains(tableColName)) { logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list") - } else if (autoCols.contains(tableColName)) { - // if auto columns, skip column mapping and ColumnMetadata - logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex") }else{ var dfColName:String = "" if (isCaseSensitive) { @@ -361,15 +328,15 @@ object BulkCopyUtils extends Logging { s" DF col ${dfColName} nullable config is ${dfCols(dfFieldIndex).nullable} " + s" Table col ${tableColName} nullable config is ${tableCols(i).nullable}") - // Schema check passed for element, Create ColMetaData only for non auto generated column - result(nonAutoColIndex) = new ColumnMetadata( + // Schema check passed for element, Create ColMetaData for columns + result(colMappingIndex) = new ColumnMetadata( rs.getMetaData().getColumnName(i+1), rs.getMetaData().getColumnType(i+1), rs.getMetaData().getPrecision(i+1), rs.getMetaData().getScale(i+1), dfFieldIndex ) - nonAutoColIndex += 1 + colMappingIndex += 1 } } 
         result

From 4c532e1041a050b629e3020570cb1342409bca88 Mon Sep 17 00:00:00 2001
From: luxu1-ms
Date: Tue, 1 Nov 2022 22:51:14 -0700
Subject: [PATCH 8/8] add readme

---
 README.md                                                     | 1 +
 .../microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala  | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index f4f8d83a..cde17ea9 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ In addition following options are supported
 | isolationLevel | "READ_COMMITTED" | Specify the isolation level |
 | tableLock | "false" | Implements an insert with TABLOCK option to improve write performance |
 | schemaCheckEnabled | "true" | Disables strict dataframe and sql table schema check when set to false |
+| columnsToWrite | "" | Enables user-defined column mapping. Provide a comma-separated list of column names; only the listed columns are written |
 
 Other [Bulk api options](https://docs.microsoft.com/en-us/sql/connect/jdbc/using-bulk-copy-with-the-jdbc-driver?view=sql-server-2017#sqlserverbulkcopyoptions) can be set as options on the dataframe and will be passed to bulkcopy apis on write

diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
index ffdcb0d5..342b2ae1 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
@@ -283,7 +283,7 @@ object BulkCopyUtils extends Logging {
         for (i <- 0 to tableCols.length-1) {
             val tableColName = tableCols(i).name
             var dfFieldIndex = -1
-            // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
+            // if columnsToWrite option provided, and sql column names not in it, skip this column mapping and ColumnMetadata
            if (!columnsToWrite.isEmpty && !columnsToWriteSet.contains(tableColName)) {
                 logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
            }else{
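
For reference, a minimal usage sketch of the options this series touches (`schemaCheckEnabled` and the new `columnsToWrite`). It assumes the connector is loaded under its usual format name `com.microsoft.sqlserver.jdbc.spark` and that the standard JDBC-style options (`url`, `dbtable`, `user`, `password`) are configured as in the connector's main README; the server, database, table, column names, and the wrapper object are illustrative placeholders, not values taken from these patches.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object ColumnsToWriteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("columnsToWrite-example").getOrCreate()
        import spark.implicits._

        // Only "name" and "age" are written; computed, generated-always (temporal), and
        // graph pseudo-columns in the target table are left for SQL Server to populate.
        val df = Seq(("alice", 34), ("bob", 29)).toDF("name", "age")

        df.write
          .format("com.microsoft.sqlserver.jdbc.spark")
          .mode(SaveMode.Append)
          .option("url", "jdbc:sqlserver://<server>;databaseName=<database>")  // placeholder connection string
          .option("dbtable", "dbo.Person")                                     // hypothetical target table
          .option("user", "<user>")
          .option("password", "<password>")
          .option("columnsToWrite", "name,age")   // comma-separated; split and trimmed into a set by matchSchemas
          .option("schemaCheckEnabled", "true")   // strict schema check stays on (the default)
          .save()

        spark.stop()
      }
    }

With `columnsToWrite` set, matchSchemas (as of PATCH 7/8) builds ColumnMetadata only for the listed table columns and skips everything else, so graph, temporal, and computed columns no longer have to be detected through sys.columns; with the option left empty, the dataframe is expected to match the table column-for-column under the strict schema check.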