diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c169b47..6646605 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: coverage: xdebug ini-file: development - run: composer install - - run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 + - run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M - run: bash tests/wait-for-mysql.sh - run: vendor/bin/phpunit --coverage-text if: ${{ matrix.php >= 7.3 }} @@ -49,6 +49,6 @@ jobs: uses: docker://hhvm/hhvm:3.30-lts-latest with: args: hhvm composer.phar install - - run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 + - run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M - run: bash tests/wait-for-mysql.sh - run: docker run -i --rm --workdir=/data -v "$(pwd):/data" --net=host hhvm/hhvm:3.30-lts-latest hhvm vendor/bin/phpunit diff --git a/src/Io/Parser.php b/src/Io/Parser.php index c3006e9..32a056c 100644 --- a/src/Io/Parser.php +++ b/src/Io/Parser.php @@ -103,6 +103,10 @@ class Parser * @var Executor */ protected $executor; + /** + * Current packet for split packet paring + */ + protected $packet = null; public function __construct(DuplexStreamInterface $stream, Executor $executor) { @@ -165,22 +169,48 @@ public function handleData($data) return; } - $packet = $this->buffer->readBuffer($this->pctSize); + if ($this->packet !== null) { + /** + * We are in packet splitting + * Append data + */ + $packet = null; + $this->packet->append($this->buffer->read($this->pctSize)); + if ($this->pctSize < 0xffffff) { + /** + * We're done + */ + $packet = $this->packet; + $this->packet = null; + } + } else { + $packet = $this->buffer->readBuffer($this->pctSize); + } + /** + * Remember last packet size as split packets may have ended with 0 length packet. + */ + $lastPctSize = $this->pctSize; $this->state = self::STATE_STANDBY; $this->pctSize = self::PACKET_SIZE_HEADER; + if ($this->packet === null && $packet->length() === 0xffffff && $lastPctSize > 0) { + /** + * Start reading split packets + */ + $this->packet = $packet; + } elseif ($packet !== null) { + try { + $this->parsePacket($packet); + } catch (\UnderflowException $e) { + $this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e)); + $this->stream->close(); + return; + } - try { - $this->parsePacket($packet); - } catch (\UnderflowException $e) { - $this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e)); - $this->stream->close(); - return; - } - - if ($packet->length() !== 0) { - $this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)')); - $this->stream->close(); - return; + if ($packet->length() !== 0) { + $this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)')); + $this->stream->close(); + return; + } } } } @@ -262,7 +292,7 @@ private function parsePacket(Buffer $packet) $this->debug(sprintf("AffectedRows: %d, InsertId: %d, WarningCount:%d", $this->affectedRows, $this->insertId, $this->warningCount)); $this->onSuccess(); $this->nextRequest(); - } elseif ($fieldCount === 0xFE) { + } elseif ($fieldCount === 0xFE && $packet->length() < 0xfffffe) { // EOF Packet $packet->skip(4); // warn, status if ($this->rsState === self::RS_STATE_ROW) { @@ -388,7 +418,35 @@ public function onClose() public function sendPacket($packet) { - return $this->stream->write($this->buffer->buildInt3(\strlen($packet)) . $this->buffer->buildInt1($this->seq++) . $packet); + /** + * If packet is longer than 0xffffff (16M), we should split and send many packets + * + */ + $packet_len = \strlen($packet); + $this->debug('sendPacket: len: ' . $packet_len); + + if ($packet_len >= 0xffffff) { + $this->debug('Packet split: packet_len: ' . $packet_len); + $ret = null; + while ($packet_len > 0) { + $part = \substr($packet, 0, 0xffffff); + $part_len = \strlen($part); + $ret = $this->stream->write($this->buffer->buildInt3($part_len) . $this->buffer->buildInt1($this->seq++) . $part); + $packet = \substr($packet, $part_len); + $packet_len = \strlen($packet); + // If last part was exactly 0xffffff in size, we need to send an empty packet to signal end + // of packet splitting. + if (\strlen($packet) === 0 && $part_len === 0xffffff) { + $ret = $this->stream->write($this->buffer->buildInt3(0) . $this->buffer->buildInt1($this->seq++)); + } + } + $this->debug('Packet sent'); + return $ret; + } + /** + * Packet is below 16M + */ + return $this->stream->write($this->buffer->buildInt3($packet_len) . $this->buffer->buildInt1($this->seq++) . $packet); } protected function nextRequest($isHandshake = false) diff --git a/tests/ResultQueryTest.php b/tests/ResultQueryTest.php index c779c9c..afe22e8 100644 --- a/tests/ResultQueryTest.php +++ b/tests/ResultQueryTest.php @@ -476,6 +476,118 @@ public function testSelectAfterDelay() Loop::run(); } + protected function checkMaxAllowedPacket($connection, $min = 0x1100000) + { + return $connection->query('SHOW VARIABLES LIKE \'max_allowed_packet\'')->then( + function ($res) use ($min, $connection) { + $current = $res->resultRows[0]['Value']; + if ($current < $min) { + $this->markTestSkipped('max_allowed_packet too low: current: ' . $current . ' min: ' . $min); + } + return true; + } + ); + } + + public function testSelectStaticTextSplitPacketsExactlyBelow16MiB() + { + $uri = $this->getConnectionString(); + $connection = new MysqlClient($uri); + + $this->checkMaxAllowedPacket($connection, 0x1000000)->then( + function () use ($connection) { + /** + * This should be exactly below 16MiB packet + * + * x03 + "select ''" = len(10) + */ + $text = str_repeat('A', 0xffffff - 11); + $connection->query('select \'' . $text . '\'')->then( + function (MysqlResult $command) use ($text) { + $this->assertCount(1, $command->resultRows); + $this->assertCount(1, $command->resultRows[0]); + $this->assertSame($text, reset($command->resultRows[0])); + } + ); + $connection->quit(); + }, + function (\Exception $error) { + $this->markTestSkipped($error->getMessage()); + } + ); + Loop::run(); + } + + public function testSelectStaticTextSplitPacketsExactly16MiB() + { + $uri = $this->getConnectionString(); + $connection = new MysqlClient($uri); + + $this->checkMaxAllowedPacket($connection)->then( + function () use ($connection) { + /** + * This should be exactly at 16MiB packet + * + * x03 + "select ''" = len(10) + */ + $text = str_repeat('A', 0xffffff - 10); + $connection->query('select \'' . $text . '\'')->then( + function (MysqlResult $command) use ($text) { + $this->assertCount(1, $command->resultRows); + $this->assertCount(1, $command->resultRows[0]); + $this->assertSame($text, reset($command->resultRows[0])); + } + ); + $connection->quit(); + }, + function (\Exception $error) { + if (method_exists($this, 'assertStringContainsString')) { + // PHPUnit 9+ + $this->assertStringContainsString('max_allowed_packet too low: current:', $error->getMessage()); + } else { + // legacy PHPUnit < 9 + $this->assertContains('max_allowed_packet too low: current:', $error->getMessage()); + } + } + ); + Loop::run(); + } + + public function testSelectStaticTextSplitPacketsAbove16MiB() + { + $uri = $this->getConnectionString(); + $connection = new MysqlClient($uri); + + $this->checkMaxAllowedPacket($connection)->then( + function () use ($connection) { + /** + * This should be exactly at 16MiB + 10 packet + * + * x03 + "select ''" = len(10) + */ + $text = str_repeat('A', 0xffffff); + $connection->query('select \'' . $text . '\'')->then( + function (MysqlResult $command) use ($text) { + $this->assertCount(1, $command->resultRows); + $this->assertCount(1, $command->resultRows[0]); + $this->assertSame($text, reset($command->resultRows[0])); + } + ); + $connection->quit(); + }, + function (\Exception $error) { + if (method_exists($this, 'assertStringContainsString')) { + // PHPUnit 9+ + $this->assertStringContainsString('max_allowed_packet too low: current:', $error->getMessage()); + } else { + // legacy PHPUnit < 9 + $this->assertContains('max_allowed_packet too low: current:', $error->getMessage()); + } + } + ); + Loop::run(); + } + public function testQueryStreamStaticEmptyEmitsSingleRow() { $connection = $this->createConnection(Loop::get());