ProcessSupervisor.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. <?php
  2. namespace Nesk\Rialto;
  3. use Psr\Log\LogLevel;
  4. use RuntimeException;
  5. use Socket\Raw\Socket;
  6. use Socket\Raw\Factory as SocketFactory;
  7. use Socket\Raw\Exception as SocketException;
  8. use Nesk\Rialto\Exceptions\IdleTimeoutException;
  9. use Symfony\Component\Process\Process as SymfonyProcess;
  10. use Symfony\Component\Process\Exception\ProcessFailedException;
  11. use Nesk\Rialto\Interfaces\ShouldHandleProcessDelegation;
  12. use Nesk\Rialto\Exceptions\Node\Exception as NodeException;
  13. use Nesk\Rialto\Exceptions\Node\FatalException as NodeFatalException;
  14. class ProcessSupervisor
  15. {
  16. use Data\UnserializesData, Traits\UsesBasicResourceAsDefault;
  17. /**
  18. * A reasonable delay to let the process terminate itself (in milliseconds).
  19. *
  20. * @var int
  21. */
  22. protected const PROCESS_TERMINATION_DELAY = 100;
  23. /**
  24. * The size of a packet sent through the sockets (in bytes).
  25. *
  26. * @var int
  27. */
  28. protected const SOCKET_PACKET_SIZE = 1024;
  29. /**
  30. * The size of the header in each packet sent through the sockets (in bytes).
  31. *
  32. * @var int
  33. */
  34. protected const SOCKET_HEADER_SIZE = 5;
  35. /**
  36. * A short period to wait before reading the next chunk (in milliseconds). This prevents the next chunk from being
  37. * read as an empty string when PuPHPeteer is running on a slow environment.
  38. *
  39. * @var int
  40. */
  41. protected const SOCKET_NEXT_CHUNK_DELAY = 1;
  42. /**
  43. * Options to remove before sending them to the process.
  44. *
  45. * @var string[]
  46. */
  47. protected const USELESS_OPTIONS_FOR_PROCESS = [
  48. 'executable_path', 'read_timeout', 'stop_timeout', 'logger', 'debug',
  49. ];
  50. /**
  51. * The associative array containing the options.
  52. *
  53. * @var array
  54. */
  55. protected $options = [
  56. // Node's executable path
  57. 'executable_path' => 'node',
  58. // How much time (in seconds) the process can stay inactive before being killed (set to null to disable)
  59. 'idle_timeout' => 60,
  60. // How much time (in seconds) an instruction can take to return a value (set to null to disable)
  61. 'read_timeout' => 30,
  62. // How much time (in seconds) the process can take to shutdown properly before being killed
  63. 'stop_timeout' => 3,
  64. // A logger instance for debugging (must implement \Psr\Log\LoggerInterface)
  65. 'logger' => null,
  66. // Logs the output of console methods (console.log, console.debug, console.table, etc...) to the PHP logger
  67. 'log_node_console' => false,
  68. // Enables debugging mode:
  69. // - adds the --inspect flag to Node's command
  70. // - appends stack traces to Node exception messages
  71. 'debug' => false,
  72. ];
  73. /**
  74. * The running process.
  75. *
  76. * @var \Symfony\Component\Process\Process
  77. */
  78. protected $process;
  79. /**
  80. * The PID of the running process.
  81. *
  82. * @var int
  83. */
  84. protected $processPid;
  85. /**
  86. * The process delegate.
  87. *
  88. * @var \Nesk\Rialto\Interfaces\ShouldHandleProcessDelegation|null
  89. */
  90. protected $delegate;
  91. /**
  92. * The client to communicate with the process.
  93. *
  94. * @var \Socket\Raw\Socket
  95. */
  96. protected $client;
  97. /**
  98. * The server port.
  99. *
  100. * @var int
  101. */
  102. protected $serverPort;
  103. /**
  104. * The logger instance.
  105. *
  106. * @var \Psr\Log\LoggerInterface
  107. */
  108. protected $logger;
  /**
   * Start the Node process and open the socket client used to communicate with it.
   *
   * @param string $connectionDelegatePath Path of the connection delegate passed as an argument to the Node process.
   * @param ShouldHandleProcessDelegation|null $processDelegate Optional process delegate stored on the supervisor.
   * @param array $options Options merged over the default ones.
   */
  public function __construct(
      string $connectionDelegatePath,
      ?ShouldHandleProcessDelegation $processDelegate = null,
      array $options = []
  ) {
      // The logger must exist before anything else because applying the options already writes to it.
      $providedLogger = $options['logger'] ?? null;
      $this->logger = new Logger($providedLogger);
      $this->applyOptions($options);

      $nodeProcess = $this->createNewProcess($connectionDelegatePath);
      $this->process = $nodeProcess;
      $this->processPid = $this->startProcess($nodeProcess);
      $this->delegate = $processDelegate;

      $port = $this->serverPort();
      $this->client = $this->createNewClient($port);

      if (!$this->options['debug']) {
          return;
      }

      // Clear error output made by the "--inspect" flag
      $this->process->clearErrorOutput();
  }
  /**
   * Destructor.
   *
   * Gives the process a short delay to terminate by itself then, if it is still running, fetches the remaining
   * remote logs and stops it.
   *
   * PHP still runs the destructor of an object whose constructor threw, so the process and the client may not
   * have been created yet: both are checked before being used.
   */
  public function __destruct()
  {
      // The constructor failed before the process was created: there is nothing to wait for or to stop.
      if ($this->process === null) {
          return;
      }

      $logContext = ['pid' => $this->processPid];

      $this->waitForProcessTermination();

      if ($this->process->isRunning()) {
          // Without a client (the constructor failed before connecting), no instruction can be sent.
          if ($this->client !== null) {
              $this->executeInstruction(Instruction::noop(), false); // Fetch the missing remote logs
          }

          $this->logger->info('Stopping process with PID {pid}...', $logContext);
          $this->process->stop($this->options['stop_timeout']);
          $this->logger->info('Stopped process with PID {pid}', $logContext);
      } else {
          $this->logger->warning("The process cannot be stopped because it's no longer running", $logContext);
      }
  }
  /**
   * Log data from the process standard streams.
   *
   * Only the data written since the previous read is fetched (incremental output). Data from stdout is logged
   * with the "notice" level, data from stderr with the "error" level.
   */
  protected function logProcessStandardStreams(): void
  {
      $output = $this->process->getIncrementalOutput();

      // Compare with the empty string: empty() would also discard an output made of the single character "0".
      if ($output !== '') {
          $this->logger->notice('Received data on stdout: {output}', [
              'pid' => $this->processPid,
              'stream' => 'stdout',
              'output' => $output,
          ]);
      }

      $errorOutput = $this->process->getIncrementalErrorOutput();

      if ($errorOutput !== '') {
          $this->logger->error('Received data on stderr: {output}', [
              'pid' => $this->processPid,
              'stream' => 'stderr',
              'output' => $errorOutput,
          ]);
      }
  }
  /**
   * Merge the provided options over the current ones and store the result.
   *
   * @param array $options Options overriding the current values.
   */
  protected function applyOptions(array $options): void
  {
      $logger = $this->logger;
      $logger->info('Applying options...', ['options' => $options]);

      $mergedOptions = array_merge($this->options, $options);
      $this->options = $mergedOptions;

      $logger->debug('Options applied and merged with defaults', ['options' => $mergedOptions]);
  }
  /**
   * Return the script path of the Node process.
   *
   * In production, the script path must target the NPM package. In local development, the script path targets the
   * Composer package (since the NPM package is not installed).
   *
   * This avoids double declarations of some JS classes in production, due to a require with two different paths (one
   * with the NPM path, the other one with the Composer path).
   *
   * The path is resolved once and kept in a static variable for the following calls.
   */
  protected function getProcessScriptPath(): string
  {
      static $scriptPath = null;

      if ($scriptPath === null) {
          // Start with the script path used in local development
          $scriptPath = __DIR__.'/node-process/serve.js';

          $resolver = new SymfonyProcess([
              $this->options['executable_path'],
              '-e',
              "process.stdout.write(require.resolve('@nesk/rialto/src/node-process/serve.js'))",
          ]);

          // A zero exit code means Node resolved the NPM package: use the script path for production
          if ($resolver->run() === 0) {
              $scriptPath = $resolver->getOutput();
          }
      }

      return $scriptPath;
  }
  /**
   * Build (without starting it) the Node process.
   *
   * The command line is made of, in order: the Node executable, the "--inspect" flag when debugging is enabled,
   * the script path, the resolved connection delegate path and the JSON-encoded options.
   *
   * @throws RuntimeException if the path to the connection delegate cannot be found.
   */
  protected function createNewProcess(string $connectionDelegatePath): SymfonyProcess
  {
      $resolvedDelegatePath = realpath($connectionDelegatePath);

      if ($resolvedDelegatePath === false) {
          throw new RuntimeException("Cannot find file or directory '$connectionDelegatePath'.");
      }

      // Remove useless options for the process
      $ignoredKeys = array_flip(self::USELESS_OPTIONS_FOR_PROCESS);
      $forwardedOptions = array_diff_key($this->options, $ignoredKeys);

      $command = [$this->options['executable_path']];

      if ($this->options['debug']) {
          $command[] = '--inspect';
      }

      $command[] = $this->getProcessScriptPath();
      $command[] = $resolvedDelegatePath;
      $command[] = json_encode((object) $forwardedOptions);

      return new SymfonyProcess($command);
  }
  /**
   * Start the given process, logging its command line and its PID.
   *
   * @return int The PID of the started process.
   */
  protected function startProcess(SymfonyProcess $process): int
  {
      $commandLine = $process->getCommandLine();
      $this->logger->info('Starting process with command line: {commandline}', ['commandline' => $commandLine]);

      $process->start();

      $startedPid = $process->getPid();
      $this->logger->info('Process started with PID {pid}', ['pid' => $startedPid]);

      return $startedPid;
  }
  /**
   * Check if the process is still running without errors.
   *
   * The standard streams are logged first. When the process wrote on stderr, the most specific matching exception
   * is thrown. A terminated process always ends with an exception.
   *
   * @throws \Nesk\Rialto\Exceptions\IdleTimeoutException
   * @throws \Nesk\Rialto\Exceptions\Node\FatalException
   * @throws \Symfony\Component\Process\Exception\ProcessFailedException
   * @throws \Nesk\Rialto\Exceptions\ProcessUnexpectedlyTerminatedException
   */
  protected function checkProcessStatus(): void
  {
      $this->logProcessStandardStreams();

      $process = $this->process;
      $hasErrorOutput = !empty($process->getErrorOutput());

      if ($hasErrorOutput) {
          // Every branch below throws, so the checks are written as successive guards.
          if (IdleTimeoutException::exceptionApplies($process)) {
              throw new IdleTimeoutException(
                  $this->options['idle_timeout'],
                  new NodeFatalException($process, $this->options['debug'])
              );
          }

          if (NodeFatalException::exceptionApplies($process)) {
              throw new NodeFatalException($process, $this->options['debug']);
          }

          if ($process->isTerminated() && !$process->isSuccessful()) {
              throw new ProcessFailedException($process);
          }
      }

      if ($process->isTerminated()) {
          throw new Exceptions\ProcessUnexpectedlyTerminatedException($process);
      }
  }
  /**
   * Pause for the process termination delay.
   *
   * The process might take a while to stop itself. So, before trying to check its status or reading its standard
   * streams, this method should be executed.
   */
  protected function waitForProcessTermination(): void
  {
      // The delay is expressed in milliseconds while usleep() expects microseconds.
      $microsecondsPerMillisecond = 1000;

      usleep(self::PROCESS_TERMINATION_DELAY * $microsecondsPerMillisecond);
  }
  /**
   * Return the port of the server.
   *
   * The port is read from the first data the process writes on its standard output, then kept for the following
   * calls.
   *
   * @throws RuntimeException if no port could be read although the process status check reported no error.
   */
  protected function serverPort(): int
  {
      if ($this->serverPort !== null) {
          return $this->serverPort;
      }

      $iterator = $this->process->getIterator(SymfonyProcess::ITER_SKIP_ERR | SymfonyProcess::ITER_KEEP_OUTPUT);

      // Only the first chunk of output is needed, hence the return inside the loop.
      foreach ($iterator as $data) {
          return $this->serverPort = (int) $data;
      }

      // If the iterator didn't execute properly, then the process must have failed, we must check to be sure.
      $this->checkProcessStatus();

      // The status check did not throw but there is still no port: fail explicitly rather than reaching the end of
      // a method declared to return an int, which would raise an unhelpful TypeError.
      throw new RuntimeException('Unable to read the server port from the process output.');
  }
  /**
   * Open a non-blocking TCP client connected to the local server listening on the given port.
   *
   * @param int $port The port of the server to connect to.
   */
  protected function createNewClient(int $port): Socket
  {
      $factory = new SocketFactory;
      $client = $factory->createClient("tcp://127.0.0.1:$port");

      // Set the client as non-blocking to handle the exceptions thrown by the process
      return $client->setBlocking(false);
  }
  /**
   * Send an instruction to the process for execution.
   *
   * @param Instruction $instruction The instruction to serialize to JSON and write to the socket.
   * @param bool $instructionShouldBeLogged Whether the instruction and the received value must be logged.
   * @return mixed The value read back from the process.
   *
   * @throws RuntimeException if the instruction cannot be encoded to JSON.
   */
  public function executeInstruction(Instruction $instruction, bool $instructionShouldBeLogged = true)
  {
      // Check the process status because it could have crash in idle status.
      $this->checkProcessStatus();

      $serializedInstruction = json_encode($instruction);

      // json_encode() returns false on failure. Writing that to the socket would send an empty payload, and the
      // failure would only surface later while waiting for a response: fail right away with the real cause.
      if ($serializedInstruction === false) {
          throw new RuntimeException('Unable to serialize the instruction: '.json_last_error_msg());
      }

      if ($instructionShouldBeLogged) {
          $this->logger->debug('Sending an instruction to the port {port}...', [
              'pid' => $this->processPid,
              'port' => $this->serverPort(),

              // The instruction must be fully encoded and decoded to appear properly in the logs (this way,
              // JS functions and resources are serialized too).
              'instruction' => json_decode($serializedInstruction, true),
          ]);
      }

      $this->client->selectWrite(1);
      $this->client->write($serializedInstruction);

      $value = $this->readNextProcessValue($instructionShouldBeLogged);

      // Check the process status if the value is null because, if the process crash while executing the instruction,
      // the socket closes and returns an empty value (which is converted to `null`).
      if ($value === null) {
          $this->checkProcessStatus();
      }

      return $value;
  }
  /**
   * Read the next value written by the process.
   *
   * The payload is received as packets: each packet starts with a fixed-size header holding the number of chunks
   * left to read, followed by a chunk of the base64-encoded JSON payload. The decoded payload holds the remote logs
   * (forwarded to the logger) and the value (unserialized and returned).
   *
   * @param bool $valueShouldBeLogged Whether the received value must be logged.
   * @return mixed The unserialized value.
   *
   * @throws \Nesk\Rialto\Exceptions\ReadSocketTimeoutException if reading the socket exceeded the timeout.
   * @throws \Nesk\Rialto\Exceptions\Node\Exception if the process returned an error.
   * @throws \Socket\Raw\Exception if reading the socket failed for another reason.
   */
  protected function readNextProcessValue(bool $valueShouldBeLogged = true)
  {
      $readTimeout = $this->options['read_timeout'];
      $payload = '';
      $startTimestamp = microtime(true);

      try {
          do {
              $this->client->selectRead($readTimeout);
              $packet = $this->client->read(static::SOCKET_PACKET_SIZE);

              $chunksLeft = (int) substr($packet, 0, static::SOCKET_HEADER_SIZE);
              $chunk = substr($packet, static::SOCKET_HEADER_SIZE);

              $payload .= $chunk;

              if ($chunksLeft > 0) {
                  // The next chunk might be an empty string if don't wait a short period on slow environments.
                  usleep(self::SOCKET_NEXT_CHUNK_DELAY * 1000);
              }
          } while ($chunksLeft > 0);
      } catch (SocketException $exception) {
          $this->waitForProcessTermination();
          $this->checkProcessStatus();

          // Extract the socket error code to throw more specific exceptions. When the message does not end with
          // the name of a defined constant, the code stays null and the original exception is rethrown untouched
          // (instead of failing on an undefined match or an unknown constant name).
          $socketErrorCode = null;
          $messageHasConstantName = preg_match(
              '/\(([A-Z_]+?)\)$/',
              $exception->getMessage(),
              $socketErrorMatches
          ) === 1;

          if ($messageHasConstantName && defined($socketErrorMatches[1])) {
              $socketErrorCode = constant($socketErrorMatches[1]);
          }

          $elapsedTime = microtime(true) - $startTimestamp;
          if ($socketErrorCode === SOCKET_EAGAIN && $readTimeout !== null && $elapsedTime >= $readTimeout) {
              throw new Exceptions\ReadSocketTimeoutException($readTimeout, $exception);
          }

          throw $exception;
      }

      $this->logProcessStandardStreams();

      ['logs' => $logs, 'value' => $value] = json_decode(base64_decode($payload), true);

      // Built once for all the logs: it maps the level names sent by the process to the PSR log levels.
      $logLevels = new \ReflectionClass(LogLevel::class);

      foreach ($logs ?: [] as $log) {
          $level = $logLevels->getConstant($log['level']);
          $messageContainsLineBreaks = strpos($log['message'], PHP_EOL) !== false;
          $formattedMessage = $messageContainsLineBreaks ? "\n{log}\n" : '{log}';

          $this->logger->log($level, "Received a $log[origin] log: $formattedMessage", [
              'pid' => $this->processPid,
              'port' => $this->serverPort(),
              'log' => $log['message'],
          ]);
      }

      $value = $this->unserialize($value);

      if ($valueShouldBeLogged) {
          $this->logger->debug('Received data from the port {port}...', [
              'pid' => $this->processPid,
              'port' => $this->serverPort(),
              'data' => $value,
          ]);
      }

      // Errors raised by the process come back as exception instances: rethrow them on the PHP side.
      if ($value instanceof NodeException) {
          throw $value;
      }

      return $value;
  }
  381. }