// Opcode (a single 0x00 byte) passed to send() by sendWork() together with a queued payload.
7 const OP_COMMAND =
"\x00";
// Opcode (a single 0x01 byte) passed to send() by sendSignal() together with the signal number.
8 const OP_SIGNAL =
"\x01";
// Fully-qualified name of the worker class. It is handed to the parent
// constructor and instantiated (via late static binding) in prepare().
10 static protected $workerCls =
'\\fpoirotte\\push\\Worker';
// Pending payloads: appended by sendCommands(), consumed one at a time by sendWork().
13 protected $statements;
// Value stored by handleREADY() and returned by getEvalLocation(); starts as ''.
15 protected $evalLocation;
/**
 * Initialises the manager's state.
 *
 * The parent constructor is given the configured worker class name first;
 * afterwards the manager starts out with no worker PID (-1), an empty
 * queue of statements, the "working" flag cleared and an empty
 * evaluation location.
 */
public function __construct()
{
    parent::__construct(static::$workerCls);

    $this->evalLocation = '';
    $this->working      = false;
    $this->statements   = array();
    $this->workerPid    = -1;
}
/**
 * Reports the current value of the "working" flag.
 *
 * The flag is raised by sendWork() when a payload is dispatched and
 * cleared again by handleEND().
 *
 * @return bool Current value of the flag.
 */
public function isWorking()
{
    return $this->working;
}
/**
 * Returns the evaluation location last stored by handleREADY().
 *
 * @return string The stored value, or an empty string when nothing
 *                has been stored since construction.
 */
public function getEvalLocation()
{
    return $this->evalLocation;
}
/**
 * Duplicates a stream onto one of the three standard descriptors through
 * the eio extension, then runs the eio event loop.
 *
 * NOTE(review): this excerpt has gaps (no braces, and the condition that
 * guards the first exception is not visible) — confirm against the full file.
 *
 * @param string $streamName 'STDIN', 'STDOUT' or 'STDERR'.
 * @param mixed  $new        Stream passed to eio_dup2() as the source.
 *
 * @throws \RuntimeException 'Could not duplicate stream' (guard not visible
 *                           here) or 'Could not run event loop' when
 *                           eio_event_loop() returns a falsy value.
 */
