diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index 31e5e7b3fa..ec4e21fe59 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -850,6 +850,18 @@
     <className>com/google/cloud/spanner/connection/Connection</className>
     <method>java.lang.String getDefaultSequenceKind()</method>
   </difference>
+  
+  <!-- Default isolation level -->
+  <difference>
+    <differenceType>7012</differenceType>
+    <className>com/google/cloud/spanner/connection/Connection</className>
+    <method>void setDefaultIsolationLevel(com.google.spanner.v1.TransactionOptions$IsolationLevel)</method>
+  </difference>
+  <difference>
+    <differenceType>7012</differenceType>
+    <className>com/google/cloud/spanner/connection/Connection</className>
+    <method>com.google.spanner.v1.TransactionOptions$IsolationLevel getDefaultIsolationLevel()</method>
+  </difference>
 
   <!-- Removed ConnectionOptions$ConnectionProperty in favor of the more generic ConnectionProperty class. -->
   <difference>
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ClientSideStatementValueConverters.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ClientSideStatementValueConverters.java
index 09525d4fa2..89a33c8eca 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ClientSideStatementValueConverters.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ClientSideStatementValueConverters.java
@@ -33,6 +33,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.TransactionOptions;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.time.Duration;
@@ -382,6 +383,33 @@ public DirectedReadOptions convert(String value) {
     }
   }
 
