diff --git a/RELEASE.md b/RELEASE.md index aca14313..fbfe3ae6 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,7 @@ # Release Notes -## [5.7.1] - TBD +## [5.7.1] - 2025-12-15 +- Fix batch write consistency [issue #257](https://github.com/datastax/cassandra-data-migrator/issues/257). - Upgraded Java driver to `4.19.2` to consume fixes upstream for addressing the issue described at [CASSJAVA-116](https://issues.apache.org/jira/browse/CASSJAVA-116) issue. ## [5.7.0] - 2025-11-19 diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index ca38d06c..061c15db 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -16,6 +16,7 @@ package com.datastax.cdm.job; import java.math.BigInteger; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletionStage; @@ -48,6 +49,7 @@ public class CopyJobSession extends AbstractJobSession { public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); private TargetUpsertStatement targetUpsertStatement; private TargetSelectByPKStatement targetSelectByPKStatement; + private BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED); protected CopyJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { super(originSession, targetSession, propHelper); @@ -55,6 +57,8 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Pro isCounterTable = this.originSession.getCqlTable().isCounterTable(); fetchSize = this.originSession.getCqlTable().getFetchSizeInRows(); batchSize = this.originSession.getCqlTable().getBatchSize(); + batch.setConsistencyLevel(this.targetSession.getCqlTable().getWriteConsistencyLevel()) + .setTimeout(Duration.ofSeconds(10)); logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL()); logger.info("CQL -- target select: {}", this.targetSession.getTargetSelectByPKStatement().getCQL()); @@ -68,7 +72,6 @@ protected void processPartitionRange(PartitionRange range) { if (null != trackRunFeature) trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED, ""); - BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED); JobCounter jobCounter = range.getJobCounter(); try {