Skip to content

Commit

Permalink
Merge pull request #69 from liquibase/snapshot_support_update
Browse files Browse the repository at this point in the history
Add Fixes for Snapshotting Databricks Tables
  • Loading branch information
CodyAustinDavis authored Nov 22, 2023
2 parents d9166bc + 979a21e commit 9cb11cb
Show file tree
Hide file tree
Showing 16 changed files with 433 additions and 77 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,21 @@ If hive_metastore is used, this is not tested and may not provide all the below
2. [x] CLUSTER BY (DDL) - createClusteredTable - createTable with clusterColumns as additional option for liquid - <b> SUPPORTED </b> in Contributed Harness
3. [x] ANALYZE TABLE - analyzeTable - change type with compute stats column options - <b> SUPPORTED </b> in Contributed Harness
4. [x] VACUUM - vacuumTable - change type with retentionHours parameter (default is 168) - <b> SUPPORTED </b> in Contributed Harness
5. [ ] ALTER CLUSTER KEY - changeClusterColumns - change type that will be used until index change types are mapped with CLUSTER BY columns for snapshot purposes
5. [ ] ALTER CLUSTER KEY - changeClusterColumns - change type that will be used until index change types are mapped with CLUSTER BY columns for snapshot purposes - TO DO


## Remaining Required Change Types to Finish in Base/Contributed
1. [ ] (nice to have, not required) createFunction/dropFunction - in Liquibase Pro, should work in Databricks, but change type not accessible from Liquibase Core
2. [x] (nice to have, not required) addCheckConstraint/dropCheckConstraint - in Liquibase Pro, should work in Databricks, but change type not accessible from Liquibase Core
3. [ ] addDefaultValue (of various types). Databricks/Delta tables support this, but does not get populated by databricks in the JDBC Driver (COLUMN_DEF property always None even with default)

The remaining other change types are not relevant to Databricks and have been marked with INVALID TEST


## General TO DO:
1. [ ] Add support for Snapshotting complex types like STRUCT/MAP
2. [ ] Add support for snapshotting IDENTITY KEYs
3. [ ] Add TIMESTAMP_NTZ Data Type

## Aspirational Roadmap - Databricks Specific Additional Change Types to Add:

1. COPY INTO
Expand Down Expand Up @@ -108,7 +112,10 @@ Then put this driver jar under the liquibase/lib directory.
3. Build this project or retrieve the jar from the latest release.
Then put this extension jar under the liquibase/lib directory.

4. Edit the connection parameters to your Databricks catalog/database under the liquibase.properties file. The format will look like this:
4. IMPORTANT: If using Linux/macOS - run the following command in your terminal before continuing (you can add this to your bash/zsh profile):
export JAVA_OPTS=--add-opens=java.base/java.nio=ALL-UNNAMED

5. Edit the connection parameters to your Databricks catalog/database under the liquibase.properties file. The format will look like this:

