2021-11-23 13:20:36 +00:00
|
|
|
<?php
|
|
|
|
|
|
|
|
declare (strict_types=1);
|
|
|
|
namespace Rector\Parallel\Application;
|
|
|
|
|
|
|
|
use Closure;
|
2022-01-04 01:08:47 +00:00
|
|
|
use RectorPrefix20220104\Clue\React\NDJson\Decoder;
|
|
|
|
use RectorPrefix20220104\Clue\React\NDJson\Encoder;
|
|
|
|
use RectorPrefix20220104\Nette\Utils\Random;
|
|
|
|
use RectorPrefix20220104\React\EventLoop\StreamSelectLoop;
|
|
|
|
use RectorPrefix20220104\React\Socket\ConnectionInterface;
|
|
|
|
use RectorPrefix20220104\React\Socket\TcpServer;
|
2021-11-23 13:20:36 +00:00
|
|
|
use Rector\Core\Configuration\Option;
|
|
|
|
use Rector\Core\Console\Command\ProcessCommand;
|
|
|
|
use Rector\Core\Console\Command\WorkerCommand;
|
|
|
|
use Rector\Core\ValueObject\Error\SystemError;
|
|
|
|
use Rector\Core\ValueObject\Reporting\FileDiff;
|
|
|
|
use Rector\Parallel\Command\WorkerCommandLineFactory;
|
|
|
|
use Rector\Parallel\ValueObject\Bridge;
|
2022-01-04 01:08:47 +00:00
|
|
|
use RectorPrefix20220104\Symfony\Component\Console\Command\Command;
|
|
|
|
use RectorPrefix20220104\Symfony\Component\Console\Input\InputInterface;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\Enum\Action;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\Enum\Content;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactCommand;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactEvent;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\ValueObject\ParallelProcess;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\ValueObject\ProcessPool;
|
|
|
|
use RectorPrefix20220104\Symplify\EasyParallel\ValueObject\Schedule;
|
|
|
|
use RectorPrefix20220104\Symplify\PackageBuilder\Console\Command\CommandNaming;
|
2021-11-23 13:20:36 +00:00
|
|
|
use Throwable;
|
|
|
|
/**
 * Distributes the jobs of a Schedule over several worker processes and merges
 * the file diffs and system errors they report back.
 *
 * Workers are started from a command line built by WorkerCommandLineFactory and
 * talk to this process over a local TCP server using NDJSON messages.
 *
 * Inspired from @see
 * https://github.com/phpstan/phpstan-src/commit/9124c66dcc55a222e21b1717ba5f60771f7dda92#diff-39c7a3b0cbb217bbfff96fbb454e6e5e60c74cf92fbb0f9d246b8bebbaad2bb0
 *
 * https://github.com/phpstan/phpstan-src/commit/b84acd2e3eadf66189a64fdbc6dd18ff76323f67#diff-7f625777f1ce5384046df08abffd6c911cfbb1cfc8fcb2bdeaf78f337689e3e2R150
 */
