2021-11-23 13:20:36 +00:00
< ? php
declare ( strict_types = 1 );
namespace Rector\Parallel\Application ;
use Closure ;
2022-01-14 09:27:49 +00:00
use RectorPrefix20220114\Clue\React\NDJson\Decoder ;
use RectorPrefix20220114\Clue\React\NDJson\Encoder ;
use RectorPrefix20220114\Nette\Utils\Random ;
use RectorPrefix20220114\React\EventLoop\StreamSelectLoop ;
use RectorPrefix20220114\React\Socket\ConnectionInterface ;
use RectorPrefix20220114\React\Socket\TcpServer ;
2021-11-23 13:20:36 +00:00
use Rector\Core\Configuration\Option ;
use Rector\Core\Console\Command\ProcessCommand ;
use Rector\Core\Console\Command\WorkerCommand ;
use Rector\Core\ValueObject\Error\SystemError ;
use Rector\Core\ValueObject\Reporting\FileDiff ;
use Rector\Parallel\Command\WorkerCommandLineFactory ;
use Rector\Parallel\ValueObject\Bridge ;
2022-01-14 09:27:49 +00:00
use RectorPrefix20220114\Symfony\Component\Console\Command\Command ;
use RectorPrefix20220114\Symfony\Component\Console\Input\InputInterface ;
use RectorPrefix20220114\Symplify\EasyParallel\Enum\Action ;
use RectorPrefix20220114\Symplify\EasyParallel\Enum\Content ;
use RectorPrefix20220114\Symplify\EasyParallel\Enum\ReactCommand ;
use RectorPrefix20220114\Symplify\EasyParallel\Enum\ReactEvent ;
use RectorPrefix20220114\Symplify\EasyParallel\ValueObject\ParallelProcess ;
use RectorPrefix20220114\Symplify\EasyParallel\ValueObject\ProcessPool ;
use RectorPrefix20220114\Symplify\EasyParallel\ValueObject\Schedule ;
use RectorPrefix20220114\Symplify\PackageBuilder\Console\Command\CommandNaming ;
2021-11-23 13:20:36 +00:00
use Throwable ;
/**
 * Inspired by @see
 * https://github.com/phpstan/phpstan-src/commit/9124c66dcc55a222e21b1717ba5f60771f7dda92#diff-39c7a3b0cbb217bbfff96fbb454e6e5e60c74cf92fbb0f9d246b8bebbaad2bb0
 *
 * https://github.com/phpstan/phpstan-src/commit/b84acd2e3eadf66189a64fdbc6dd18ff76323f67#diff-7f625777f1ce5384046df08abffd6c911cfbb1cfc8fcb2bdeaf78f337689e3e2R150
 */
final class ParallelFileProcessor
{
    /**
     * Timeout, in seconds, handed to every spawned ParallelProcess.
     *
     * @var int
     */
    public const TIMEOUT_IN_SECONDS = 120;

    /**
     * Once the system error count reported by workers reaches this value,
     * all worker processes are asked to quit.
     *
     * @var int
     */
    private const SYSTEM_ERROR_COUNT_LIMIT = 20;

    /**
     * Pool of worker processes; (re)created on every call to process().
     *
     * @var ProcessPool|null
     */
    private $processPool = null;

    /**
     * @readonly
     * @var WorkerCommandLineFactory
     */
    private $workerCommandLineFactory;

    public function __construct(WorkerCommandLineFactory $workerCommandLineFactory)
    {
        $this->workerCommandLineFactory = $workerCommandLineFactory;
    }