+  /**
+   * Converter for converting strings to {@link
+   * com.google.spanner.v1.TransactionOptions.IsolationLevel} values.
+   */
+  static class IsolationLevelConverter
+      implements ClientSideStatementValueConverter<TransactionOptions.IsolationLevel> {
+    static final IsolationLevelConverter INSTANCE = new IsolationLevelConverter();
+
+    private final CaseInsensitiveEnumMap<TransactionOptions.IsolationLevel> values =
+        new CaseInsensitiveEnumMap<>(TransactionOptions.IsolationLevel.class);
+
+    private IsolationLevelConverter() {}
+
+    /** Constructor needed for reflection. */
+    public IsolationLevelConverter(String allowedValues) {}
+
+    @Override
+    public Class<TransactionOptions.IsolationLevel> getParameterClass() {
+      return TransactionOptions.IsolationLevel.class;
+    }
+
+    @Override
+    public TransactionOptions.IsolationLevel convert(String value) {
+      return values.get(value);
+    }
+  }
+
   /** Converter for converting strings to {@link AutocommitDmlMode} values. */
   static class AutocommitDmlModeConverter
       implements ClientSideStatementValueConverter<AutocommitDmlMode> {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java
index 7bf4e47bd9..b0c63e347a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java
@@ -42,6 +42,7 @@
 import com.google.spanner.v1.DirectedReadOptions;
 import com.google.spanner.v1.ExecuteBatchDmlRequest;
 import com.google.spanner.v1.ResultSetStats;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.Set;
@@ -219,6 +220,12 @@ public interface Connection extends AutoCloseable {
   /** @return <code>true</code> if this connection is in read-only mode */
   boolean isReadOnly();
 
+  /** Sets the default isolation level for read/write transactions for this connection. */
+  void setDefaultIsolationLevel(IsolationLevel isolationLevel);
+
+  /** Returns the default isolation level for read/write transactions for this connection. */
+  IsolationLevel getDefaultIsolationLevel();
+
   /**
    * Sets the duration the connection should wait before automatically aborting the execution of a
    * statement. The default is no timeout. Statement timeouts are applied all types of statements,
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
index 4c0c95a91a..24dc4d2303 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
@@ -27,6 +27,7 @@
 import static com.google.cloud.spanner.connection.ConnectionProperties.AUTO_PARTITION_MODE;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DATA_BOOST_ENABLED;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DDL_IN_TRANSACTION_MODE;
+import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_ISOLATION_LEVEL;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_SEQUENCE_KIND;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DIRECTED_READ;
@@ -90,6 +91,7 @@
 import com.google.spanner.v1.DirectedReadOptions;
 import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
 import com.google.spanner.v1.ResultSetStats;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
@@ -478,6 +480,7 @@ private void reset(Context context, boolean inTransaction) {
     this.connectionState.resetValue(RETRY_ABORTS_INTERNALLY, context, inTransaction);
     this.connectionState.resetValue(AUTOCOMMIT, context, inTransaction);
     this.connectionState.resetValue(READONLY, context, inTransaction);
+    this.connectionState.resetValue(DEFAULT_ISOLATION_LEVEL, context, inTransaction);
     this.connectionState.resetValue(READ_ONLY_STALENESS, context, inTransaction);
     this.connectionState.resetValue(OPTIMIZER_VERSION, context, inTransaction);
     this.connectionState.resetValue(OPTIMIZER_STATISTICS_PACKAGE, context, inTransaction);
@@ -635,6 +638,24 @@ public boolean isReadOnly() {
     return getConnectionPropertyValue(READONLY);
   }
 
+  @Override
+  public void setDefaultIsolationLevel(IsolationLevel isolationLevel) {
+    ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+    ConnectionPreconditions.checkState(
+        !isBatchActive(), "Cannot default isolation level while in a batch");
+    ConnectionPreconditions.checkState(
+        !isTransactionStarted(),
+        "Cannot set default isolation level while a transaction is active");
+    setConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL, isolationLevel);
+    clearLastTransactionAndSetDefaultTransactionOptions();
+  }
+
+  @Override
+  public IsolationLevel getDefaultIsolationLevel() {
+    ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+    return getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL);
+  }
+
   private void clearLastTransactionAndSetDefaultTransactionOptions() {
     setDefaultTransactionOptions();
     this.currentUnitOfWork = null;
@@ -2196,6 +2217,7 @@ UnitOfWork createNewUnitOfWork(
               .setUsesEmulator(options.usesEmulator())
               .setUseAutoSavepointsForEmulator(options.useAutoSavepointsForEmulator())
               .setDatabaseClient(dbClient)
+              .setIsolationLevel(getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL))
               .setDelayTransactionStartUntilFirstWrite(
                   getConnectionPropertyValue(DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE))
               .setKeepTransactionAlive(getConnectionPropertyValue(KEEP_TRANSACTION_ALIVE))
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
index d31f14f658..54d3461b78 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionProperties.java
@@ -112,6 +112,7 @@
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.DdlInTransactionModeConverter;
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.DialectConverter;
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.DurationConverter;
+import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.IsolationLevelConverter;
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.LongConverter;
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.NonNegativeIntegerConverter;
 import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.ReadOnlyStalenessConverter;
@@ -123,7 +124,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import java.time.Duration;
+import java.util.Arrays;
 
 /** Utility class that defines all known connection properties. */
 public class ConnectionProperties {
@@ -397,13 +400,28 @@ public class ConnectionProperties {
           BOOLEANS,
           BooleanConverter.INSTANCE,
           Context.USER);
+  static final ConnectionProperty<IsolationLevel> DEFAULT_ISOLATION_LEVEL =
+      create(
+          "default_isolation_level",
+          "The transaction isolation level that is used by default for read/write transactions. "
+              + "The default is isolation_level_unspecified, which means that the connection will use the "
+              + "default isolation level of the database that it is connected to.",
+          IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
+          new IsolationLevel[] {
+            IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
+            IsolationLevel.SERIALIZABLE,
+            IsolationLevel.REPEATABLE_READ
+          },
+          IsolationLevelConverter.INSTANCE,
+          Context.USER);
   static final ConnectionProperty<AutocommitDmlMode> AUTOCOMMIT_DML_MODE =
       create(
           "autocommit_dml_mode",
           "Determines the transaction type that is used to execute "
               + "DML statements when the connection is in auto-commit mode.",
           AutocommitDmlMode.TRANSACTIONAL,
-          AutocommitDmlMode.values(),
+          // Add 'null' as a valid value.
+          Arrays.copyOf(AutocommitDmlMode.values(), AutocommitDmlMode.values().length + 1),
           AutocommitDmlModeConverter.INSTANCE,
           Context.USER);
   static final ConnectionProperty<Boolean> RETRY_ABORTS_INTERNALLY =
@@ -519,7 +537,8 @@ public class ConnectionProperties {
           RPC_PRIORITY_NAME,
           "Sets the priority for all RPC invocations from this connection (HIGH/MEDIUM/LOW). The default is HIGH.",
           DEFAULT_RPC_PRIORITY,
-          RpcPriority.values(),
+          // Add 'null' as a valid value.
+          Arrays.copyOf(RpcPriority.values(), RpcPriority.values().length + 1),
           RpcPriorityConverter.INSTANCE,
           Context.USER);
   static final ConnectionProperty<SavepointSupport> SAVEPOINT_SUPPORT =
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionState.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionState.java
index b732d617c2..4a6d2cf98f 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionState.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionState.java
@@ -27,10 +27,12 @@
 import com.google.cloud.spanner.connection.ConnectionProperty.Context;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
@@ -233,6 +235,7 @@ private <T> void internalSetValue(
       T value,
       Map<String, ConnectionPropertyValue<?>> currentProperties,
       Context context) {
+    checkValidValue(property, value);
     ConnectionPropertyValue<T> newValue = cast(currentProperties.get(property.getKey()));
     if (newValue == null) {
       ConnectionPropertyValue<T> existingValue = cast(properties.get(property.getKey()));
@@ -249,6 +252,23 @@ private <T> void internalSetValue(
     currentProperties.put(property.getKey(), newValue);
   }
 
+  static <T> void checkValidValue(ConnectionProperty<T> property, T value) {
+    if (property.getValidValues() == null || property.getValidValues().length == 0) {
+      return;
+    }
+    if (Arrays.stream(property.getValidValues())
+        .noneMatch(validValue -> Objects.equals(validValue, value))) {
+      throw invalidParamValueError(property, value);
+    }
+  }
+
+  /** Creates an exception for an invalid value for a connection property. */
+  static <T> SpannerException invalidParamValueError(ConnectionProperty<T> property, T value) {
+    return SpannerExceptionFactory.newSpannerException(
+        ErrorCode.INVALID_ARGUMENT,
+        String.format("invalid value \"%s\" for configuration property \"%s\"", value, property));
+  }
+
   /** Creates an exception for an unknown connection property. */
   static SpannerException unknownParamError(ConnectionProperty<?> property) {
     return SpannerExceptionFactory.newSpannerException(
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
index 1f6ab6bf0c..3e4e98b16c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
@@ -60,6 +60,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.spanner.v1.SpannerGrpc;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.context.Scope;
 import java.time.Duration;
@@ -151,6 +152,7 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
   private final long keepAliveIntervalMillis;
   private final ReentrantLock keepAliveLock;
   private final SavepointSupport savepointSupport;
+  @Nonnull private final IsolationLevel isolationLevel;
   private int transactionRetryAttempts;
   private int successfulRetries;
   private volatile ApiFuture<TransactionContext> txContextFuture;
@@ -202,6 +204,7 @@ static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWr
     private boolean returnCommitStats;
     private Duration maxCommitDelay;
     private SavepointSupport savepointSupport;
+    private IsolationLevel isolationLevel;
 
     private Builder() {}
 
@@ -251,6 +254,11 @@ Builder setSavepointSupport(SavepointSupport savepointSupport) {
       return this;
     }
 
+    Builder setIsolationLevel(IsolationLevel isolationLevel) {
+      this.isolationLevel = Preconditions.checkNotNull(isolationLevel);
+      return this;
+    }
+
     @Override
     ReadWriteTransaction build() {
       Preconditions.checkState(dbClient != null, "No DatabaseClient client specified");
@@ -259,6 +267,7 @@ ReadWriteTransaction build() {
       Preconditions.checkState(
           hasTransactionRetryListeners(), "TransactionRetryListeners are not specified");
       Preconditions.checkState(savepointSupport != null, "SavepointSupport is not specified");
+      Preconditions.checkState(isolationLevel != null, "IsolationLevel is not specified");
       return new ReadWriteTransaction(this);
     }
   }
@@ -293,6 +302,7 @@ private ReadWriteTransaction(Builder builder) {
     this.keepAliveLock = this.keepTransactionAlive ? new ReentrantLock() : null;
     this.retryAbortsInternally = builder.retryAbortsInternally;
     this.savepointSupport = builder.savepointSupport;
+    this.isolationLevel = Preconditions.checkNotNull(builder.isolationLevel);
     this.transactionOptions = extractOptions(builder);
   }
 
@@ -313,6 +323,9 @@ private TransactionOption[] extractOptions(Builder builder) {
     if (this.rpcPriority != null) {
       numOptions++;
     }
+    if (this.isolationLevel != IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED) {
+      numOptions++;
+    }
     TransactionOption[] options = new TransactionOption[numOptions];
     int index = 0;
     if (builder.returnCommitStats) {
@@ -330,6 +343,9 @@ private TransactionOption[] extractOptions(Builder builder) {
     if (this.rpcPriority != null) {
       options[index++] = Options.priority(this.rpcPriority);
     }
+    if (this.isolationLevel != IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED) {
+      options[index++] = Options.isolationLevel(this.isolationLevel);
+    }
     return options;
   }
 
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
index 8a5cd6a26d..123b71ff01 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
@@ -19,6 +19,7 @@
 import static com.google.cloud.spanner.connection.AbstractStatementParser.COMMIT_STATEMENT;
 import static com.google.cloud.spanner.connection.AbstractStatementParser.RUN_BATCH_STATEMENT;
 import static com.google.cloud.spanner.connection.ConnectionProperties.AUTOCOMMIT_DML_MODE;
+import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_ISOLATION_LEVEL;
 import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_SEQUENCE_KIND;
 import static com.google.cloud.spanner.connection.ConnectionProperties.MAX_COMMIT_DELAY;
 import static com.google.cloud.spanner.connection.ConnectionProperties.READONLY;
@@ -60,6 +61,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.spanner.admin.database.v1.DatabaseAdminGrpc;
 import com.google.spanner.v1.SpannerGrpc;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import io.opentelemetry.context.Scope;
 import java.util.Arrays;
 import java.util.UUID;
@@ -508,6 +510,10 @@ private TransactionRunner createWriteTransaction() {
     if (connectionState.getValue(MAX_COMMIT_DELAY).getValue() != null) {
       numOptions++;
     }
+    if (connectionState.getValue(DEFAULT_ISOLATION_LEVEL).getValue()
+        != IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED) {
+      numOptions++;
+    }
     if (numOptions == 0) {
       return dbClient.readWriteTransaction();
     }
@@ -526,6 +532,11 @@ private TransactionRunner createWriteTransaction() {
       options[index++] =
           Options.maxCommitDelay(connectionState.getValue(MAX_COMMIT_DELAY).getValue());
     }
+    if (connectionState.getValue(DEFAULT_ISOLATION_LEVEL).getValue()
+        != IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED) {
+      options[index++] =
+          Options.isolationLevel(connectionState.getValue(DEFAULT_ISOLATION_LEVEL).getValue());
+    }
     return dbClient.readWriteTransaction(options);
   }
 
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java
index c48c670335..c7c2308687 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java
@@ -16,22 +16,43 @@
 
 package com.google.cloud.spanner.connection;
 
+import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_ISOLATION_LEVEL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
 import com.google.spanner.v1.BeginTransactionRequest;
 import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.ExecuteBatchDmlRequest;
 import com.google.spanner.v1.ExecuteSqlRequest;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
+import java.util.Collections;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class AutoCommitMockServerTest extends AbstractMockServerTest {
 
+  @Parameter public IsolationLevel isolationLevel;
+
+  @Parameters(name = "isolationLevel = {0}")
+  public static Object[] data() {
+    return DEFAULT_ISOLATION_LEVEL.getValidValues();
+  }
+
+  @Override
+  protected ITConnection createConnection() {
+    return createConnection(
+        Collections.emptyList(),
+        Collections.emptyList(),
+        String.format(";default_isolation_level=%s", isolationLevel));
+  }
+
   @Test
   public void testQuery() {
     try (Connection connection = createConnection()) {
@@ -43,6 +64,9 @@ public void testQuery() {
     ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
     assertTrue(request.getTransaction().hasSingleUse());
     assertTrue(request.getTransaction().getSingleUse().hasReadOnly());
+    assertEquals(
+        IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
+        request.getTransaction().getSingleUse().getIsolationLevel());
     assertFalse(request.getLastStatement());
   }
 
@@ -56,6 +80,7 @@ public void testDml() {
     ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
     assertTrue(request.getTransaction().hasBegin());
     assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
     assertTrue(request.getLastStatement());
     assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
   }
@@ -71,6 +96,7 @@ public void testDmlReturning() {
     ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
     assertTrue(request.getTransaction().hasBegin());
     assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
     assertTrue(request.getLastStatement());
     assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
   }
@@ -89,6 +115,7 @@ public void testBatchDml() {
         mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class).get(0);
     assertTrue(request.getTransaction().hasBegin());
     assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
     assertTrue(request.getLastStatements());
     assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
   }
@@ -104,6 +131,8 @@ public void testPartitionedDml() {
     BeginTransactionRequest beginRequest =
         mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
     assertTrue(beginRequest.getOptions().hasPartitionedDml());
+    assertEquals(
+        IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, beginRequest.getOptions().getIsolationLevel());
     assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
     ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
     assertTrue(request.getTransaction().hasId());
@@ -122,6 +151,7 @@ public void testDmlAborted() {
     for (ExecuteSqlRequest request : mockSpanner.getRequestsOfType(ExecuteSqlRequest.class)) {
       assertTrue(request.getTransaction().hasBegin());
       assertTrue(request.getTransaction().getBegin().hasReadWrite());
+      assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
       assertTrue(request.getLastStatement());
     }
     assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class));
@@ -139,6 +169,7 @@ public void testDmlReturningAborted() {
     for (ExecuteSqlRequest request : mockSpanner.getRequestsOfType(ExecuteSqlRequest.class)) {
       assertTrue(request.getTransaction().hasBegin());
       assertTrue(request.getTransaction().getBegin().hasReadWrite());
+      assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
       assertTrue(request.getLastStatement());
     }
     assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class));
@@ -159,6 +190,7 @@ public void testBatchDmlAborted() {
         mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class)) {
       assertTrue(request.getTransaction().hasBegin());
       assertTrue(request.getTransaction().getBegin().hasReadWrite());
+      assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
       assertTrue(request.getLastStatements());
     }
     assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class));
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java
index 9fbb5b5bf1..17994b3468 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java
@@ -59,6 +59,7 @@
 import com.google.protobuf.ProtocolMessageEnum;
 import com.google.rpc.RetryInfo;
 import com.google.spanner.v1.ResultSetStats;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import io.grpc.Metadata;
 import io.grpc.StatusRuntimeException;
 import io.grpc.protobuf.ProtoUtils;
@@ -174,6 +175,7 @@ private ReadWriteTransaction createSubject(
     return ReadWriteTransaction.newBuilder()
         .setDatabaseClient(client)
         .setRetryAbortsInternally(withRetry)
+        .setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED)
         .setSavepointSupport(SavepointSupport.FAIL_AFTER_ROLLBACK)
         .setTransactionRetryListeners(Collections.emptyList())
         .withStatementExecutor(new StatementExecutor())
@@ -473,6 +475,7 @@ public void testRetry() {
       ReadWriteTransaction subject =
           ReadWriteTransaction.newBuilder()
               .setRetryAbortsInternally(true)
+              .setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED)
               .setSavepointSupport(SavepointSupport.FAIL_AFTER_ROLLBACK)
               .setTransactionRetryListeners(Collections.emptyList())
               .setDatabaseClient(client)
@@ -502,6 +505,7 @@ public void testChecksumResultSet() {
     ReadWriteTransaction transaction =
         ReadWriteTransaction.newBuilder()
             .setRetryAbortsInternally(true)
+            .setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED)
             .setSavepointSupport(SavepointSupport.FAIL_AFTER_ROLLBACK)
             .setTransactionRetryListeners(Collections.emptyList())
             .setDatabaseClient(client)
@@ -737,6 +741,7 @@ public void testChecksumResultSetWithArray() {
     ReadWriteTransaction transaction =
         ReadWriteTransaction.newBuilder()
             .setRetryAbortsInternally(true)
+            .setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED)
             .setSavepointSupport(SavepointSupport.FAIL_AFTER_ROLLBACK)
             .setTransactionRetryListeners(Collections.emptyList())
             .setDatabaseClient(client)
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java
index 91662ef866..9d4d7f65bf 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/RunTransactionMockServerTest.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.spanner.connection;
 
+import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_ISOLATION_LEVEL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -28,6 +29,8 @@
 import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.ExecuteSqlRequest;
 import com.google.spanner.v1.RollbackRequest;
+import com.google.spanner.v1.TransactionOptions;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
 import io.grpc.Status;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
@@ -39,32 +42,48 @@ public class RunTransactionMockServerTest extends AbstractMockServerTest {
 
   @Test
   public void testRunTransaction() {
-    try (Connection connection = createConnection()) {
-      connection.runTransaction(
-          transaction -> {
-            assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
-            assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
-            return null;
-          });
+    for (IsolationLevel isolationLevel : DEFAULT_ISOLATION_LEVEL.getValidValues()) {
+      try (Connection connection = createConnection()) {
+        connection.setDefaultIsolationLevel(isolationLevel);
+        connection.runTransaction(
+            transaction -> {
+              assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
+              assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
+              return null;
+            });
+      }
+      assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+      assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+      TransactionOptions transactionOptions =
+          mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getTransaction().getBegin();
+      assertEquals(isolationLevel, transactionOptions.getIsolationLevel());
+
+      mockSpanner.clearRequests();
     }
-    assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
-    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
   }
 
   @Test
   public void testRunTransactionInAutoCommit() {
-    try (Connection connection = createConnection()) {
-      connection.setAutocommit(true);
+    for (IsolationLevel isolationLevel : DEFAULT_ISOLATION_LEVEL.getValidValues()) {
+      try (Connection connection = createConnection()) {
+        connection.setAutocommit(true);
+        connection.setDefaultIsolationLevel(isolationLevel);
 
-      connection.runTransaction(
-          transaction -> {
-            assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
-            assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
-            return null;
-          });
+        connection.runTransaction(
+            transaction -> {
+              assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
+              assertEquals(1L, transaction.executeUpdate(INSERT_STATEMENT));
+              return null;
+            });
+      }
+      assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+      assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+      TransactionOptions transactionOptions =
+          mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getTransaction().getBegin();
+      assertEquals(isolationLevel, transactionOptions.getIsolationLevel());
+
+      mockSpanner.clearRequests();
     }
-    assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
-    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
   }
 
   @Test
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java
new file mode 100644
index 0000000000..a6275af823
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner.connection;
+
+import static com.google.cloud.spanner.connection.ConnectionProperties.DEFAULT_ISOLATION_LEVEL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
+import com.google.spanner.v1.CommitRequest;
+import com.google.spanner.v1.ExecuteBatchDmlRequest;
+import com.google.spanner.v1.ExecuteSqlRequest;
+import com.google.spanner.v1.TransactionOptions.IsolationLevel;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TransactionMockServerTest extends AbstractMockServerTest {
+
+  @Parameter public IsolationLevel isolationLevel;
+
+  @Parameters(name = "isolationLevel = {0}")
+  public static Object[] data() {
+    return DEFAULT_ISOLATION_LEVEL.getValidValues();
+  }
+
+  @Override
+  protected ITConnection createConnection() {
+    return createConnection(
+        Collections.emptyList(),
+        Collections.emptyList(),
+        String.format(";default_isolation_level=%s", isolationLevel));
+  }
+
+  @Test
+  public void testQuery() {
+    try (Connection connection = createConnection()) {
+      //noinspection EmptyTryBlock
+      try (ResultSet ignore = connection.executeQuery(SELECT1_STATEMENT)) {}
+      connection.commit();
+    }
+    assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+    ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
+    assertTrue(request.getTransaction().hasBegin());
+    assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
+    assertFalse(request.getLastStatement());
+    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+  }
+
+  @Test
+  public void testDml() {
+    try (Connection connection = createConnection()) {
+      connection.executeUpdate(INSERT_STATEMENT);
+      connection.commit();
+    }
+    assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+    ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
+    assertTrue(request.getTransaction().hasBegin());
+    assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
+    assertFalse(request.getLastStatement());
+    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+  }
+
+  @Test
+  public void testDmlReturning() {
+    try (Connection connection = createConnection()) {
+      //noinspection EmptyTryBlock
+      try (ResultSet ignore = connection.executeQuery(INSERT_RETURNING_STATEMENT)) {}
+      connection.commit();
+    }
+    assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+    ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
+    assertTrue(request.getTransaction().hasBegin());
+    assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
+    assertFalse(request.getLastStatement());
+    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+  }
+
+  @Test
+  public void testBatchDml() {
+    try (Connection connection = createConnection()) {
+      connection.startBatchDml();
+      connection.executeUpdate(INSERT_STATEMENT);
+      connection.executeUpdate(INSERT_STATEMENT);
+      connection.runBatch();
+      connection.commit();
+    }
+    assertEquals(1, mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class));
+    ExecuteBatchDmlRequest request =
+        mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class).get(0);
+    assertTrue(request.getTransaction().hasBegin());
+    assertTrue(request.getTransaction().getBegin().hasReadWrite());
+    assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
+    assertFalse(request.getLastStatements());
+    assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
+  }
+}