push  v0.0.1
PHP shell
Manager.php
1 <?php
2 
3 namespace fpoirotte\push;
4 
/**
 * Manager side of the push shell.
 *
 * The manager forks a worker process, wires the worker's standard
 * streams to socket pairs it can read from, and then feeds the worker
 * with statements (one at a time) and signals over a control socket.
 *
 * The low-level framing (send() and the $socket property) comes from
 * the parent Protocol class, which is not visible in this file.
 * NOTE(review): the handleXXX() methods below are presumably invoked
 * by Protocol when the matching message arrives from the worker --
 * confirm against Protocol.
 */
class Manager extends Protocol
{
    /** Opcode sent along with a statement taken from the queue. */
    const OP_COMMAND = "\x00";

    /** Opcode sent along with a signal number (as a string). */
    const OP_SIGNAL = "\x01";

    /** Fully-qualified name of the class instantiated in the forked child. */
    static protected $workerCls = '\\fpoirotte\\push\\Worker';

    /** PID of the worker process, or -1 before any worker was forked. */
    protected $workerPid;

    /** Queue of statements waiting to be sent to the worker. */
    protected $statements;

    /** Whether a statement was sent and its END was not received yet. */
    protected $working;

    /** Location string reported by the worker through READY. */
    protected $evalLocation;

    /**
     * Initialize the manager with no worker, an empty statement queue
     * and an empty evaluation location.
     */
    public function __construct()
    {
        parent::__construct(static::$workerCls);
        $this->workerPid = -1;
        $this->statements = array();
        $this->working = false;
        $this->evalLocation = '';
    }

    /**
     * Tell whether the worker is currently processing a statement.
     *
     * @return bool
     *      true between the moment a statement is sent and the
     *      moment handleEND() runs, false otherwise.
     */
    public function isWorking()
    {
        return $this->working;
    }

    /**
     * Return the location last reported through handleREADY().
     *
     * @return string
     *      The reported location, or an empty string if none
     *      was reported yet.
     */
    public function getEvalLocation()
    {
        return $this->evalLocation;
    }

    /**
     * Make one of the process' standard descriptors point to
     * another stream, using eio_dup2().
     *
     * @param string $streamName
     *      Either "STDIN", "STDOUT" or "STDERR".
     *
     * @param resource $new
     *      Stream that should take the place of the standard one.
     *
     * @throws \RuntimeException
     *      The name is not one of the standard streams, the request
     *      could not be created, or the eio event loop failed.
     */
    protected function replaceSpecialStream($streamName, $new)
    {
        // The position of the name in this list is also
        // the number of the matching file descriptor (0, 1, 2).
        $special = array('STDIN', 'STDOUT', 'STDERR');
        $fd = array_search($streamName, $special, true);
        if ($fd === false) {
            // Without this guard, false would be passed on as the target
            // descriptor and silently treated as descriptor 0 (STDIN).
            throw new \RuntimeException('Unknown special stream');
        }

        $tmp = eio_dup2($new, $fd);
        if ($tmp === false) {
            throw new \RuntimeException('Could not duplicate stream');
        }

        // eio_dup2() only queues a request: run the event loop
        // so that it is actually carried out before we go on.
        if (!eio_event_loop()) {
            throw new \RuntimeException('Could not run event loop');
        }
    }