    /**
     * Distributes the scheduled jobs over worker processes and collects their results.
     *
     * A local TCP server is opened on an ephemeral port; up to
     * $schedule->getNumberOfProcesses() workers are started and pointed at it.
     * Every worker receives a job when it says HELLO and another one each time it
     * reports a result, until no jobs are left. The method blocks until the event
     * loop has finished.
     *
     * @param Schedule $schedule Source of the jobs and of the number of worker processes
     * @param string $mainScript Forwarded to the worker command line factory
     * @param Closure(int): void $postFileCallback Used for progress bar jump; called with the
     *                                              Bridge::FILES_COUNT value of each worker response
     * @param InputInterface $input Forwarded to the worker command line factory
     * @return mixed[] Array keyed by Bridge::FILE_DIFFS, Bridge::SYSTEM_ERRORS and Bridge::SYSTEM_ERRORS_COUNT
     */
    public function process(Schedule $schedule, string $mainScript, Closure $postFileCallback, InputInterface $input): array
    {
        // Jobs are taken with array_pop(), so reverse them to hand them out in schedule order.
        $jobs = \array_reverse($schedule->getJobs());
        $streamSelectLoop = new StreamSelectLoop();

        // basic properties setup
        $numberOfProcesses = $schedule->getNumberOfProcesses();

        // initial counters
        $fileDiffs = [];
        /** @var SystemError[] $systemErrors */
        $systemErrors = [];

        // Port 0 lets the operating system pick a free port; it is read back below.
        $tcpServer = new TcpServer('127.0.0.1:0', $streamSelectLoop);
        $this->processPool = new ProcessPool($tcpServer);

        $tcpServer->on(ReactEvent::CONNECTION, function (ConnectionInterface $connection) use (&$jobs): void {
            $inDecoder = new Decoder($connection, true, 512, 0, 4 * 1024 * 1024);
            $outEncoder = new Encoder($connection);

            $inDecoder->on(ReactEvent::DATA, function (array $data) use (&$jobs, $inDecoder, $outEncoder): void {
                // Only the initial HELLO message is handled on this channel.
                $action = $data[ReactCommand::ACTION];
                if ($action !== Action::HELLO) {
                    return;
                }

                $processIdentifier = $data[Option::PARALLEL_IDENTIFIER];
                $parallelProcess = $this->processPool->getProcess($processIdentifier);
                $parallelProcess->bindConnection($inDecoder, $outEncoder);

                if ($jobs === []) {
                    $this->processPool->quitProcess($processIdentifier);
                    return;
                }

                $job = \array_pop($jobs);
                $parallelProcess->request([
                    ReactCommand::ACTION => Action::MAIN,
                    Content::FILES => $job,
                ]);
            });
        });

        /** @var string $serverAddress */
        $serverAddress = $tcpServer->getAddress();
        /** @var int $serverPort */
        $serverPort = \parse_url($serverAddress, \PHP_URL_PORT);

        $systemErrorsCount = 0;
        $reachedSystemErrorsCountLimit = false;

        // NOTE(review): any single worker error sets the "limit reached" flag and stops the
        // whole pool, so the "Reached system errors count limit" error below is also added
        // in that case - confirm this is intended.
        $handleErrorCallable = function (Throwable $throwable) use (&$systemErrors, &$systemErrorsCount, &$reachedSystemErrorsCountLimit): void {
            $systemErrors[] = new SystemError($throwable->getMessage(), $throwable->getFile(), $throwable->getLine());
            ++$systemErrorsCount;
            $reachedSystemErrorsCountLimit = true;
            $this->processPool->quitAll();
        };

        for ($i = 0; $i < $numberOfProcesses; ++$i) {
            // nothing else to process, stop now
            if ($jobs === []) {
                break;
            }

            $processIdentifier = Random::generate();
            $workerCommandLine = $this->workerCommandLineFactory->create(
                $mainScript,
                ProcessCommand::class,
                CommandNaming::classToName(WorkerCommand::class),
                $input,
                $processIdentifier,
                $serverPort
            );
            $parallelProcess = new ParallelProcess($workerCommandLine, $streamSelectLoop, self::TIMEOUT_IN_SECONDS);

            $parallelProcess->start(
                // 1. callable on data
                function (array $json) use ($parallelProcess, &$systemErrors, &$fileDiffs, &$jobs, $postFileCallback, &$systemErrorsCount, &$reachedSystemErrorsCountLimit, $processIdentifier): void {
                    // decode arrays to objects
                    foreach ($json[Bridge::SYSTEM_ERRORS] as $jsonError) {
                        if (\is_string($jsonError)) {
                            $systemErrors[] = new SystemError('System error: ' . $jsonError);
                            continue;
                        }

                        $systemErrors[] = SystemError::decode($jsonError);
                    }

                    foreach ($json[Bridge::FILE_DIFFS] as $jsonFileDiff) {
                        $fileDiffs[] = FileDiff::decode($jsonFileDiff);
                    }

                    $postFileCallback($json[Bridge::FILES_COUNT]);

                    $systemErrorsCount += $json[Bridge::SYSTEM_ERRORS_COUNT];
                    if ($systemErrorsCount >= self::SYSTEM_ERROR_COUNT_LIMIT) {
                        // Must be the same flag that is checked after the loop has finished,
                        // otherwise the limit error is never reported.
                        $reachedSystemErrorsCountLimit = true;
                        $this->processPool->quitAll();

                        // The whole pool was just asked to quit, so do not hand this worker
                        // another job (same as the error handler, which stops after quitAll()).
                        return;
                    }

                    if ($jobs === []) {
                        $this->processPool->quitProcess($processIdentifier);
                        return;
                    }

                    $job = \array_pop($jobs);
                    $parallelProcess->request([
                        ReactCommand::ACTION => Action::MAIN,
                        Content::FILES => $job,
                    ]);
                },
                // 2. callable on error
                $handleErrorCallable,
                // 3. callable on exit
                function ($exitCode, string $stdErr) use (&$systemErrors, $processIdentifier): void {
                    $this->processPool->tryQuitProcess($processIdentifier);

                    // A successful exit or a missing exit code is not reported as an error.
                    if ($exitCode === Command::SUCCESS) {
                        return;
                    }

                    if ($exitCode === null) {
                        return;
                    }

                    $systemErrors[] = new SystemError('Child process error: ' . $stdErr);
                }
            );

            $this->processPool->attachProcess($processIdentifier, $parallelProcess);
        }

        $streamSelectLoop->run();

        if ($reachedSystemErrorsCountLimit) {
            $systemErrors[] = new SystemError(\sprintf(
                'Reached system errors count limit of %d, exiting...',
                self::SYSTEM_ERROR_COUNT_LIMIT
            ));
        }

        return [
            Bridge::FILE_DIFFS => $fileDiffs,
            Bridge::SYSTEM_ERRORS => $systemErrors,
            Bridge::SYSTEM_ERRORS_COUNT => \count($systemErrors),
        ];
    }
}