Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions src/Dplr/Dplr.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,94 @@ private function addTask(Task $task): self
return $this;
}

public function newTaskList(): TaskList
{
return new TaskList();
}

public function addTaskList(TaskList $taskList): self
{
if (-1 === $this->multipleThread) {
throw new \RuntimeException('task lists can be run only in threaded mode');
}

// Not sure...
// fill previous steps by empty tasks to sync with the main thread
// if ($this->multipleThread > 0) {
// // init thread if not exists
// if (!isset($this->tasks[$this->multipleThread])) {
// $this->tasks[$this->multipleThread] = [];
// }

// $mainThreadCount = count($this->tasks[0]);
// $currentThreadCount = count($this->tasks[$this->multipleThread]);
// if ($mainThreadCount <= $currentThreadCount) {
// throw new \RuntimeException(sprintf('Thread #%d is bigger than main thread', $this->multipleThread));
// }

// if ($currentThreadCount < $mainThreadCount - 1) {
// for ($i = $currentThreadCount; $i < $mainThreadCount - 1; ++$i) {
// $this->tasks[$this->multipleThread][] = null;
// }
// }
// }

$this->tasks[$this->multipleThread++] = $this->tasksFromTasksList($taskList);

return $this;
}

/** @return Task[] */
private function tasksFromTasksList(TaskList $taskList): array
{
$tasks = [];

foreach ($taskList->getTasks() as $t) {
$servers = null;
$serverGroup = $t->getServerGroup();
if (null !== $serverGroup) {
$servers = $this->getServersByGroup($serverGroup);
if (!count($servers)) {
throw new \InvalidArgumentException(sprintf('Not found servers for group "%s"', $serverGroup));
}
}

switch (true) {
case $t instanceof PreProcessedUpload:
$data = [
'Action' => Task::ACTION_SCP,
'Source' => $t->getSource(),
'Target' => $t->getTarget(),
'Hosts' => $serverGroup ? $servers : $this->getServers(),
'Timeout' => ($t->getTimeout() > 0 ? $t->getTimeout() : $this->defaultTimeout) * 1000,
];
break;

case $t instanceof PreProcessedCommand:
$data = [
'Action' => Task::ACTION_SSH,
'Cmd' => $t->getCommand(),
'Hosts' => $serverGroup ? $servers : $this->getServers(),
'Timeout' => ($t->getTimeout() > 0 ? $t->getTimeout() : $this->defaultTimeout) * 1000,
];
break;

default:
throw new \RuntimeException(\sprintf(
'unknown task type: %s', gettype($t),
));
}

$tasks[] = new Task(
$data,
$t->getOnSuccess(),
$t->getOnFailure(),
);
}

return $tasks;
}

/*
* Adding command task.
*/
Expand Down
22 changes: 22 additions & 0 deletions src/Dplr/PreProcessedCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Dplr;

class PreProcessedCommand extends PreProcessedTask
{
protected $cmd;

public function __construct(
string $cmd,
?string $serverGroup = null, ?int $timeout = null, ?callable $onSuccess = null, ?callable $onFailure = null)
{
parent::__construct($serverGroup, $timeout, $onSuccess, $onFailure);

$this->cmd = $cmd;
}

public function getCommand(): string
{
return $this->cmd;
}
}
43 changes: 43 additions & 0 deletions src/Dplr/PreProcessedTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Dplr;

abstract class PreProcessedTask
{
protected $timeout;
protected $serverGroup;
protected $onSuccess;
protected $onFailure;

public function __construct(
?string $serverGroup = null,
?int $timeout = null,
?callable $onSuccess = null,
?callable $onFailure = null,
) {
$this->serverGroup = $serverGroup;
$this->timeout = $timeout;
$this->onSuccess = $onSuccess;
$this->onFailure = $onFailure;
}

public function getTimeout(): ?int
{
return $this->timeout;
}

public function getServerGroup(): ?string
{
return $this->serverGroup;
}

public function getOnSuccess(): ?callable
{
return $this->onSuccess;
}

public function getOnFailure(): ?callable
{
return $this->onFailure;
}
}
30 changes: 30 additions & 0 deletions src/Dplr/PreProcessedUpload.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Dplr;