final class ParallelFileProcessor
{
    /**
     * Timeout handed to every ParallelProcess created in process().
     *
     * @var int
     */
    public const TIMEOUT_IN_SECONDS = 60;
    /**
     * Number of worker-reported system errors at which all workers are quit.
     *
     * @var int
     */
    private const SYSTEM_ERROR_COUNT_LIMIT = 20;
    /**
     * Pool of running workers; created anew on every process() call.
     *
     * @var \RectorPrefix20220104\Symplify\EasyParallel\ValueObject\ProcessPool|null
     */
    private $processPool = null;
    /**
     * @readonly
     * @var \Rector\Parallel\Command\WorkerCommandLineFactory
     */
    private $workerCommandLineFactory;
    public function __construct(\Rector\Parallel\Command\WorkerCommandLineFactory $workerCommandLineFactory)
    {
        $this->workerCommandLineFactory = $workerCommandLineFactory;
    }
    /**
     * Runs the scheduled jobs in worker processes and blocks until the event loop finishes.
     *
     * @param \RectorPrefix20220104\Symplify\EasyParallel\ValueObject\Schedule $schedule Provides the jobs and the number of worker processes to start
     * @param string $mainScript Forwarded to the worker command line factory
     * @param Closure(int): void $postFileCallback Used for progress bar jump; called with the Bridge::FILES_COUNT value of every worker response
     * @param string|null $projectConfigFile Forwarded to the worker command line factory
     * @param \RectorPrefix20220104\Symfony\Component\Console\Input\InputInterface $input Forwarded to the worker command line factory
     * @return mixed[] Array keyed by Bridge::FILE_DIFFS, Bridge::SYSTEM_ERRORS and Bridge::SYSTEM_ERRORS_COUNT
     */
    public function process(\RectorPrefix20220104\Symplify\EasyParallel\ValueObject\Schedule $schedule, string $mainScript, \Closure $postFileCallback, ?string $projectConfigFile, \RectorPrefix20220104\Symfony\Component\Console\Input\InputInterface $input) : array
    {
        // jobs are taken with array_pop(), so reverse them to hand them out in their scheduled order
        $jobs = \array_reverse($schedule->getJobs());
        $streamSelectLoop = new \RectorPrefix20220104\React\EventLoop\StreamSelectLoop();
        // basic properties setup
        $numberOfProcesses = $schedule->getNumberOfProcesses();
        // initial counters
        $fileDiffs = [];
        $systemErrors = [];
        // listen on port 0; the port actually bound is read back from the server address below
        $tcpServer = new \RectorPrefix20220104\React\Socket\TcpServer('127.0.0.1:0', $streamSelectLoop);
        $this->processPool = new \RectorPrefix20220104\Symplify\EasyParallel\ValueObject\ProcessPool($tcpServer);
        $tcpServer->on(\RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactEvent::CONNECTION, function (\RectorPrefix20220104\React\Socket\ConnectionInterface $connection) use(&$jobs) : void {
            // NOTE(review): the trailing arguments look like (assoc, depth, options, max line length = 4 MiB) - confirm against the NDJson Decoder API
            $inDecoder = new \RectorPrefix20220104\Clue\React\NDJson\Decoder($connection, \true, 512, 0, 4 * 1024 * 1024);
            $outEncoder = new \RectorPrefix20220104\Clue\React\NDJson\Encoder($connection);
            $inDecoder->on(\RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactEvent::DATA, function (array $data) use(&$jobs, $inDecoder, $outEncoder) : void {
                // only the initial "hello" message is handled here
                $action = $data[\RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactCommand::ACTION];
                if ($action !== \RectorPrefix20220104\Symplify\EasyParallel\Enum\Action::HELLO) {
                    return;
                }
                // the worker identifies itself, so its connection can be bound to the matching process
                $processIdentifier = $data[\Rector\Core\Configuration\Option::PARALLEL_IDENTIFIER];
                $parallelProcess = $this->processPool->getProcess($processIdentifier);
                $parallelProcess->bindConnection($inDecoder, $outEncoder);
                if ($jobs === []) {
                    $this->processPool->quitProcess($processIdentifier);
                    return;
                }
                // hand the first job to the freshly connected worker
                $job = \array_pop($jobs);
                $parallelProcess->request([\RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactCommand::ACTION => \RectorPrefix20220104\Symplify\EasyParallel\Enum\Action::MAIN, \RectorPrefix20220104\Symplify\EasyParallel\Enum\Content::FILES => $job]);
            });
        });
        /** @var string $serverAddress */
        $serverAddress = $tcpServer->getAddress();
        /** @var int $serverPort */
        $serverPort = \parse_url($serverAddress, \PHP_URL_PORT);
        $systemErrorsCount = 0;
        $reachedSystemErrorsCountLimit = \false;
        // any error raised by a worker process is recorded and stops all workers
        $handleErrorCallable = function (\Throwable $throwable) use(&$systemErrors, &$systemErrorsCount, &$reachedSystemErrorsCountLimit) : void {
            $systemErrors[] = new \Rector\Core\ValueObject\Error\SystemError($throwable->getMessage(), $throwable->getFile(), $throwable->getLine());
            ++$systemErrorsCount;
            $reachedSystemErrorsCountLimit = \true;
            $this->processPool->quitAll();
        };
        for ($i = 0; $i < $numberOfProcesses; ++$i) {
            // nothing else to process, stop now
            if ($jobs === []) {
                break;
            }
            $processIdentifier = \RectorPrefix20220104\Nette\Utils\Random::generate();
            $workerCommandLine = $this->workerCommandLineFactory->create($mainScript, \Rector\Core\Console\Command\ProcessCommand::class, \RectorPrefix20220104\Symplify\PackageBuilder\Console\Command\CommandNaming::classToName(\Rector\Core\Console\Command\WorkerCommand::class), $projectConfigFile, $input, $processIdentifier, $serverPort);
            $parallelProcess = new \RectorPrefix20220104\Symplify\EasyParallel\ValueObject\ParallelProcess($workerCommandLine, $streamSelectLoop, self::TIMEOUT_IN_SECONDS);
            $parallelProcess->start(
                // 1. callable on data
                // the limit flag must be the same variable that is checked after the loop has finished
                function (array $json) use($parallelProcess, &$systemErrors, &$fileDiffs, &$jobs, $postFileCallback, &$systemErrorsCount, &$reachedSystemErrorsCountLimit, $processIdentifier) : void {
                    // decode arrays to objects
                    foreach ($json[\Rector\Parallel\ValueObject\Bridge::SYSTEM_ERRORS] as $jsonError) {
                        if (\is_string($jsonError)) {
                            $systemErrors[] = 'System error: ' . $jsonError;
                            continue;
                        }
                        $systemErrors[] = \Rector\Core\ValueObject\Error\SystemError::decode($jsonError);
                    }
                    foreach ($json[\Rector\Parallel\ValueObject\Bridge::FILE_DIFFS] as $jsonFileDiff) {
                        $fileDiffs[] = \Rector\Core\ValueObject\Reporting\FileDiff::decode($jsonFileDiff);
                    }
                    $postFileCallback($json[\Rector\Parallel\ValueObject\Bridge::FILES_COUNT]);
                    $systemErrorsCount += $json[\Rector\Parallel\ValueObject\Bridge::SYSTEM_ERRORS_COUNT];
                    if ($systemErrorsCount >= self::SYSTEM_ERROR_COUNT_LIMIT) {
                        $reachedSystemErrorsCountLimit = \true;
                        $this->processPool->quitAll();
                        // every worker was just asked to quit, so do not hand out another job
                        return;
                    }
                    if ($jobs === []) {
                        $this->processPool->quitProcess($processIdentifier);
                        return;
                    }
                    // keep this worker busy with the next pending job
                    $job = \array_pop($jobs);
                    $parallelProcess->request([\RectorPrefix20220104\Symplify\EasyParallel\Enum\ReactCommand::ACTION => \RectorPrefix20220104\Symplify\EasyParallel\Enum\Action::MAIN, \RectorPrefix20220104\Symplify\EasyParallel\Enum\Content::FILES => $job]);
                },
                // 2. callable on error
                $handleErrorCallable,
                // 3. callable on exit
                function ($exitCode, string $stdErr) use(&$systemErrors, $processIdentifier) : void {
                    $this->processPool->tryQuitProcess($processIdentifier);
                    if ($exitCode === \RectorPrefix20220104\Symfony\Component\Console\Command\Command::SUCCESS) {
                        return;
                    }
                    if ($exitCode === null) {
                        return;
                    }
                    // any other exit code is reported together with the captured error output
                    $systemErrors[] = 'Child process error: ' . $stdErr;
                }
            );
            $this->processPool->attachProcess($processIdentifier, $parallelProcess);
        }
        // runs until the event loop has nothing left to wait for
        $streamSelectLoop->run();
        if ($reachedSystemErrorsCountLimit) {
            $systemErrors[] = \sprintf('Reached system errors count limit of %d, exiting...', self::SYSTEM_ERROR_COUNT_LIMIT);
        }
        return [\Rector\Parallel\ValueObject\Bridge::FILE_DIFFS => $fileDiffs, \Rector\Parallel\ValueObject\Bridge::SYSTEM_ERRORS => $systemErrors, \Rector\Parallel\ValueObject\Bridge::SYSTEM_ERRORS_COUNT => \count($systemErrors)];
    }
}
|