diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index c0680127c47..ac804b3b364 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -27,6 +27,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6232](https://github.com/apache/incubator-seata/pull/6232)] convert to utf8mb4 if mysql column is json type - [[#6278](https://github.com/apache/incubator-seata/pull/6278)] fix ProtocolV1SerializerTest failed - [[#6324](https://github.com/apache/incubator-seata/pull/6324)] fix Parse protocol file failed +- [[#6348](https://github.com/apache/incubator-seata/pull/6348)] fix the problem that the number of mirrors before and after may be inconsisten - [[#6331](https://github.com/apache/incubator-seata/pull/6331)] fixed the problem that TCC nested transactions cannot add TwoPhaseBusinessAction and GlobalTransactional annotations at the same time - [[#6354](https://github.com/apache/incubator-seata/pull/6354)] fix dynamic degradation does not work properly - [[#6363](https://github.com/apache/incubator-seata/pull/6363)] fix known problems of docker image diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 5d3eaf99ebf..c9c1cd4e128 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -27,6 +27,7 @@ - [[#6232](https://github.com/apache/incubator-seata/pull/6232)] 修复在mysql的json类型下出现Cannot create a JSON value from a string with CHARACTER SET 'binary'问题 - [[#6278](https://github.com/apache/incubator-seata/pull/6278)] 修复 ProtocolV1SerializerTest 失败问题 - [[#6324](https://github.com/apache/incubator-seata/pull/6324)] 修复 Parse protocol file failed +- [[#6348](https://github.com/apache/incubator-seata/pull/6348)] 修复读已提交隔离级别下前镜像可能跟后镜像数据行不同的问题 - [[#6331](https://github.com/apache/incubator-seata/pull/6331)] 修复TCC嵌套事务不能同时添加TwoPhaseBusinessAction和GlobalTransactional两个注解的问题 - [[#6354](https://github.com/apache/incubator-seata/pull/6354)] 修复动态升降级不能正常工作问题 - [[#6363](https://github.com/apache/incubator-seata/pull/6363)] 修复docker镜像中的已知问题 diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java index 3ba490a0e82..ddcb7124b42 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java @@ -26,6 +26,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.rm.datasource.AbstractConnectionProxy; import org.apache.seata.rm.datasource.ConnectionContext; @@ -34,6 +35,7 @@ import org.apache.seata.rm.datasource.exception.TableMetaException; import org.apache.seata.rm.datasource.sql.struct.TableRecords; import org.apache.seata.sqlparser.SQLRecognizer; +import org.apache.seata.sqlparser.SQLType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +99,16 @@ protected T executeAutoCommitFalse(Object[] args) throws Exception { try { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); + int updateCount = statementProxy.getUpdateCount(); + if (updateCount > 0) { + if (SQLType.UPDATE == sqlRecognizer.getSQLType()) { + if (updateCount > beforeImage.size()) { + String errorMsg = + "Before image size is not equaled to after image size, probably because you use read committed, please retry transaction."; + throw new ShouldNeverHappenException(errorMsg); + } + } + } TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/UpdateExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/UpdateExecutor.java index 5459f13abbd..955e5294a19 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/UpdateExecutor.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/UpdateExecutor.java @@ -95,20 +95,27 @@ protected String buildBeforeImageSQL(TableMeta tableMeta, ArrayList @Override protected TableRecords afterImage(TableRecords beforeImage) throws SQLException { - TableMeta tmeta = getTableMeta(); if (beforeImage == null || beforeImage.size() == 0) { return TableRecords.empty(getTableMeta()); } - String selectSQL = buildAfterImageSQL(tmeta, beforeImage); + TableMeta tmeta = getTableMeta(); PreparedStatement pst = null; ResultSet rs = null; - try { - pst = statementProxy.getConnection().prepareStatement(selectSQL); - SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst); - rs = pst.executeQuery(); - return TableRecords.buildRecords(tmeta, rs); - } finally { - IOUtil.close(rs, pst); + SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer; + List whereColumns = recognizer.getWhereColumns(); + boolean contain = tmeta.containsPK(whereColumns); + if (contain) { + String selectSQL = buildAfterImageSQL(tmeta, beforeImage); + try { + pst = statementProxy.getConnection().prepareStatement(selectSQL); + SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst); + rs = pst.executeQuery(); + return TableRecords.buildRecords(tmeta, rs); + } finally { + IOUtil.close(rs, pst); + } + } else { + return beforeImage(); } } diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java index 3d3fd79c99a..2c65cdfa10c 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java @@ -20,10 +20,6 @@ import org.apache.seata.rm.datasource.ConnectionContext; import org.apache.seata.rm.datasource.ConnectionProxy; import org.apache.seata.rm.datasource.PreparedStatementProxy; -import org.apache.seata.rm.datasource.exec.AbstractDMLBaseExecutor; -import org.apache.seata.rm.datasource.exec.LockConflictException; -import org.apache.seata.rm.datasource.exec.LockWaitTimeoutException; -import org.apache.seata.rm.datasource.exec.StatementCallback; import org.apache.seata.rm.datasource.exec.mysql.MySQLInsertExecutor; import org.apache.seata.rm.datasource.exec.oracle.OracleInsertExecutor; import org.apache.seata.sqlparser.struct.TableMeta; diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/handler/EscapeHandlerTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/handler/EscapeHandlerTest.java index 9f8f55e5f29..f73db665e4b 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/handler/EscapeHandlerTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/handler/EscapeHandlerTest.java @@ -125,4 +125,35 @@ public void testInsertColumnsEscape() { Assertions.assertTrue(insertCol.contains("\"")); } } + + + @Test + public void testGetWhereColumns() { + String sql = "UPDATE t1 SET name1 = 'name1', name2 = 'name2' WHERE t1.id between ? and ? or `name1`= ? and name2= ?"; + + SQLStatement statement = SQLUtils.parseStatements(sql, "mysql").get(0); + + MySQLUpdateRecognizer mySQLUpdateRecognizer = new MySQLUpdateRecognizer(sql, statement); + List whereColumns = mySQLUpdateRecognizer.getWhereColumns(); + Assertions.assertEquals("id", whereColumns.get(0)); + Assertions.assertEquals("name1", whereColumns.get(1)); + Assertions.assertEquals("name2", whereColumns.get(2)); + sql = "UPDATE t1 SET name1 = 'name1', name2 = 'name2' WHERE id between ? and ?"; + + statement = SQLUtils.parseStatements(sql, "mysql").get(0); + + mySQLUpdateRecognizer = new MySQLUpdateRecognizer(sql, statement); + whereColumns = mySQLUpdateRecognizer.getWhereColumns(); + Assertions.assertEquals("id", whereColumns.get(0)); + + sql = "UPDATE t1 SET name1 = 'name1', name2 = 'name2' WHERE id in(?,? ) and createTime between ? and ?"; + + statement = SQLUtils.parseStatements(sql, "mysql").get(0); + + mySQLUpdateRecognizer = new MySQLUpdateRecognizer(sql, statement); + whereColumns = mySQLUpdateRecognizer.getWhereColumns(); + Assertions.assertEquals("id", whereColumns.get(0)); + Assertions.assertEquals("createTime", whereColumns.get(1)); + } + } diff --git a/sqlparser/seata-sqlparser-antlr/src/main/java/org/apache/seata/sqlparser/antlr/mysql/AntlrMySQLUpdateRecognizer.java b/sqlparser/seata-sqlparser-antlr/src/main/java/org/apache/seata/sqlparser/antlr/mysql/AntlrMySQLUpdateRecognizer.java index d7b99f79727..6990d97632b 100644 --- a/sqlparser/seata-sqlparser-antlr/src/main/java/org/apache/seata/sqlparser/antlr/mysql/AntlrMySQLUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-antlr/src/main/java/org/apache/seata/sqlparser/antlr/mysql/AntlrMySQLUpdateRecognizer.java @@ -16,6 +16,7 @@ */ package org.apache.seata.sqlparser.antlr.mysql; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.sqlparser.util.ColumnUtils; import org.apache.seata.sqlparser.ParametersHolder; import org.apache.seata.sqlparser.SQLType; @@ -29,6 +30,7 @@ import org.antlr.v4.runtime.tree.ParseTreeWalker; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -129,4 +131,24 @@ public String getTableName() { public String getOriginalSQL() { return sqlContext.getOriginalSQL(); } + + @Override + public List getWhereColumns() { + List sqls = sqlContext.getUpdateForWhereColumnNames(); + if (CollectionUtils.isNotEmpty(sqls)) { + List list = new ArrayList<>(sqls.size()); + for (MySqlContext.SQL sql : sqls) { + String column = sql.getUpdateWhereColumnName(); + int index = column.indexOf("."); + if (index > 0) { + // table.column -> column name + column = column.substring(index + 1); + } + list.add(column); + } + return list; + } + return Collections.emptyList(); + } + } diff --git a/sqlparser/seata-sqlparser-core/src/main/java/org/apache/seata/sqlparser/SQLUpdateRecognizer.java b/sqlparser/seata-sqlparser-core/src/main/java/org/apache/seata/sqlparser/SQLUpdateRecognizer.java index da5733a0726..325693cd130 100644 --- a/sqlparser/seata-sqlparser-core/src/main/java/org/apache/seata/sqlparser/SQLUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-core/src/main/java/org/apache/seata/sqlparser/SQLUpdateRecognizer.java @@ -55,4 +55,12 @@ default String getTableAlias(String tableName) { * @return (`a`, `b`, `c`) -> (a, b, c) */ List getUpdateColumnsUnEscape(); + + /** + * Gets where columns. + * + * @return the where columns + */ + List getWhereColumns(); + } diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/BaseRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/BaseRecognizer.java index 71afd726584..714cabe1c84 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/BaseRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/BaseRecognizer.java @@ -16,16 +16,22 @@ */ package org.apache.seata.sqlparser.druid; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.SQLLimit; +import com.alibaba.druid.sql.ast.SQLObject; import com.alibaba.druid.sql.ast.SQLOrderBy; import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.ast.expr.SQLBetweenExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; import com.alibaba.druid.sql.ast.expr.SQLExistsExpr; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; import com.alibaba.druid.sql.ast.expr.SQLInListExpr; import com.alibaba.druid.sql.ast.expr.SQLInSubQueryExpr; import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; import com.alibaba.druid.sql.ast.statement.SQLMergeStatement; import com.alibaba.druid.sql.ast.statement.SQLReplaceStatement; @@ -33,6 +39,7 @@ import com.alibaba.druid.sql.visitor.SQLASTVisitor; import com.alibaba.druid.sql.visitor.SQLASTVisitorAdapter; import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.sqlparser.SQLParsingException; import org.apache.seata.sqlparser.SQLRecognizer; @@ -160,4 +167,60 @@ public boolean visit(SQLInsertStatement x) { getAst().accept(visitor); return true; } + + public List getWhereColumns(SQLExpr sqlExpr) { + // single condition + if (sqlExpr instanceof SQLBinaryOpExpr) { + return getWhereColumns(Collections.singletonList(sqlExpr)); + } else { + // multiple conditions + return getWhereColumns(sqlExpr.getChildren()); + } + } + + public List getWhereColumns(List list) { + if (CollectionUtils.isNotEmpty(list)) { + List columns = new ArrayList<>(list.size()); + for (SQLObject sqlObject : list) { + if (sqlObject instanceof SQLIdentifierExpr) { + columns.add(((SQLIdentifierExpr)sqlObject).getName()); + } else { + getWhereColumns(sqlObject, columns); + } + } + return columns; + } + return Collections.emptyList(); + } + + public void getWhereColumns(SQLObject sqlExpr, List list) { + if (sqlExpr instanceof SQLBinaryOpExpr) { + SQLExpr left = ((SQLBinaryOpExpr)sqlExpr).getLeft(); + getWhereColumn(left, list); + SQLExpr right = ((SQLBinaryOpExpr)sqlExpr).getRight(); + getWhereColumn(right, list); + } + } + + public void getWhereColumn(SQLExpr left, List list) { + if (left instanceof SQLBetweenExpr) { + SQLExpr expr = ((SQLBetweenExpr)left).getTestExpr(); + if (expr instanceof SQLIdentifierExpr) { + list.add(((SQLIdentifierExpr)expr).getName()); + } + if (expr instanceof SQLPropertyExpr) { + list.add(((SQLPropertyExpr)expr).getName()); + } + } else if (left instanceof SQLIdentifierExpr) { + list.add(((SQLIdentifierExpr)left).getName()); + } else if (left instanceof SQLInListExpr) { + SQLExpr expr = ((SQLInListExpr)left).getExpr(); + if (expr instanceof SQLIdentifierExpr) { + list.add(((SQLIdentifierExpr)expr).getName()); + } + } else if (left instanceof SQLBinaryOpExpr) { + getWhereColumns(left, list); + } + } + } diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/dm/DmUpdateRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/dm/DmUpdateRecognizer.java index cd6f0c7a270..345333c2ab3 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/dm/DmUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/dm/DmUpdateRecognizer.java @@ -173,6 +173,13 @@ public List getUpdateColumnsUnEscape() { return ColumnUtils.delEscape(updateColumns, getDbType()); } + + @Override + public List getWhereColumns() { + SQLExpr where = ast.getWhere(); + return ColumnUtils.delEscape(super.getWhereColumns(where), getDbType()); + } + @Override protected SQLStatement getAst() { return this.ast; diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/mysql/MySQLUpdateRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/mysql/MySQLUpdateRecognizer.java index 8eae8c2e740..cf6905b0239 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/mysql/MySQLUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/mysql/MySQLUpdateRecognizer.java @@ -249,4 +249,12 @@ public boolean visit(SQLExprTableSource x) { visitor.visit(tableSource); return tableName.toString(); } + + + @Override + public List getWhereColumns() { + SQLExpr where = ast.getWhere(); + return ColumnUtils.delEscape(super.getWhereColumns(where), getDbType()); + } + } diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/oracle/OracleUpdateRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/oracle/OracleUpdateRecognizer.java index b5e567a39d2..93fe31b90e4 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/oracle/OracleUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/oracle/OracleUpdateRecognizer.java @@ -176,6 +176,13 @@ public boolean visit(SQLJoinTableSource x) { return sb.toString(); } + + @Override + public List getWhereColumns() { + SQLExpr where = ast.getWhere(); + return ColumnUtils.delEscape(super.getWhereColumns(where), getDbType()); + } + @Override protected SQLStatement getAst() { return ast; diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/postgresql/PostgresqlUpdateRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/postgresql/PostgresqlUpdateRecognizer.java index 877c00be6f5..bb83d093de7 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/postgresql/PostgresqlUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/postgresql/PostgresqlUpdateRecognizer.java @@ -173,6 +173,12 @@ public String getOrderByCondition(ParametersHolder parametersHolder, ArrayList getWhereColumns() { + SQLExpr where = ast.getWhere(); + return ColumnUtils.delEscape(super.getWhereColumns(where), getDbType()); + } + @Override protected SQLStatement getAst() { return ast; diff --git a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/sqlserver/SqlServerUpdateRecognizer.java b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/sqlserver/SqlServerUpdateRecognizer.java index 4c674c01290..2fd6ce03d76 100644 --- a/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/sqlserver/SqlServerUpdateRecognizer.java +++ b/sqlparser/seata-sqlparser-druid/src/main/java/org/apache/seata/sqlparser/druid/sqlserver/SqlServerUpdateRecognizer.java @@ -201,6 +201,13 @@ public String getOrderByCondition(ParametersHolder parametersHolder, ArrayList getWhereColumns() { + SQLExpr where = ast.getWhere(); + return ColumnUtils.delEscape(super.getWhereColumns(where), getDbType()); + } + @Override protected SQLStatement getAst() { return ast; diff --git a/sqlparser/seata-sqlparser-druid/src/test/java/org/apache/seata/sqlparser/druid/MySQLUpdateRecognizerTest.java b/sqlparser/seata-sqlparser-druid/src/test/java/org/apache/seata/sqlparser/druid/MySQLUpdateRecognizerTest.java index 02dd7627e29..ad16c5bfea9 100644 --- a/sqlparser/seata-sqlparser-druid/src/test/java/org/apache/seata/sqlparser/druid/MySQLUpdateRecognizerTest.java +++ b/sqlparser/seata-sqlparser-druid/src/test/java/org/apache/seata/sqlparser/druid/MySQLUpdateRecognizerTest.java @@ -375,4 +375,5 @@ public void testGetUpdateColumns_2() { Assertions.assertTrue(updateColumn.contains("`")); } } + }