36 protected function replaceSpecialStream($streamName, $new)
// The index of each name in this list is used as the target descriptor
// number: 0, 1 and 2 respectively.
38 $special = array(
'STDIN',
'STDOUT',
'STDERR');
// NOTE(review): array_search() returns false for a name that is not in the
// list, and that false is passed straight to eio_dup2() as the target.
// Confirm callers only ever pass the three names above.
39 $tmp = eio_dup2($new, array_search($streamName, $special,
true));
// Presumably guarded by a check on $tmp in the full file — TODO confirm.
41 throw new \RuntimeException(
'Could not duplicate stream');
// Run the eio event loop; a falsy return is treated as a failure.
44 if (!eio_event_loop()) {
45 throw new \RuntimeException(
'Could not run event loop');
/**
 * Creates three UNIX socket pairs, then sets up both ends of the
 * manager/worker relationship around a fork.
 *
 * NOTE(review): this excerpt has gaps: the initialisation of $pipes, the
 * fork call itself and the branch structure separating the two processes
 * are not visible. The parent/child attributions below are therefore
 * presumed from the statements that are visible — confirm against the
 * full file.
 *
 * @param mixed $stdout  Set by reference to $pipes[1][0].
 * @param mixed $stderr  Set by reference to $pipes[2][0].
 * @param mixed $control Set by reference to $pipes[3][0] (the same stream
 *                       that is stored in $this->socket).
 *
 * @throws \RuntimeException 'Failed to create socket pairs' or
 *                           'Failed to fork child process'.
 */
49 public function prepare(&$stdout, &$stderr, &$control)
// PID of the current process, captured for use in the process title below.
52 $managerPid = getmypid();
// Pairs are stored at indexes 1..3 (1 and 2 end up on STDOUT/STDERR,
// 3 is handed to the worker and kept as the control socket).
59 for ($i = 1; $i < 4; $i++) {
60 if (!$pipes[$i] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) {
// NOTE(review): only the pair at index $i-1 is closed here. For $i == 1
// that is index 0, which this loop never creates, and for $i == 3 the pair
// at index 1 would stay open unless a hidden enclosing loop walks $i back
// down — confirm.
62 fclose($pipes[$i-1][0]);
63 fclose($pipes[$i-1][1]);
66 throw new \RuntimeException(
'Failed to create socket pairs');
// Close both ends of every pair before reporting the failed fork.
72 for ($i = 1; $i < 4; $i++) {
73 fclose($pipes[$i][0]);
74 fclose($pipes[$i][1]);
76 throw new \RuntimeException(
'Failed to fork child process');
// Presumably the manager (parent) side: remember the worker's PID and
// start with an empty queue.
80 $this->workerPid = $pid;
81 $this->statements = array();
// Hand the [0] end of each pair back to the caller; the control stream is
// also kept in $this->socket.
87 $stdout = $pipes[1][0];
88 $stderr = $pipes[2][0];
89 $control = $pipes[3][0];
90 $this->socket = $pipes[3][0];
// Put the output streams in non-blocking mode.
94 stream_set_blocking($stdout, 0);
95 stream_set_blocking($stderr, 0);
// Presumably the worker (child) side from here on — TODO confirm.
// Set the process title using whichever function is available.
97 $title =
"push (worker for process $managerPid)";
98 if (function_exists(
'cli_set_process_title')) {
99 cli_set_process_title($title);
100 } elseif (function_exists(
'setproctitle')) {
101 setproctitle($title);
// Close the [0] ends, i.e. the ends that were handed to the caller above.
105 fclose($pipes[1][0]);
106 fclose($pipes[2][0]);
107 fclose($pipes[3][0]);
// Rewire the standard streams: STDIN reads from /dev/null, STDOUT and
// STDERR go to the [1] ends of pairs 1 and 2.
112 $this->replaceSpecialStream(
"STDIN", fopen(
'/dev/null',
'rb'));
113 $this->replaceSpecialStream(
"STDOUT", $pipes[1][1]);
114 $this->replaceSpecialStream(
"STDERR", $pipes[2][1]);
// Instantiate the configured worker class on the [1] end of pair 3.
116 $cls = static::$workerCls;
117 $worker =
new $cls($pipes[3][1]);
/**
 * Dispatches the oldest queued payload as an OP_COMMAND message and
 * raises the "working" flag.
 *
 * NOTE(review): the body of the guard below is not visible in this
 * excerpt; presumably it returns early — confirm.
 */
122 protected function sendWork()
// Nothing to do when the queue is empty or a payload is already in flight.
124 if (!count($this->statements) || $this->working) {
128 $this->working =
true;
// array_shift() removes the first queued payload and returns it.
129 $this->send(self::OP_COMMAND, array_shift($this->statements));
/**
 * Appends a payload to the queue of pending statements.
 *
 * NOTE(review): lines of this method are missing from the excerpt, so
 * anything it does before or after queueing is not visible here.
 *
 * @param mixed $data Payload to queue; sendWork() later passes it to send().
 */
132 public function sendCommands($data)
135 array_push($this->statements, $data);
// Re-index the queue from 0.
// NOTE(review): looks redundant if the queue is only ever changed through
// array_push()/array_shift(), which already keep list keys — confirm.
136 $this->statements = array_values($this->statements);
/**
 * Sends a signal number to the worker as an OP_SIGNAL message.
 *
 * NOTE(review): several lines of this method are missing from the excerpt
 * (braces and whatever surrounds the pcntl_wifexited() check) — confirm
 * against the full file.
 *
 * @param int|string $signo Signal number, or the name of a defined
 *                          constant starting with 'SIG'.
 *
 * @throws \RuntimeException (without a message) when a non-integer $signo
 *                           is not the name of a defined 'SIG…' constant.
 */
140 public function sendSignal($signo)
// Resolve a constant name such as 'SIGINT' to its integer value.
142 if (!is_int($signo)) {
143 if (!strncmp($signo,
'SIG', 3) && defined($signo)) {
144 $signo = constant($signo);
146 throw new \RuntimeException();
// Non-blocking check (WNOHANG) on the worker process.
152 $child = pcntl_waitpid($this->workerPid, $status, WNOHANG);
// NOTE(review): likely bug — pcntl_wifexited() and pcntl_wexitstatus()
// expect the $status value filled in by pcntl_waitpid(), but are given its
// return value ($child) here. Confirm and pass $status instead.
154 if (pcntl_wifexited($child)) {
155 exit(pcntl_wexitstatus($child));
// The signal number is sent in its string form.
161 $this->send(self::OP_SIGNAL, (
string) $signo);
/**
 * Records the received payload as the current evaluation location.
 *
 * NOTE(review): lines between the signature and the assignment are missing
 * from this excerpt, so any other work done here is not visible.
 *
 * @param mixed $data Value stored in $this->evalLocation.
 */
164 protected function handleREADY($data)
168 $this->evalLocation = $data;
/**
 * Records the worker PID carried by the received payload.
 *
 * @param mixed $data Payload; it is converted to an integer before being
 *                    stored in $this->workerPid.
 */
protected function handleSTART($data)
{
    $pid = (int) $data;
    $this->workerPid = $pid;
}
/**
 * Clears the "working" flag that sendWork() raised.
 *
 * NOTE(review): the excerpt ends here; the method may contain further
 * statements that are not visible.
 */
176 protected function handleEND()
178 $this->working =
false;