```
url: jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=<http_path>;AuthMech=3;ConnCatalog=<catalog>;ConnSchema=<database>;
Expand Down
Binary file added lib/DatabricksJDBC42.jar
Binary file not shown.
6 changes: 3 additions & 3 deletions liquibase.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
url: jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=sql/<warehouse_id>;AuthMech=3;ConnCatalog=main;ConnSchema=liquibase_harness_test_ds;
username: token
password: <dbx_token>
url= jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=sql/<warehouse_id>;AuthMech=3;ConnCatalog=main;ConnSchema=liquibase_harness_test_ds;
username= token
password= <dbx_token>
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>org.liquibase.ext</groupId>
<artifactId>liquibase-databricks</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>

<name>Liquibase Extension Databricks support</name>
<description>Liquibase Extension for Databricks.</description>
Expand Down Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-jdbc</artifactId>
<version>2.6.33</version>
<version>2.6.34</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -85,10 +85,10 @@
<version>${liquibase.version}</version>
<configuration>
<propertyFileWillOverride>true</propertyFileWillOverride>
<propertyFile>target/classes/liquibase.properties</propertyFile>
<propertyFile>dev_db.properties</propertyFile>
<logging>DEBUG</logging>
<changeLogFile>target/classes/changelog.sql</changeLogFile>
<outputChangeLogFile>target/classes/generatedChangelog.databricks.sql</outputChangeLogFile>
<changeLogFile>dev_setup.xml</changeLogFile>
<outputChangeLogFile>generatedChangelog.databricks.sql</outputChangeLogFile>
<diffTypes>tables,views</diffTypes>
<diffIncludeCatalog>true</diffIncludeCatalog>
<diffIncludeSchema>true</diffIncludeSchema>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package liquibase.ext.databricks.database;

import liquibase.Scope;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.DatabaseConnection;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
import liquibase.structure.DatabaseObject;
import liquibase.statement.SqlStatement;
import liquibase.statement.core.RawCallStatement;
import liquibase.structure.core.Schema;
import liquibase.util.StringUtil;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -22,6 +25,8 @@ public class DatabricksDatabase extends AbstractJdbcDatabase {
public static final String PRODUCT_NAME = "databricks";
// Set default catalog - must be unity Catalog Enabled

public String systemSchema = "information_schema";

// This is from the new INFORMATION_SCHEMA() database
private Set<String> systemTablesAndViews = new HashSet<>();

Expand Down Expand Up @@ -65,6 +70,12 @@ public Set<String> getSystemViews() {
return systemTablesAndViews;
}

// Returns the schema holding Databricks system tables and views
// ("information_schema" unless overridden via setSystemSchema).
@Override
public String getSystemSchema() {
    return systemSchema;
}

@Override
public Integer getDefaultPort() {
return 443;
Expand All @@ -82,12 +93,14 @@ public boolean isCorrectDatabaseImplementation(DatabaseConnection conn) throws D

@Override
public String getDefaultDriver(String url) {
if (url.startsWith("jdbc:databricks:") || url.startsWith("jdbc:spark:")) {
if (url.startsWith("jdbc:databricks") || url.startsWith("jdbc:spark")) {

return "com.databricks.client.jdbc.Driver";
}
return null;
}


@Override
public boolean supportsInitiallyDeferrableColumns() {
return false;
Expand Down Expand Up @@ -174,6 +187,56 @@ protected SqlStatement getConnectionSchemaNameCallStatement() {
return new RawCallStatement("select current_schema()");
}

/**
 * Determines the connection's current schema by querying the database,
 * falling back to the ConnSchema parameter embedded in the JDBC URL.
 *
 * @return the current schema name, or null when there is no connection
 */
@Override
protected String getConnectionSchemaName() {
    DatabaseConnection connection = getConnection();

    if (connection == null) {
        return null;
    }
    // Close the Statement as well as the ResultSet; the original code only
    // closed the ResultSet and leaked the Statement on every call.
    try (java.sql.Statement statement = ((JdbcConnection) connection).createStatement();
         ResultSet resultSet = statement.executeQuery("SELECT CURRENT_SCHEMA()")) {
        resultSet.next();
        return resultSet.getString(1);
    } catch (Exception e) {
        Scope.getCurrentScope().getLog(getClass()).info("Error getting default schema", e);
    }

    // Query failed: fall back to parsing the JDBC URL for an explicit schema.
    String foundSchema = parseUrlForSchema(connection.getURL());
    // Use the Liquibase logger instead of System.out (also fixes "IDENFIED" typo).
    Scope.getCurrentScope().getLog(getClass()).info("Schema identified from URL: " + foundSchema);

    return foundSchema;
}

// Extracts the value of the "ConnSchema" parameter from a Databricks JDBC URL.
// Returns "default" when the URL carries no explicit schema parameter.
private String parseUrlForSchema(String url) {

    final String schemaToken = "ConnSchema=";
    int tokenStart = url.indexOf(schemaToken);

    // No ConnSchema parameter present; Databricks falls back to "default".
    if (tokenStart < 0) {
        return "default";
    }

    int valueStart = tokenStart + schemaToken.length();
    int valueEnd = url.indexOf(';', valueStart);

    // The schema value is terminated by ';' or runs to the end of the URL.
    return valueEnd < 0 ? url.substring(valueStart) : url.substring(valueStart, valueEnd);
}

// Sets the default schema, normalizing the supplied name to this database's
// object-name conventions via correctObjectName before storing it.
@Override
public void setDefaultSchemaName(final String schemaName) {
this.defaultSchemaName = correctObjectName(schemaName, Schema.class);
}

// Overrides the schema used when resolving Databricks system tables and views.
public void setSystemSchema(String systemSchema) {
    this.systemSchema = systemSchema;
}


private Set<String> getDatabricksReservedWords() {

// Get Reserved words from: https://docs.databricks.com/sql/language-manual/sql-ref-reserved-words.html
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package liquibase.ext.databricks.datatype;

import liquibase.datatype.core.BigIntType;
import liquibase.change.core.LoadDataChange;
import liquibase.database.Database;
import liquibase.datatype.DataTypeInfo;
import liquibase.datatype.DatabaseDataType;
import liquibase.datatype.LiquibaseDataType;
import liquibase.ext.databricks.database.DatabricksDatabase;



@DataTypeInfo(name = "bigint", aliases = {"java.sql.Types.BIGINT", "java.math.BigInteger", "java.lang.Long", "integer8", "bigserial", "serial8", "int8"}, minParameters = 0, maxParameters = 0, priority = DatabricksDatabase.PRIORITY_DATABASE)
public class BigintDatatypeDatabricks extends BigIntType {

    // Tracks whether the column was declared as auto-increment (identity).
    private boolean autoIncrement;

    @Override
    public boolean isAutoIncrement() {
        return autoIncrement;
    }

    public void setAutoIncrement(boolean autoIncrement) {
        this.autoIncrement = autoIncrement;
    }

    /**
     * Renders the type as plain BIGINT on Databricks; defers to the standard
     * Liquibase rendering for any other database.
     */
    @Override
    public DatabaseDataType toDatabaseDataType(Database database) {
        return database instanceof DatabricksDatabase
                ? new DatabaseDataType("BIGINT")
                : super.toDatabaseDataType(database);
    }

    // This data type only applies to Databricks connections.
    @Override
    public boolean supports(Database database) {
        return database instanceof DatabricksDatabase;
    }

    // Outranks the core bigint handler when a Databricks database is in use.
    @Override
    public int getPriority() {
        return PRIORITY_DATABASE;
    }

    // Harness load-data mapping: bigint values are loaded as NUMERIC.
    @Override
    public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
        return LoadDataChange.LOAD_DATA_TYPE.NUMERIC;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
name = "timestamp",
aliases = {"java.sql.Types.DATETIME", "datetime"},
minParameters = 0,
maxParameters = 2,
maxParameters = 0,
priority = PRIORITY_DATABASE
)
public class DatetimeDatatypeDatabricks extends LiquibaseDataType {
Expand All @@ -23,7 +23,7 @@ public class DatetimeDatatypeDatabricks extends LiquibaseDataType {
public DatabaseDataType toDatabaseDataType(Database database) {

if (database instanceof DatabricksDatabase) {
return new DatabaseDataType("TIMESTAMP", getParameters());
return new DatabaseDataType("TIMESTAMP");
}

return super.toDatabaseDataType(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package liquibase.ext.databricks.datatype;

import liquibase.change.core.LoadDataChange;
import liquibase.database.Database;
import liquibase.datatype.DataTypeInfo;
import liquibase.datatype.DatabaseDataType;
import liquibase.datatype.LiquibaseDataType;
import liquibase.ext.databricks.database.DatabricksDatabase;

import static liquibase.ext.databricks.database.DatabricksDatabase.PRIORITY_DATABASE;


@DataTypeInfo(
name = "int",
minParameters = 0,
maxParameters = 0,
priority = PRIORITY_DATABASE
)
public class IntegerDatatypeDatabricks extends LiquibaseDataType {
public IntegerDatatypeDatabricks() {
}

public boolean supports(Database database) {
return database instanceof DatabricksDatabase;
}

public DatabaseDataType toDatabaseDataType(Database database) {
if (database instanceof DatabricksDatabase) {

DatabaseDataType type = new DatabaseDataType("INT", this.getParameters());
type.setType("INT");
return type;
} else {
return super.toDatabaseDataType(database);
}

}

public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
return LoadDataChange.LOAD_DATA_TYPE.NUMERIC;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package liquibase.ext.databricks.datatype;

import liquibase.change.core.LoadDataChange;
import liquibase.database.Database;
import liquibase.datatype.DataTypeInfo;
import liquibase.datatype.DatabaseDataType;
import liquibase.datatype.LiquibaseDataType;
import liquibase.ext.databricks.database.DatabricksDatabase;

import static liquibase.ext.databricks.database.DatabricksDatabase.PRIORITY_DATABASE;


@DataTypeInfo(
name = "string",
minParameters = 0,
maxParameters = 0,
priority = PRIORITY_DATABASE
)
public class StringDatatypeDatabricks extends LiquibaseDataType {
public StringDatatypeDatabricks() {
}

public boolean supports(Database database) {
return database instanceof DatabricksDatabase;
}

public DatabaseDataType toDatabaseDataType(Database database) {
if (database instanceof DatabricksDatabase) {

DatabaseDataType type = new DatabaseDataType("STRING");

type.setType("STRING");

return type;
} else {
return super.toDatabaseDataType(database);
}

}

public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
return LoadDataChange.LOAD_DATA_TYPE.STRING;
}
}
Loading

0 comments on commit 9cb11cb

Please sign in to comment.