diff --git a/src/Dplr/Dplr.php b/src/Dplr/Dplr.php index bec547f..c156f95 100644 --- a/src/Dplr/Dplr.php +++ b/src/Dplr/Dplr.php @@ -220,6 +220,94 @@ private function addTask(Task $task): self return $this; } + public function newTaskList(): TaskList + { + return new TaskList(); + } + + public function addTaskList(TaskList $taskList): self + { + if (-1 === $this->multipleThread) { + throw new \RuntimeException('task lists can be run only in threaded mode'); + } + + // Not sure... + // fill previous steps by empty tasks to sync with the main thread + // if ($this->multipleThread > 0) { + // // init thread if not exists + // if (!isset($this->tasks[$this->multipleThread])) { + // $this->tasks[$this->multipleThread] = []; + // } + + // $mainThreadCount = count($this->tasks[0]); + // $currentThreadCount = count($this->tasks[$this->multipleThread]); + // if ($mainThreadCount <= $currentThreadCount) { + // throw new \RuntimeException(sprintf('Thread #%d is bigger than main thread', $this->multipleThread)); + // } + + // if ($currentThreadCount < $mainThreadCount - 1) { + // for ($i = $currentThreadCount; $i < $mainThreadCount - 1; ++$i) { + // $this->tasks[$this->multipleThread][] = null; + // } + // } + // } + + $this->tasks[$this->multipleThread++] = $this->tasksFromTasksList($taskList); + + return $this; + } + + /** @return Task[] */ + private function tasksFromTasksList(TaskList $taskList): array + { + $tasks = []; + + foreach ($taskList->getTasks() as $t) { + $servers = null; + $serverGroup = $t->getServerGroup(); + if (null !== $serverGroup) { + $servers = $this->getServersByGroup($serverGroup); + if (!count($servers)) { + throw new \InvalidArgumentException(sprintf('Not found servers for group "%s"', $serverGroup)); + } + } + + switch (true) { + case $t instanceof PreProcessedUpload: + $data = [ + 'Action' => Task::ACTION_SCP, + 'Source' => $t->getSource(), + 'Target' => $t->getTarget(), + 'Hosts' => $serverGroup ? $servers : $this->getServers(), + 'Timeout' => ($t->getTimeout() > 0 ? $t->getTimeout() : $this->defaultTimeout) * 1000, + ]; + break; + + case $t instanceof PreProcessedCommand: + $data = [ + 'Action' => Task::ACTION_SSH, + 'Cmd' => $t->getCommand(), + 'Hosts' => $serverGroup ? $servers : $this->getServers(), + 'Timeout' => ($t->getTimeout() > 0 ? $t->getTimeout() : $this->defaultTimeout) * 1000, + ]; + break; + + default: + throw new \RuntimeException(\sprintf( + 'unknown task type: %s', gettype($t), + )); + } + + $tasks[] = new Task( + $data, + $t->getOnSuccess(), + $t->getOnFailure(), + ); + } + + return $tasks; + } + /* * Adding command task. */ diff --git a/src/Dplr/PreProcessedCommand.php b/src/Dplr/PreProcessedCommand.php new file mode 100644 index 0000000..73bcf41 --- /dev/null +++ b/src/Dplr/PreProcessedCommand.php @@ -0,0 +1,22 @@ +cmd = $cmd; + } + + public function getCommand(): string + { + return $this->cmd; + } +} diff --git a/src/Dplr/PreProcessedTask.php b/src/Dplr/PreProcessedTask.php new file mode 100644 index 0000000..20ab287 --- /dev/null +++ b/src/Dplr/PreProcessedTask.php @@ -0,0 +1,43 @@ +serverGroup = $serverGroup; + $this->timeout = $timeout; + $this->onSuccess = $onSuccess; + $this->onFailure = $onFailure; + } + + public function getTimeout(): ?int + { + return $this->timeout; + } + + public function getServerGroup(): ?string + { + return $this->serverGroup; + } + + public function getOnSuccess(): ?callable + { + return $this->onSuccess; + } + + public function getOnFailure(): ?callable + { + return $this->onFailure; + } +} diff --git a/src/Dplr/PreProcessedUpload.php b/src/Dplr/PreProcessedUpload.php new file mode 100644 index 0000000..fe73cfe --- /dev/null +++ b/src/Dplr/PreProcessedUpload.php @@ -0,0 +1,30 @@ +source = $source; + $this->target = $target; + } + + public function getSource(): string + { + return $this->source; + } + + public function getTarget(): string + { + return $this->target; + } +} diff --git a/src/Dplr/TaskList.php b/src/Dplr/TaskList.php new file mode 100644 index 0000000..1d5a5c5 --- /dev/null +++ b/src/Dplr/TaskList.php @@ -0,0 +1,64 @@ +tasks[] = new PreProcessedCommand( + $command, + $serverGroup, + $timeout, + $onSuccess, + $onFailure, + ); + + return $this; + } + + /* + * Adding uploading task. + */ + public function upload( + string $localFile, + string $remoteFile, + ?string $serverGroup = null, + ?int $timeout = null, + ?callable $onSuccess = null, + ?callable $onFailure = null, + ): self { + $this->tasks[] = new PreProcessedUpload( + $localFile, + $remoteFile, + $serverGroup, + $timeout, + $onSuccess, + $onFailure, + ); + + return $this; + } + + /** @return PreProcessedTask[] */ + public function getTasks(): array + { + return $this->tasks; + } + + public function isEmpty(): bool + { + return [] === $this->tasks; + } +} diff --git a/tests/Dplr/Tests/DplrTest.php b/tests/Dplr/Tests/DplrTest.php index 8ad504d..78f1929 100644 --- a/tests/Dplr/Tests/DplrTest.php +++ b/tests/Dplr/Tests/DplrTest.php @@ -209,6 +209,104 @@ public function testMultiThread(): void } } + public function testMultiThreadOrderMismatch(): void + { + $d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY); + + $d + ->addServer('remote_1', ['remote_1']) + ->addServer('remote_2', ['remote_2']) + ->addServer('remote_3', ['remote_3']) + ; + + $d->multi(); + foreach (['remote_1', 'remote_2', 'remote_3'] as $srv) { + $d->command('sleep 1', $srv); + $d->command('echo ' . $srv, $srv); + } + $d->end(); + + $this->assertTrue($d->hasTasks()); + + $output = ''; + $d->run(function (string $s) use (&$output) { + $output .= $s; + }); + + $this->assertTrue($d->isSuccessful()); + $this->assertFalse($d->hasTasks()); + $this->assertEquals( + "CMD sleep 1 \nCMD echo remote_1 \nCMD sleep 1 \nCMD echo remote_2 \nCMD sleep 1 \nCMD echo remote_3 ......\n", + $output + ); + + $report = $d->getReport(); + $this->assertEquals(6, $report['total']); + $this->assertEquals(6, $report['successful']); + $this->assertEquals(0, $report['failed']); + } + + public function testMultiTaskList(): void + { + $d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY); + + $d + ->addServer('remote_1', ['remote_1']) + ->addServer('remote_2', ['remote_2']) + ->addServer('remote_3', ['remote_3']) + ; + + $d->multi(); + $d->addTaskList( + $d->newTaskList() + ->command('echo r11', 'remote_1') + ->command('echo r12', 'remote_1') + ->command('echo r13', 'remote_1') + ); + $d->addTaskList( + $d->newTaskList() + ->command('echo r21', 'remote_2') + ->command('echo r22', 'remote_2') + ); + $d->addTaskList( + $d->newTaskList() + ->command('echo r31', 'remote_3') + ); + $d->end(); + + $output = ''; + $d->run(function (string $s) use (&$output) { + $output .= $s; + }); + + $this->assertTrue($d->isSuccessful()); + $this->assertFalse($d->hasTasks()); + $this->assertEquals( + "CMD echo r11 \nCMD echo r21 \nCMD echo r31 ...\nCMD echo r12 \nCMD echo r22 ..\nCMD echo r13 .\n", + $output + ); + + $report = $d->getReport(); + $this->assertEquals(6, $report['total']); + $this->assertEquals(6, $report['successful']); + $this->assertEquals(0, $report['failed']); + + $reports = [ + 'r11', + 'r21', + 'r31', + 'r12', + 'r22', + 'r13', + ]; + foreach ($d->getReports() as $idx => $report) { + $this->assertEquals( + $reports[$idx], + trim($report->getOutput()), + ); + } + } + public function testLimitedConcurrency(): void { $d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY, 16, 1);