    /**
     * Fork the worker process and set up the communication channels.
     *
     * In the manager (parent) process, the three reference parameters
     * are filled in and the method returns. In the worker (child)
     * process, the standard streams are redirected and the worker
     * class is instantiated and run; the reference parameters are
     * left untouched there.
     *
     * @param resource $stdout
     *      Receives the manager's end of the socket pair bound
     *      to the worker's STDOUT (set to non-blocking mode).
     *
     * @param resource $stderr
     *      Receives the manager's end of the socket pair bound
     *      to the worker's STDERR (set to non-blocking mode).
     *
     * @param resource $control
     *      Receives the manager's end of the control socket.
     *      The same stream is stored in $this->socket.
     *
     * @throws \RuntimeException
     *      The socket pairs could not be created or
     *      the process could not be forked.
     */
    public function prepare(&$stdout, &$stderr, &$control)
    {
        $pipes = array();
        $managerPid = getmypid();

        /* Open several socket pairs for IPC:
         * 1 = worker's STDOUT (unidirectional)
         * 2 = worker's STDERR (unidirectional)
         * 3 = control socket (bidirectional)
         */
        for ($i = 1; $i < 4; $i++) {
            $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
            if (!$pair) {
                // Release the pairs created so far before bailing out.
                foreach ($pipes as $opened) {
                    fclose($opened[0]);
                    fclose($opened[1]);
                }
                throw new \RuntimeException('Failed to create socket pairs');
            }
            $pipes[$i] = $pair;
        }

        $pid = pcntl_fork();
        if ($pid < 0) {
            foreach ($pipes as $opened) {
                fclose($opened[0]);
                fclose($opened[1]);
            }
            throw new \RuntimeException('Failed to fork child process');
        }

        if ($pid > 0) {
            // Manager (parent) process.
            $this->workerPid = $pid;
            $this->statements = array();

            // Close the worker's end of each pair.
            fclose($pipes[1][1]);
            fclose($pipes[2][1]);
            fclose($pipes[3][1]);

            $stdout = $pipes[1][0];
            $stderr = $pipes[2][0];
            $control = $pipes[3][0];
            $this->socket = $pipes[3][0];

            // Using stream_select() is the preferred way
            // to deal with I/O events on these streams.
            stream_set_blocking($stdout, 0);
            stream_set_blocking($stderr, 0);
        } else {
            // Worker (child) process.
            $title = "push (worker for process $managerPid)";
            if (function_exists('cli_set_process_title')) {
                cli_set_process_title($title);
            } elseif (function_exists('setproctitle')) {
                setproctitle($title);
            }

            // Close the manager's end of each pair.
            fclose($pipes[1][0]);
            fclose($pipes[2][0]);
            fclose($pipes[3][0]);

            // Overwrite the special constants STDIN, STDOUT & STDERR
            // with new streams so that the worker's output/errors can
            // be intercepted.
            $this->replaceSpecialStream("STDIN", fopen('/dev/null', 'rb'));
            $this->replaceSpecialStream("STDOUT", $pipes[1][1]);
            $this->replaceSpecialStream("STDERR", $pipes[2][1]);

            $cls = static::$workerCls;
            $worker = new $cls($pipes[3][1]);
            // NOTE(review): if run() ever returns, the child returns
            // from prepare() and carries on with the caller's code,
            // just like the manager does -- presumably run() never
            // returns; confirm against the worker class.
            $worker->run();
        }
    }

    /**
     * Send the next queued statement to the worker, unless the queue
     * is empty or the worker is still busy with a previous statement.
     */
    protected function sendWork()
    {
        if (!count($this->statements) || $this->working) {
            return;
        }

        $this->working = true;
        $this->send(self::OP_COMMAND, array_shift($this->statements));
    }

    /**
     * Queue a statement and send it right away if the worker is idle.
     *
     * @param mixed $data
     *      Statement to pass to the worker. It is handed over
     *      to send() as is when its turn comes.
     */
    public function sendCommands($data)
    {
        // Push the new statement and reindex the queue.
        $this->statements[] = $data;
        $this->statements = array_values($this->statements);
        $this->sendWork();
    }

    /**
     * Forward a signal to the worker through the control socket.
     *
     * SIGCHLD is never forwarded: it is used to reap the worker
     * instead. When the worker exited normally, the current process
     * exits too, using the worker's exit code.
     *
     * @param int|string $signo
     *      Signal number, or name of a defined constant
     *      starting with "SIG" (e.g. "SIGINT").
     *
     * @throws \RuntimeException
     *      The given value is neither an integer nor
     *      the name of a defined SIG* constant.
     */
    public function sendSignal($signo)
    {
        if (!is_int($signo)) {
            if (is_string($signo) && !strncmp($signo, 'SIG', 3) && defined($signo)) {
                $signo = constant($signo);
            } else {
                throw new \RuntimeException('Invalid signal name or number');
            }
        }

        switch ($signo) {
            case SIGCHLD:
                $status = 0;
                $child = pcntl_waitpid($this->workerPid, $status, WNOHANG);
                // The wait status lives in $status (filled by reference);
                // $child is only the PID of the reaped process.
                if ($child > 0 && pcntl_wifexited($status)) {
                    exit(pcntl_wexitstatus($status));
                }
                return; // The worker must not receive its own signal.
        }

        $this->send(self::OP_SIGNAL, (string) $signo);
    }

    /**
     * Record the location reported by the worker.
     *
     * @param string $data
     *      File & line where PHP evaluates the code.
     */
    protected function handleREADY($data)
    {
        // $data contains the file & line where PHP evaluates the code.
        // We use this to filter out the output from errors/exceptions.
        $this->evalLocation = $data;
    }

    /**
     * Record the PID announced through a START message.
     *
     * @param string $data
     *      PID to store as the worker's PID (cast to an integer).
     */
    protected function handleSTART($data)
    {
        $this->workerPid = (int) $data;
    }

    /**
     * Mark the worker as idle again and send it
     * the next queued statement, if any.
     */
    protected function handleEND()
    {
        $this->working = false;
        $this->sendWork();
    }
}