From 90b6a0613a97cba87458688ebb5ab92fa037395c Mon Sep 17 00:00:00 2001 From: liudefu <1625567290@qq.com> Date: Sat, 7 Oct 2023 21:35:52 +0800 Subject: [PATCH] changed --- .../server/restful/MetadataQueryRestful.java | 491 ++++++++------ .../server/service/MetadataQueryService.java | 142 ++-- .../impl/MetadataQueryServiceImpl.java | 622 +++++++++++------- .../query/common/domain/FlinkSqlTemplate.java | 67 ++ 4 files changed, 802 insertions(+), 520 deletions(-) create mode 100644 linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/domain/FlinkSqlTemplate.java diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java index ddb7214b33..7915387254 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java @@ -60,9 +60,9 @@ public class MetadataQueryRestful { @RequestMapping(value = "/getConnectionInfo", method = RequestMethod.GET) public Message getConnectionInfo( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -74,40 +74,40 @@ public Message getConnectionInfo( return Message.error("'dataSourceName' is invalid[数据源错误]"); } String userName = - 
ModuleUserUtils.getOperationUser( - request, "getConnectionInfo, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getConnectionInfo, dataSourceName:" + dataSourceName); Map queryParams = - request.getParameterMap().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, entry -> StringUtils.join(entry.getValue(), ","))); + request.getParameterMap().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> StringUtils.join(entry.getValue(), ","))); Map info = - metadataQueryService.getConnectionInfoByDsName( - dataSourceName, queryParams, system, userName); + metadataQueryService.getConnectionInfoByDsName( + dataSourceName, queryParams, system, userName); return Message.ok().data("info", info); } catch (Exception e) { return errorToResponseMessage( - "Fail to get connection info [获得连接信息失败], name: [" - + dataSourceName - + "], system:[" - + system - + "]", - e); + "Fail to get connection info [获得连接信息失败], name: [" + + dataSourceName + + "], system:[" + + system + + "]", + e); } } @ApiOperation(value = "getDatabases", notes = "get databases", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "envId", required = false, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String") }) @RequestMapping(value = "/getDatabases", method = RequestMethod.GET) public Message getDatabases( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam(value = "envId", required = false) String envId, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String 
dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -119,37 +119,37 @@ public Message getDatabases( return Message.error("'dataSourceName' is invalid[数据源错误]"); } String userName = - ModuleUserUtils.getOperationUser( - request, "getDatabases, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getDatabases, dataSourceName:" + dataSourceName); List databases = - metadataQueryService.getDatabasesByDsNameAndEnvId( - dataSourceName, system, userName, envId); + metadataQueryService.getDatabasesByDsNameAndEnvId( + dataSourceName, system, userName, envId); return Message.ok().data("dbs", databases); } catch (Exception e) { return errorToResponseMessage( - "Fail to get database list[获取库信息失败], name:[" - + dataSourceName - + "], system:[" - + system - + "]", - e); + "Fail to get database list[获取库信息失败], name:[" + + dataSourceName + + "], system:[" + + system + + "]", + e); } } @ApiOperation(value = "getTables", notes = "get tables", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "envId", required = false, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String") }) @RequestMapping(value = "/getTables", method = RequestMethod.GET) public Message getTables( - @RequestParam("dataSourceName") String dataSourceName, - 
@RequestParam(value = "envId", required = false) String envId, - @RequestParam("database") String database, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -164,39 +164,39 @@ public Message getTables( return Message.error("'database' is invalid[数据库名称错误]"); } String userName = - ModuleUserUtils.getOperationUser(request, "getTables, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser(request, "getTables, dataSourceName:" + dataSourceName); List tables = - metadataQueryService.getTablesByDsNameAndEnvId( - dataSourceName, database, system, userName, envId); + metadataQueryService.getTablesByDsNameAndEnvId( + dataSourceName, database, system, userName, envId); return Message.ok().data("tables", tables); } catch (Exception e) { return errorToResponseMessage( - "Fail to get table list[获取表信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "]", - e); + "Fail to get table list[获取表信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "]", + e); } } @ApiOperation(value = "getTableProps", notes = "get table props", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "system", required = 
true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") }) @RequestMapping(value = "/getTableProps", method = RequestMethod.GET) public Message getTableProps( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -214,43 +214,43 @@ public Message getTableProps( return Message.error("'dataSourceName' is invalid[数据源错误]"); } String userName = - ModuleUserUtils.getOperationUser( - request, "getTableProps, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getTableProps, dataSourceName:" + dataSourceName); Map tableProps = - metadataQueryService.getTablePropsByDsName( - dataSourceName, database, table, system, userName); + metadataQueryService.getTablePropsByDsName( + dataSourceName, database, table, system, userName); return Message.ok().data("props", tableProps); } catch (Exception e) { return errorToResponseMessage( - "Fail to get table properties[获取表参数信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "]", - e); + "Fail to get table properties[获取表参数信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); } } @ApiOperation(value = "getPartitions", notes = "get partitions", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, 
dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") }) @RequestMapping(value = "/getPartitions", method = RequestMethod.GET) public Message getPartitions( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("system") String system, - @RequestParam(name = "traverse", required = false, defaultValue = "false") Boolean traverse, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + @RequestParam(name = "traverse", required = false, defaultValue = "false") Boolean traverse, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -269,47 +269,47 @@ public Message getPartitions( } String userName = - ModuleUserUtils.getOperationUser( - request, "getPartitions, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getPartitions, dataSourceName:" + dataSourceName); MetaPartitionInfo partitionInfo = - metadataQueryService.getPartitionsByDsName( - dataSourceName, database, table, system, traverse, userName); + metadataQueryService.getPartitionsByDsName( + dataSourceName, database, table, system, traverse, userName); return Message.ok().data("partitions", partitionInfo); } catch (Exception e) { return 
errorToResponseMessage( - "Fail to get partitions[获取表分区信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "]", - e); + "Fail to get partitions[获取表分区信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); } } @ApiOperation( - value = "getPartitionProps", - notes = "get partition props", - response = Message.class) + value = "getPartitionProps", + notes = "get partition props", + response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String"), - @ApiImplicitParam(name = "partition", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String"), + @ApiImplicitParam(name = "partition", required = true, dataType = "String") }) @RequestMapping(value = "getPartitionProps", method = RequestMethod.GET) public Message getPartitionProps( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("partition") String partition, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("partition") String partition, + @RequestParam("system") String system, + 
HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -330,46 +330,46 @@ public Message getPartitionProps( return Message.error("'partition' is invalid[partition错误]"); } String userName = - ModuleUserUtils.getOperationUser( - request, "getPartitionProps, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getPartitionProps, dataSourceName:" + dataSourceName); Map partitionProps = - metadataQueryService.getPartitionPropsByDsName( - dataSourceName, database, table, partition, system, userName); + metadataQueryService.getPartitionPropsByDsName( + dataSourceName, database, table, partition, system, userName); return Message.ok().data("props", partitionProps); } catch (Exception e) { return errorToResponseMessage( - "Fail to get partition properties[获取分区参数信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "], partition:[" - + partition - + "]", - e); + "Fail to get partition properties[获取分区参数信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "], partition:[" + + partition + + "]", + e); } } @ApiOperation(value = "getColumns", notes = "get columns", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "envId", required = false, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = 
"String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") }) @RequestMapping(value = "/getColumns", method = RequestMethod.GET) public Message getColumns( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam(value = "envId", required = false) String envId, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -388,44 +388,44 @@ public Message getColumns( } String userName = - ModuleUserUtils.getOperationUser(request, "getColumns, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser(request, "getColumns, dataSourceName:" + dataSourceName); List columns = - metadataQueryService.getColumnsByDsNameAndEnvId( - dataSourceName, database, table, system, userName, envId); + metadataQueryService.getColumnsByDsNameAndEnvId( + dataSourceName, database, table, system, userName, envId); return Message.ok().data("columns", columns); } catch (Exception e) { return errorToResponseMessage( - "Fail to get column list[获取表字段信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "]", - e); + "Fail to get column list[获取表字段信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); } } @ApiOperation(value = "getSparkDdlSql", notes = "get spark ddl sql", response = Message.class) @ApiImplicitParams({ - 
@ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "envId", required = false, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") }) @RequestMapping(value = "/getSparkSql", method = RequestMethod.GET) public Message getSparkSql( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam(value = "envId", required = false) String envId, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -444,45 +444,45 @@ public Message getSparkSql( } String userName = - ModuleUserUtils.getOperationUser( - request, "getSparkDdlSql, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser( + request, "getSparkDdlSql, dataSourceName:" + dataSourceName); GenerateSqlInfo sparkSql = - metadataQueryService.getSparkSqlByDsNameAndEnvId( - dataSourceName, database, table, system, userName, envId); + metadataQueryService.getSparkSqlByDsNameAndEnvId( + 
dataSourceName, database, table, system, userName, envId); return Message.ok().data("sparkSql", sparkSql); } catch (Exception e) { return errorToResponseMessage( - "Fail to spark sql[获取getSparkSql信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "]", - e); + "Fail to spark sql[获取getSparkSql信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); } } @ApiOperation(value = "getJdbcSql", notes = "get jdbc sql", response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), - @ApiImplicitParam(name = "envId", required = false, dataType = "String"), - @ApiImplicitParam(name = "system", required = true, dataType = "String"), - @ApiImplicitParam(name = "database", required = true, dataType = "String"), - @ApiImplicitParam(name = "table", required = true, dataType = "String") + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") }) @RequestMapping(value = "/getJdbcSql", method = RequestMethod.GET) public Message getJdbcSql( - @RequestParam("dataSourceName") String dataSourceName, - @RequestParam(value = "envId", required = false) String envId, - @RequestParam("database") String database, - @RequestParam("table") String table, - @RequestParam("system") String system, - HttpServletRequest request) { + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("table") String table, + 
@RequestParam("system") String system, + HttpServletRequest request) { try { if (StringUtils.isBlank(system)) { return Message.error("'system' is missing[缺少系统名]"); @@ -501,25 +501,82 @@ public Message getJdbcSql( } String userName = - ModuleUserUtils.getOperationUser(request, "getJdbcSql, dataSourceName:" + dataSourceName); + ModuleUserUtils.getOperationUser(request, "getJdbcSql, dataSourceName:" + dataSourceName); GenerateSqlInfo sparkSql = - metadataQueryService.getJdbcSqlByDsNameAndEnvId( - dataSourceName, database, table, system, userName, envId); + metadataQueryService.getJdbcSqlByDsNameAndEnvId( + dataSourceName, database, table, system, userName, envId); return Message.ok().data("jdbcSql", sparkSql); } catch (Exception e) { return errorToResponseMessage( - "Fail to jdbc sql[获取getJdbcSql信息失败], name:[" - + dataSourceName - + "]" - + ", system:[" - + system - + "], database:[" - + database - + "], table:[" - + table - + "]", - e); + "Fail to jdbc sql[获取getJdbcSql信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); + } + } + + @ApiOperation(value = "getFlinkSql", notes = "get flink ddl sql", response = Message.class) + @ApiImplicitParams({ + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") + }) + @RequestMapping(value = "/getFlinkSql", method = RequestMethod.GET) + public Message getFlinkSql( + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + 
HttpServletRequest request) { + try { + if (StringUtils.isBlank(system)) { + return Message.error("'system' is missing[缺少系统名]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(system).matches()) { + return Message.error("'system' is invalid[系统名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(database).matches()) { + return Message.error("'database' is invalid[数据库名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(table).matches()) { + return Message.error("'table' is invalid[表名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(dataSourceName).matches()) { + return Message.error("'dataSourceName' is invalid[数据源错误]"); + } + + String userName = + ModuleUserUtils.getOperationUser( + request, "getFlinkSql, dataSourceName:" + dataSourceName); + + GenerateSqlInfo flinkSql = + metadataQueryService.getFlinkSqlByDsNameAndEnvId( + dataSourceName, database, table, system, userName, envId); + return Message.ok().data("flinkSql", flinkSql); + } catch (Exception e) { + return errorToResponseMessage( + "Fail to flink sql[获取getFlinkSql信息失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); } } @@ -534,12 +591,12 @@ private Message errorToResponseMessage(String uiMessage, Exception e) { // Ignore } logger.trace( - uiMessage - + " => Method: " - + invokeException.getMethod() - + ", Arguments:" - + argumentJson, - e); + uiMessage + + " => Method: " + + invokeException.getMethod() + + ", Arguments:" + + argumentJson, + e); } uiMessage += " possible reason[可能原因]: (" + invokeException.getCause().getMessage() + ")"; } else { diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java index 
1e7438559c..9ab91933eb 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java @@ -34,7 +34,7 @@ public interface MetadataQueryService { * @return */ void getConnection(String dataSourceType, String operator, Map params) - throws Exception; + throws Exception; /** * @param dataSourceId data source id @@ -43,7 +43,7 @@ void getConnection(String dataSourceType, String operator, Map p */ @Deprecated List getDatabasesByDsId(String dataSourceId, String system, String userName) - throws ErrorException; + throws ErrorException; /** * @param dataSourceId data source id @@ -53,7 +53,7 @@ List getDatabasesByDsId(String dataSourceId, String system, String userN */ @Deprecated List getTablesByDsId(String dataSourceId, String database, String system, String userName) - throws ErrorException; + throws ErrorException; /** * @param dataSourceId data source id @@ -64,8 +64,8 @@ List getTablesByDsId(String dataSourceId, String database, String system */ @Deprecated Map getTablePropsByDsId( - String dataSourceId, String database, String table, String system, String userName) - throws ErrorException; + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException; /** * @param dataSourceId data source i * @param database database @@ -75,13 +75,13 @@ Map getTablePropsByDsId( */ @Deprecated MetaPartitionInfo getPartitionsByDsId( - String dataSourceId, - String database, - String table, - String system, - Boolean traverse, - String userName) - throws ErrorException; + String dataSourceId, + String database, + String table, + String system, + Boolean traverse, + String userName) + throws ErrorException; /** * @param dataSourceId data source id @@ -95,13 
+95,13 @@ MetaPartitionInfo getPartitionsByDsId( */ @Deprecated Map getPartitionPropsByDsId( - String dataSourceId, - String database, - String table, - String partition, - String system, - String userName) - throws ErrorException; + String dataSourceId, + String database, + String table, + String partition, + String system, + String userName) + throws ErrorException; /** * @param dataSourceId data source id @@ -112,8 +112,8 @@ Map getPartitionPropsByDsId( */ @Deprecated List getColumnsByDsId( - String dataSourceId, String database, String table, String system, String userName) - throws ErrorException; + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException; /** * Get connection information @@ -125,8 +125,8 @@ List getColumnsByDsId( * @return */ Map getConnectionInfoByDsName( - String dataSourceName, Map queryParams, String system, String userName) - throws ErrorException; + String dataSourceName, Map queryParams, String system, String userName) + throws ErrorException; /** * @param dataSourceName data source name @@ -134,7 +134,7 @@ Map getConnectionInfoByDsName( * @return */ List getDatabasesByDsName(String dataSourceName, String system, String userName) - throws ErrorException; + throws ErrorException; /** * @param dataSourceName @@ -145,7 +145,7 @@ List getDatabasesByDsName(String dataSourceName, String system, String u * @throws ErrorException */ List getDatabasesByDsNameAndEnvId( - String dataSourceName, String system, String userName, String envId) throws ErrorException; + String dataSourceName, String system, String userName, String envId) throws ErrorException; /** * @param dataSourceName data source name @@ -154,7 +154,7 @@ List getDatabasesByDsNameAndEnvId( * @return */ List getTablesByDsName( - String dataSourceName, String database, String system, String userName) throws ErrorException; + String dataSourceName, String database, String system, String userName) throws ErrorException; /** * @param 
dataSourceName @@ -166,8 +166,8 @@ List getTablesByDsName( * @throws ErrorException */ List getTablesByDsNameAndEnvId( - String dataSourceName, String database, String system, String userName, String envId) - throws ErrorException; + String dataSourceName, String database, String system, String userName, String envId) + throws ErrorException; /** * @param dataSourceName data source name @@ -177,8 +177,8 @@ List getTablesByDsNameAndEnvId( * @return */ Map getTablePropsByDsName( - String dataSourceName, String database, String table, String system, String userName) - throws ErrorException; + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException; /** * @param dataSourceName data source name * @param database database @@ -187,13 +187,13 @@ Map getTablePropsByDsName( * @return */ MetaPartitionInfo getPartitionsByDsName( - String dataSourceName, - String database, - String table, - String system, - Boolean traverse, - String userName) - throws ErrorException; + String dataSourceName, + String database, + String table, + String system, + Boolean traverse, + String userName) + throws ErrorException; /** * @param dataSourceName data source name @@ -206,13 +206,13 @@ MetaPartitionInfo getPartitionsByDsName( * @throws ErrorException */ Map getPartitionPropsByDsName( - String dataSourceName, - String database, - String table, - String partition, - String system, - String userName) - throws ErrorException; + String dataSourceName, + String database, + String table, + String partition, + String system, + String userName) + throws ErrorException; /** * @param dataSourceName data source id @@ -222,8 +222,8 @@ Map getPartitionPropsByDsName( * @return */ List getColumnsByDsName( - String dataSourceName, String database, String table, String system, String userName) - throws ErrorException; + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException; /** * @param dataSourceName 
@@ -236,13 +236,13 @@ List getColumnsByDsName( * @throws ErrorException */ List getColumnsByDsNameAndEnvId( - String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException; + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException; /** * @param dataSourceName @@ -255,13 +255,13 @@ List getColumnsByDsNameAndEnvId( * @throws ErrorException */ GenerateSqlInfo getSparkSqlByDsNameAndEnvId( - String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException; + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException; /** * @param dataSourceName @@ -274,11 +274,19 @@ GenerateSqlInfo getSparkSqlByDsNameAndEnvId( * @throws ErrorException */ GenerateSqlInfo getJdbcSqlByDsNameAndEnvId( - String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException; + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException; + + GenerateSqlInfo getFlinkSqlByDsNameAndEnvId( + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId); } diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java index 904ab4d947..f61f8c6ec7 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java 
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java @@ -30,6 +30,7 @@ import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo; import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo; import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo; +import org.apache.linkis.metadata.query.common.domain.FlinkSqlTemplate; import org.apache.linkis.metadata.query.common.exception.MetaMethodInvokeException; import org.apache.linkis.metadata.query.common.exception.MetaRuntimeException; import org.apache.linkis.metadata.query.common.service.GenerateSqlTemplate; @@ -72,16 +73,16 @@ public class MetadataQueryServiceImpl implements MetadataQueryService { @PostConstruct public void init() { dataSourceRpcSender = - Sender.getSender(MdmConfiguration.DATA_SOURCE_SERVICE_APPLICATION.getValue()); + Sender.getSender(MdmConfiguration.DATA_SOURCE_SERVICE_APPLICATION.getValue()); metaClassLoaderManager = new MetaClassLoaderManager(); } @Override public void getConnection(String dataSourceType, String operator, Map params) - throws Exception { + throws Exception { MetadataConnection metadataConnection = - invokeMetaMethod( - dataSourceType, "getConnection", new Object[] {operator, params}, Map.class); + invokeMetaMethod( + dataSourceType, "getConnection", new Object[] {operator, params}, Map.class); if (Objects.nonNull(metadataConnection)) { Closeable connection = metadataConnection.getConnection(); try { @@ -95,14 +96,14 @@ public void getConnection(String dataSourceType, String operator, Map getDatabasesByDsId(String dataSourceId, String system, String userName) - throws ErrorException { + throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - 
"getDatabases", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, - List.class); + dsInfoResponse.getDsType(), + "getDatabases", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, + List.class); } return new ArrayList<>(); } @@ -110,14 +111,14 @@ public List getDatabasesByDsId(String dataSourceId, String system, Strin @Override @Deprecated public List getTablesByDsId( - String dataSourceId, String database, String system, String userName) throws ErrorException { + String dataSourceId, String database, String system, String userName) throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getTables", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, - List.class); + dsInfoResponse.getDsType(), + "getTables", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, + List.class); } return new ArrayList<>(); } @@ -125,22 +126,22 @@ public List getTablesByDsId( @Override @Deprecated public Map getPartitionPropsByDsId( - String dataSourceId, - String database, - String table, - String partition, - String system, - String userName) - throws ErrorException { + String dataSourceId, + String database, + String table, + String partition, + String system, + String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getPartitionProps", - new Object[] { - dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, partition - }, - Map.class); + dsInfoResponse.getDsType(), + "getPartitionProps", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, partition + }, + 
Map.class); } return new HashMap<>(); } @@ -148,15 +149,15 @@ public Map getPartitionPropsByDsId( @Override @Deprecated public Map getTablePropsByDsId( - String dataSourceId, String database, String table, String system, String userName) - throws ErrorException { + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getTableProps", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, - Map.class); + dsInfoResponse.getDsType(), + "getTableProps", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + Map.class); } return new HashMap<>(); } @@ -164,22 +165,22 @@ public Map getTablePropsByDsId( @Override @Deprecated public MetaPartitionInfo getPartitionsByDsId( - String dataSourceId, - String database, - String table, - String system, - Boolean traverse, - String userName) - throws ErrorException { + String dataSourceId, + String database, + String table, + String system, + Boolean traverse, + String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getPartitions", - new Object[] { - dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, traverse - }, - MetaPartitionInfo.class); + dsInfoResponse.getDsType(), + "getPartitions", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, traverse + }, + MetaPartitionInfo.class); } return new MetaPartitionInfo(); } @@ -187,243 +188,243 @@ public MetaPartitionInfo getPartitionsByDsId( @Override @Deprecated public List getColumnsByDsId( - String 
dataSourceId, String database, String table, String system, String userName) - throws ErrorException { + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getColumns", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, - List.class); + dsInfoResponse.getDsType(), + "getColumns", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + List.class); } return new ArrayList<>(); } @Override public List getDatabasesByDsName(String dataSourceName, String system, String userName) - throws ErrorException { + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getDatabases", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, - List.class); + dsInfoResponse.getDsType(), + "getDatabases", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, + List.class); } return new ArrayList<>(); } @Override public List getDatabasesByDsNameAndEnvId( - String dataSourceName, String system, String userName, String envId) throws ErrorException { + String dataSourceName, String system, String userName, String envId) throws ErrorException { DsInfoResponse dsInfoResponse = - queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getDatabases", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, - List.class); 
+ dsInfoResponse.getDsType(), + "getDatabases", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, + List.class); } return new ArrayList<>(); } @Override public Map getConnectionInfoByDsName( - String dataSourceName, Map queryParams, String system, String userName) - throws ErrorException { + String dataSourceName, Map queryParams, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getConnectionInfo", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), queryParams}, - Map.class); + dsInfoResponse.getDsType(), + "getConnectionInfo", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), queryParams}, + Map.class); } return new HashMap<>(); } @Override public List getTablesByDsName( - String dataSourceName, String database, String system, String userName) - throws ErrorException { + String dataSourceName, String database, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getTables", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, - List.class); + dsInfoResponse.getDsType(), + "getTables", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, + List.class); } return new ArrayList<>(); } @Override public List getTablesByDsNameAndEnvId( - String dataSourceName, String database, String system, String userName, String envId) - throws ErrorException { + String dataSourceName, String database, String system, String userName, String envId) + throws ErrorException { DsInfoResponse dsInfoResponse = - 
queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getTables", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, - List.class); + dsInfoResponse.getDsType(), + "getTables", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database}, + List.class); } return new ArrayList<>(); } @Override public Map getPartitionPropsByDsName( - String dataSourceName, - String database, - String table, - String partition, - String system, - String userName) - throws ErrorException { + String dataSourceName, + String database, + String table, + String partition, + String system, + String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getPartitionProps", - new Object[] { - dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, partition - }, - Map.class); + dsInfoResponse.getDsType(), + "getPartitionProps", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, partition + }, + Map.class); } return new HashMap<>(); } @Override public Map getTablePropsByDsName( - String dataSourceName, String database, String table, String system, String userName) - throws ErrorException { + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getTableProps", - new Object[] {dsInfoResponse.getCreator(), 
dsInfoResponse.getParams(), database, table}, - Map.class); + dsInfoResponse.getDsType(), + "getTableProps", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + Map.class); } return new HashMap<>(); } @Override public MetaPartitionInfo getPartitionsByDsName( - String dataSourceName, - String database, - String table, - String system, - Boolean traverse, - String userName) - throws ErrorException { + String dataSourceName, + String database, + String table, + String system, + Boolean traverse, + String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getPartitions", - new Object[] { - dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, traverse - }, - MetaPartitionInfo.class); + dsInfoResponse.getDsType(), + "getPartitions", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table, traverse + }, + MetaPartitionInfo.class); } return new MetaPartitionInfo(); } @Override public List getColumnsByDsName( - String dataSourceName, String database, String table, String system, String userName) - throws ErrorException { + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException { DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getColumns", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, - List.class); + dsInfoResponse.getDsType(), + "getColumns", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + List.class); } return new ArrayList<>(); } @Override public List getColumnsByDsNameAndEnvId( - 
String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException { + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException { DsInfoResponse dsInfoResponse = - queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getColumns", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, - List.class); + dsInfoResponse.getDsType(), + "getColumns", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + List.class); } return new ArrayList<>(); } @Override public GenerateSqlInfo getSparkSqlByDsNameAndEnvId( - String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException { + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException { DsInfoResponse dsInfoResponse = - queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { List columns = new ArrayList<>(); try { columns = - invokeMetaMethod( - dsInfoResponse.getDsType(), - "getColumns", - new Object[] { - dsInfoResponse.getCreator(), - dsInfoResponse.getParams(), - database, - dsInfoResponse - .getDsType() - .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue()) - ? 
"_doc" - : table - }, - List.class); + invokeMetaMethod( + dsInfoResponse.getDsType(), + "getColumns", + new Object[] { + dsInfoResponse.getCreator(), + dsInfoResponse.getParams(), + database, + dsInfoResponse + .getDsType() + .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue()) + ? "_doc" + : table + }, + List.class); } catch (Exception e) { logger.warn("Fail to get Sql columns(获取字段列表失败)"); } if (CacheConfiguration.MYSQL_RELATIONSHIP_LIST - .getValue() - .contains(dsInfoResponse.getDsType())) { + .getValue() + .contains(dsInfoResponse.getDsType())) { String sqlConnectUrl = - invokeMetaMethod( - dsInfoResponse.getDsType(), - "getSqlConnectUrl", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, - String.class); + invokeMetaMethod( + dsInfoResponse.getDsType(), + "getSqlConnectUrl", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, + String.class); return getSparkSqlByJdbc( - database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl); + database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl); } else if (dsInfoResponse.getDsType().equalsIgnoreCase(DataSourceTypeEnum.KAFKA.getValue())) { return getSparkSqlByKafka(table, dsInfoResponse.getParams()); } else if (dsInfoResponse - .getDsType() - .equalsIgnoreCase(DataSourceTypeEnum.MONGODB.getValue())) { + .getDsType() + .equalsIgnoreCase(DataSourceTypeEnum.MONGODB.getValue())) { return getSparkSqlByMongo(database, table, dsInfoResponse.getParams(), columns); } else if (dsInfoResponse - .getDsType() - .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())) { + .getDsType() + .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())) { return getSparkSqlByElasticsearch(table, dsInfoResponse.getParams(), columns); } } @@ -432,7 +433,7 @@ public GenerateSqlInfo getSparkSqlByDsNameAndEnvId( } public GenerateSqlInfo getSparkSqlByElasticsearch( - String table, Map params, List columns) { + String table, Map params, List columns) { 
GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); String[] endPoints = new String[] {}; @@ -451,12 +452,12 @@ public GenerateSqlInfo getSparkSqlByElasticsearch( HttpHost httpHost = HttpHost.create(endPoints[0]); String ddl = - String.format( - GenerateSqlTemplate.ES_DDL_SQL_TEMPLATE, - table, - httpHost.getHostName(), - httpHost.getPort(), - table); + String.format( + GenerateSqlTemplate.ES_DDL_SQL_TEMPLATE, + table, + httpHost.getHostName(), + httpHost.getPort(), + table); generateSqlInfo.setDdl(ddl); generateSqlInfo.setDml(GenerateSqlTemplate.generateDmlSql(table)); @@ -471,15 +472,15 @@ public GenerateSqlInfo getSparkSqlByElasticsearch( } public GenerateSqlInfo getSparkSqlByMongo( - String database, String table, Map params, List columns) { + String database, String table, Map params, List columns) { GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); String url = - String.format( - "mongodb://%s:%s/%s", - params.getOrDefault("host", ""), params.getOrDefault("port", ""), database); + String.format( + "mongodb://%s:%s/%s", + params.getOrDefault("host", ""), params.getOrDefault("port", ""), database); String ddl = - String.format(GenerateSqlTemplate.MONGO_DDL_SQL_TEMPLATE, table, url, database, table); + String.format(GenerateSqlTemplate.MONGO_DDL_SQL_TEMPLATE, table, url, database, table); generateSqlInfo.setDdl(ddl); generateSqlInfo.setDml(GenerateSqlTemplate.generateDmlSql(table)); @@ -487,10 +488,10 @@ public GenerateSqlInfo getSparkSqlByMongo( String columnStr = "*"; if (CollectionUtils.isNotEmpty(columns)) { columnStr = - columns.stream() - .filter(column -> !column.getName().equals("_id")) - .map(MetaColumnInfo::getName) - .collect(Collectors.joining(",")); + columns.stream() + .filter(column -> !column.getName().equals("_id")) + .map(MetaColumnInfo::getName) + .collect(Collectors.joining(",")); } generateSqlInfo.setDql(GenerateSqlTemplate.generateDqlSql(columnStr, table)); @@ -501,7 +502,7 @@ public GenerateSqlInfo getSparkSqlByKafka(String 
table, Map para GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); String kafkaServers = String.valueOf(params.getOrDefault("uris", "localhost:9092")); String ddl = - String.format(GenerateSqlTemplate.KAFKA_DDL_SQL_TEMPLATE, table, kafkaServers, table); + String.format(GenerateSqlTemplate.KAFKA_DDL_SQL_TEMPLATE, table, kafkaServers, table); generateSqlInfo.setDdl(ddl); generateSqlInfo.setDml(GenerateSqlTemplate.generateDmlSql(table)); @@ -511,28 +512,28 @@ public GenerateSqlInfo getSparkSqlByKafka(String table, Map para } public GenerateSqlInfo getSparkSqlByJdbc( - String database, - String table, - Map params, - List columns, - String sqlConnectUrl) { + String database, + String table, + Map params, + List columns, + String sqlConnectUrl) { GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); String sparkTableName = table.contains(".") ? table.substring(table.indexOf(".") + 1) : table; String url = - String.format( - sqlConnectUrl, - params.getOrDefault("host", ""), - params.getOrDefault("port", ""), - database); + String.format( + sqlConnectUrl, + params.getOrDefault("host", ""), + params.getOrDefault("port", ""), + database); String ddl = - String.format( - GenerateSqlTemplate.JDBC_DDL_SQL_TEMPLATE, - sparkTableName, - url, - table, - params.getOrDefault("username", ""), - params.getOrDefault("password", "")); + String.format( + GenerateSqlTemplate.JDBC_DDL_SQL_TEMPLATE, + sparkTableName, + url, + table, + params.getOrDefault("username", ""), + params.getOrDefault("password", "")); generateSqlInfo.setDdl(ddl); generateSqlInfo.setDml(GenerateSqlTemplate.generateDmlSql(table)); @@ -548,25 +549,174 @@ public GenerateSqlInfo getSparkSqlByJdbc( @Override public GenerateSqlInfo getJdbcSqlByDsNameAndEnvId( - String dataSourceName, - String database, - String table, - String system, - String userName, - String envId) - throws ErrorException { + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + 
throws ErrorException { DsInfoResponse dsInfoResponse = - queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { return invokeMetaMethod( - dsInfoResponse.getDsType(), - "getJdbcSql", - new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, - GenerateSqlInfo.class); + dsInfoResponse.getDsType(), + "getJdbcSql", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table}, + GenerateSqlInfo.class); + } + return new GenerateSqlInfo(); + } + + @Override + public GenerateSqlInfo getFlinkSqlByDsNameAndEnvId( + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) { + DsInfoResponse dsInfoResponse = + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + + if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { + List columns = new ArrayList<>(); + try { + columns = + invokeMetaMethod( + dsInfoResponse.getDsType(), + "getColumns", + new Object[] { + dsInfoResponse.getCreator(), + dsInfoResponse.getParams(), + database, + dsInfoResponse + .getDsType() + .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue()) + ? 
"_doc" + : table + }, + List.class); + } catch (Exception e) { + logger.warn("Fail to get Sql columns(获取字段列表失败)"); + } + if (CacheConfiguration.MYSQL_RELATIONSHIP_LIST + .getValue() + .contains(dsInfoResponse.getDsType())) { + String sqlConnectUrl = + invokeMetaMethod( + dsInfoResponse.getDsType(), + "getSqlConnectUrl", + new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()}, + String.class); + + return getFlinkSqlByJdbc( + database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl); + } else if (dsInfoResponse.getDsType().equalsIgnoreCase(DataSourceTypeEnum.KAFKA.getValue())) { + return getFlinkSqlByKafka(table, dsInfoResponse.getParams(), columns); + } else if (dsInfoResponse + .getDsType() + .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())) { + return getFlinkSqlByElasticsearch(table, columns); + } } + return new GenerateSqlInfo(); } + public GenerateSqlInfo getFlinkSqlByElasticsearch(String table, List columns) { + GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); + String flinkTableName = table.contains(".") ? table.substring(table.indexOf(".") + 1) : table; + String columnStr = "*"; + String columnAndTYpeStr = "*"; + if (CollectionUtils.isNotEmpty(columns)) { + columnStr = columns.stream().map(column -> column.getName()).collect(Collectors.joining(",")); + columnAndTYpeStr = + columns.stream() + .map(column -> column.getName() + " " + column.getType()) + .collect(Collectors.joining(",")); + } + String ddl = + String.format(FlinkSqlTemplate.ES_DDL_SQL_TEMPLATE, flinkTableName, columnAndTYpeStr); + + generateSqlInfo.setDdl(ddl); + generateSqlInfo.setDml(FlinkSqlTemplate.generateDmlSql(table)); + generateSqlInfo.setDql(FlinkSqlTemplate.generateDqlSql(columnStr, table)); + + return generateSqlInfo; + } + + private GenerateSqlInfo getFlinkSqlByKafka( + String table, Map params, List columns) { + GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); + + String flinkTableName = table.contains(".") ? 
table.substring(table.indexOf(".") + 1) : table; + String columnStr = "*"; + String columnAndTYpeStr = "*"; + if (CollectionUtils.isNotEmpty(columns)) { + columnStr = columns.stream().map(column -> column.getName()).collect(Collectors.joining(",")); + columnAndTYpeStr = + columns.stream() + .map(column -> column.getName() + " " + column.getType()) + .collect(Collectors.joining(",")); + } + String kafkaServers = String.valueOf(params.getOrDefault("uris", "localhost:9092")); + String ddl = + String.format( + FlinkSqlTemplate.KAFKA_DDL_SQL_TEMPLATE, + flinkTableName, + columnAndTYpeStr, + kafkaServers); + + generateSqlInfo.setDdl(ddl); + generateSqlInfo.setDml(FlinkSqlTemplate.generateDmlSql(table)); + generateSqlInfo.setDql(FlinkSqlTemplate.generateDqlSql(columnStr, table)); + + return generateSqlInfo; + } + + private GenerateSqlInfo getFlinkSqlByJdbc( + String database, + String table, + Map params, + List columns, + String sqlConnectUrl) { + GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo(); + String flinkTableName = table.contains(".") ? 
table.substring(table.indexOf(".") + 1) : table; + + String columnStr = "*"; + String columnAndTYpeStr = "*"; + if (CollectionUtils.isNotEmpty(columns)) { + columnStr = columns.stream().map(column -> column.getName()).collect(Collectors.joining(",")); + columnAndTYpeStr = + columns.stream() + .map(column -> column.getName() + " " + column.getType()) + .collect(Collectors.joining(",")); + } + + String url = + String.format( + sqlConnectUrl, + params.getOrDefault("host", ""), + params.getOrDefault("port", ""), + database); + String ddl = + String.format( + FlinkSqlTemplate.JDBC_DDL_SQL_TEMPLATE, + flinkTableName, + columnAndTYpeStr, + url, + params.getOrDefault("username", ""), + params.getOrDefault("password", ""), + table); + + generateSqlInfo.setDdl(ddl); + generateSqlInfo.setDml(FlinkSqlTemplate.generateDmlSql(table)); + generateSqlInfo.setDql(FlinkSqlTemplate.generateDqlSql(columnStr, table)); + + return generateSqlInfo; + } + /** * Request to get data source information (type and connection parameters) * @@ -577,7 +727,7 @@ public GenerateSqlInfo getJdbcSqlByDsNameAndEnvId( */ @Deprecated public DsInfoResponse reqToGetDataSourceInfo(String dataSourceId, String system, String userName) - throws ErrorException { + throws ErrorException { Object rpcResult = null; try { rpcResult = dataSourceRpcSender.ask(new DsInfoQueryRequest(dataSourceId, null, system)); @@ -588,12 +738,12 @@ public DsInfoResponse reqToGetDataSourceInfo(String dataSourceId, String system, DsInfoResponse response = (DsInfoResponse) rpcResult; if (!response.getStatus()) { throw new ErrorException( - -1, "Error in Data Source Manager Server[数据源服务出错]: " + response.getErrorMsg()); + -1, "Error in Data Source Manager Server[数据源服务出错]: " + response.getErrorMsg()); } boolean hasPermission = - (AuthContext.isAdministrator(userName) - || (StringUtils.isNotBlank(response.getCreator()) - && userName.equals(response.getCreator()))); + (AuthContext.isAdministrator(userName) + || 
(StringUtils.isNotBlank(response.getCreator()) + && userName.equals(response.getCreator()))); if (!hasPermission) { throw new ErrorException(-1, "Don't have query permission for data source [没有数据源的查询权限]"); } else if (response.getParams().isEmpty()) { @@ -614,7 +764,7 @@ public DsInfoResponse reqToGetDataSourceInfo(String dataSourceId, String system, * @throws ErrorException */ public DsInfoResponse queryDataSourceInfoByName( - String dataSourceName, String system, String userName) throws ErrorException { + String dataSourceName, String system, String userName) throws ErrorException { return queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, null); } @@ -629,14 +779,14 @@ public DsInfoResponse queryDataSourceInfoByName( * @throws ErrorException */ public DsInfoResponse queryDataSourceInfoByNameAndEnvId( - String dataSourceName, String system, String userName, String envId) throws ErrorException { + String dataSourceName, String system, String userName, String envId) throws ErrorException { Object rpcResult = null; boolean useDefault = false; try { rpcResult = reqGetDefaultDataSource(dataSourceName); if (Objects.isNull(rpcResult)) { rpcResult = - dataSourceRpcSender.ask(new DsInfoQueryRequest(null, dataSourceName, system, envId)); + dataSourceRpcSender.ask(new DsInfoQueryRequest(null, dataSourceName, system, envId)); } else { useDefault = true; } @@ -647,12 +797,12 @@ public DsInfoResponse queryDataSourceInfoByNameAndEnvId( DsInfoResponse response = (DsInfoResponse) rpcResult; if (!response.getStatus()) { throw new ErrorException( - -1, "Error in Data Source Manager Server[数据源服务出错]: " + response.getErrorMsg()); + -1, "Error in Data Source Manager Server[数据源服务出错]: " + response.getErrorMsg()); } boolean hasPermission = - (AuthContext.isAdministrator(userName) - || (StringUtils.isNotBlank(response.getCreator()) - && userName.equals(response.getCreator()))); + (AuthContext.isAdministrator(userName) + || (StringUtils.isNotBlank(response.getCreator()) + && 
userName.equals(response.getCreator()))); if (!hasPermission) { throw new ErrorException(-1, "Don't have query permission for data source [没有数据源的查询权限]"); } else if (!useDefault && response.getParams().isEmpty()) { @@ -673,13 +823,13 @@ public DsInfoResponse queryDataSourceInfoByNameAndEnvId( private DsInfoResponse reqGetDefaultDataSource(String dataSourceName) { DataSource dataSource = DataSources.getDefault(dataSourceName); return (Objects.nonNull(dataSource)) - ? new DsInfoResponse( + ? new DsInfoResponse( true, dataSource.getDataSourceType().getName(), dataSource.getConnectParams(), dataSource.getCreateUser(), "") - : null; + : null; } /** @@ -690,17 +840,17 @@ private DsInfoResponse reqGetDefaultDataSource(String dataSourceName) { */ @SuppressWarnings("unchecked") private T invokeMetaMethod( - String dsType, String method, Object[] methodArgs, Class returnType) - throws MetaMethodInvokeException { + String dsType, String method, Object[] methodArgs, Class returnType) + throws MetaMethodInvokeException { BiFunction invoker; try { invoker = metaClassLoaderManager.getInvoker(dsType); } catch (Exception e) { // TODO ERROR CODE throw new MetaMethodInvokeException( - FAILED_METADATA_SERVICE.getErrorCode(), - "Load meta service for " + dsType + " fail 加载 [" + dsType + "] 元数据服务失败", - e); + FAILED_METADATA_SERVICE.getErrorCode(), + "Load meta service for " + dsType + " fail 加载 [" + dsType + "] 元数据服务失败", + e); } if (Objects.nonNull(invoker)) { try { @@ -714,11 +864,11 @@ private T invokeMetaMethod( } // TODO ERROR CODE throw new MetaMethodInvokeException( - method, - methodArgs, - INVOKE_METHOD_FAIL.getErrorCode(), - MessageFormat.format(INVOKE_METHOD_FAIL.getErrorDesc(), method, e.getMessage()), - e); + method, + methodArgs, + INVOKE_METHOD_FAIL.getErrorCode(), + MessageFormat.format(INVOKE_METHOD_FAIL.getErrorDesc(), method, e.getMessage()), + e); } } return null; diff --git 
a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/domain/FlinkSqlTemplate.java b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/domain/FlinkSqlTemplate.java new file mode 100644 index 0000000000..b28ede659a --- /dev/null +++ b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/domain/FlinkSqlTemplate.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.metadata.query.common.domain; + +public class FlinkSqlTemplate { + + public static final String ES_DDL_SQL_TEMPLATE = + "CREATE TABLE %s ( " + + "%s " + + ") " + + "WITH (" + + "'connector' = 'elasticsearch-7'," + + "'hosts' = 'http://localhost:9200'," + + "'index' = 'testIndex'" + + ")"; + + public static final String JDBC_DDL_SQL_TEMPLATE = + "CREATE TABLE %s ( " + + "%s " + + ") " + + "WITH (" + + "'connector' = 'jdbc'," + + "'url' = '%s'," + + "'username' = '%s'," + + "'password' = '%'," + + "'table-name' = '%s'" + + ")"; + + public static final String KAFKA_DDL_SQL_TEMPLATE = + "CREATE TABLE %s ( " + + "%s " + + ") " + + "WITH (" + + "'connector' = 'kafka'," + + "'properties.bootstrap.servers' = '%s'," + + "'properties.group.id' = 'testGroup'," + + "'topic' = 'testTopic'" + + "'format' = 'csv'" + + ")"; + + public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM ${resultTable}"; + + public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s"; + + public static String generateDqlSql(String columns, String table) { + return String.format(FlinkSqlTemplate.DQL_SQL_TEMPLATE, columns, table); + } + + public static String generateDmlSql(String table) { + return String.format(FlinkSqlTemplate.DML_SQL_TEMPLATE, table); + } +}