class PreProcessedUpload extends PreProcessedTask
{
protected $source;
protected $target;

public function __construct(
string $source,
string $target,
?string $serverGroup = null, ?int $timeout = null, ?callable $onSuccess = null, ?callable $onFailure = null)
{
parent::__construct($serverGroup, $timeout, $onSuccess, $onFailure);

$this->source = $source;
$this->target = $target;
}

public function getSource(): string
{
return $this->source;
}

public function getTarget(): string
{
return $this->target;
}
}
64 changes: 64 additions & 0 deletions src/Dplr/TaskList.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

namespace Dplr;

class TaskList
{
/** @var PreProcessedTask[] */
private array $tasks = [];

/*
* Adding command task.
*/
public function command(
string $command,
?string $serverGroup = null,
?int $timeout = null,
?callable $onSuccess = null,
?callable $onFailure = null,
): self {
$this->tasks[] = new PreProcessedCommand(
$command,
$serverGroup,
$timeout,
$onSuccess,
$onFailure,
);

return $this;
}

/*
* Adding uploading task.
*/
public function upload(
string $localFile,
string $remoteFile,
?string $serverGroup = null,
?int $timeout = null,
?callable $onSuccess = null,
?callable $onFailure = null,
): self {
$this->tasks[] = new PreProcessedUpload(
$localFile,
$remoteFile,
$serverGroup,
$timeout,
$onSuccess,
$onFailure,
);

return $this;
}

/** @return PreProcessedTask[] */
public function getTasks(): array
{
return $this->tasks;
}

public function isEmpty(): bool
{
return [] === $this->tasks;
}
}
98 changes: 98 additions & 0 deletions tests/Dplr/Tests/DplrTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,104 @@ public function testMultiThread(): void
}
}

public function testMultiThreadOrderMismatch(): void
{
$d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY);

$d
->addServer('remote_1', ['remote_1'])
->addServer('remote_2', ['remote_2'])
->addServer('remote_3', ['remote_3'])
;

$d->multi();
foreach (['remote_1', 'remote_2', 'remote_3'] as $srv) {
$d->command('sleep 1', $srv);
$d->command('echo ' . $srv, $srv);
}
$d->end();

$this->assertTrue($d->hasTasks());

$output = '';
$d->run(function (string $s) use (&$output) {
$output .= $s;
});

$this->assertTrue($d->isSuccessful());
$this->assertFalse($d->hasTasks());
$this->assertEquals(
"CMD sleep 1 \nCMD echo remote_1 \nCMD sleep 1 \nCMD echo remote_2 \nCMD sleep 1 \nCMD echo remote_3 ......\n",
$output
);

$report = $d->getReport();
$this->assertEquals(6, $report['total']);
$this->assertEquals(6, $report['successful']);
$this->assertEquals(0, $report['failed']);
}

public function testMultiTaskList(): void
{
$d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY);

$d
->addServer('remote_1', ['remote_1'])
->addServer('remote_2', ['remote_2'])
->addServer('remote_3', ['remote_3'])
;

$d->multi();
$d->addTaskList(
$d->newTaskList()
->command('echo r11', 'remote_1')
->command('echo r12', 'remote_1')
->command('echo r13', 'remote_1')
);
$d->addTaskList(
$d->newTaskList()
->command('echo r21', 'remote_2')
->command('echo r22', 'remote_2')
);
$d->addTaskList(
$d->newTaskList()
->command('echo r31', 'remote_3')
);
$d->end();

$output = '';
$d->run(function (string $s) use (&$output) {
$output .= $s;
});

$this->assertTrue($d->isSuccessful());
$this->assertFalse($d->hasTasks());
$this->assertEquals(
"CMD echo r11 \nCMD echo r21 \nCMD echo r31 ...\nCMD echo r12 \nCMD echo r22 ..\nCMD echo r13 .\n",
$output
);

$report = $d->getReport();
$this->assertEquals(6, $report['total']);
$this->assertEquals(6, $report['successful']);
$this->assertEquals(0, $report['failed']);

$reports = [
'r11',
'r21',
'r31',
'r12',
'r22',
'r13',
];
foreach ($d->getReports() as $idx => $report) {
$this->assertEquals(
$reports[$idx],
trim($report->getOutput()),
);
}
}

public function testLimitedConcurrency(): void
{
$d = new Dplr(self::USER, self::GOSSHA_PATH, self::SSH_KEY, 16, 1);
Expand Down