@@ -81,6 +81,8 @@ class Connection extends PrimaryReadReplicaConnection {
8181
8282 protected ?float $ transactionActiveSince = null ;
8383
84+ protected $ tableDirtyWrites = [];
85+
8486 /**
8587 * Initializes a new instance of the Connection class.
8688 *
@@ -257,13 +259,35 @@ public function prepare($sql, $limit = null, $offset = null): Statement {
257259 * @throws \Doctrine\DBAL\Exception
258260 */
259261 public function executeQuery (string $ sql , array $ params = [], $ types = [], QueryCacheProfile $ qcp = null ): Result {
262+ $ tables = $ this ->getQueriedTables ($ sql );
263+ if (count (array_intersect ($ this ->tableDirtyWrites , $ tables )) === 0 && !$ this ->isTransactionActive ()) {
264+ // No tables read that could have been written already in the same request and no transaction active
265+ // so we can switch back to the replica for reading as long as no writes happen that switch back to the primary
266+ $ this ->ensureConnectedToReplica ();
267+ $ this ->logger ->debug ('no dirty table reads: ' . $ sql , ['tables ' => $ this ->tableDirtyWrites , 'reads ' => $ tables ]);
268+ } else {
269+ // Read to a table that was previously written to
270+ // While this might not necessarily mean that we did a read after write it is an indication for a code path to check
271+ $ this ->logger ->debug ('dirty table reads: ' . $ sql , ['tables ' => $ this ->tableDirtyWrites , 'reads ' => $ tables , 'exception ' => new \Exception ()]);
272+ }
273+
260274 $ sql = $ this ->replaceTablePrefix ($ sql );
261275 $ sql = $ this ->adapter ->fixupStatement ($ sql );
262276 $ this ->queriesExecuted ++;
263277 $ this ->logQueryToFile ($ sql );
264278 return parent ::executeQuery ($ sql , $ params , $ types , $ qcp );
265279 }
266280
281+ /**
282+ * Helper function to get the list of tables affected by a given query
283+ * used to track dirty tables that received a write with the current request
284+ */
285+ private function getQueriedTables (string $ sql ): array {
286+ $ re = '/(\*PREFIX\*[A-z0-9_-]+)/mi ' ;
287+ preg_match_all ($ re , $ sql , $ matches );
288+ return array_map ([$ this , 'replaceTablePrefix ' ], $ matches [0 ] ?? []);
289+ }
290+
267291 /**
268292 * @throws Exception
269293 */
@@ -290,6 +314,9 @@ public function executeUpdate(string $sql, array $params = [], array $types = []
290314 * @throws \Doctrine\DBAL\Exception
291315 */
292316 public function executeStatement ($ sql , array $ params = [], array $ types = []): int {
317+ $ tables = $ this ->getQueriedTables ($ sql );
318+ $ this ->tableDirtyWrites = array_unique (array_merge ($ this ->tableDirtyWrites , $ tables ));
319+ $ this ->logger ->debug ('dirty table writes: ' . $ sql , ['tables ' => $ this ->tableDirtyWrites ]);
293320 $ sql = $ this ->replaceTablePrefix ($ sql );
294321 $ sql = $ this ->adapter ->fixupStatement ($ sql );
295322 $ this ->queriesExecuted ++;
0 commit comments