StreamManager  v0.1.1
PHP stream manager
StreamManager.php
1 <?php
2 
3 namespace fpoirotte;
4 
6 {
7  protected $streams;
8  protected $hasDispatch;
9 
10  const WRAPPER_NAME = 'streammanager.wrapper';
11  const WRAPPER_CLASS = '\\fpoirotte\\StreamManager\\StreamWrapper';
12 
13  public function __construct()
14  {
15  if (!in_array(static::WRAPPER_NAME, stream_get_wrappers())) {
16  if (false === stream_wrapper_register(static::WRAPPER_NAME, static::WRAPPER_CLASS)) {
17  throw new \RuntimeException('Could not register stream wrapper');
18  }
19  }
20 
21  $this->streams = array();
22  $this->hasDispatch = function_exists('pcntl_signal_dispatch');
23  }
24 
25  public function count()
26  {
27  return count($this->streams);
28  }
29 
30  public function offsetExists($offset)
31  {
32  return isset($this->streams[$offset]);
33  }
34 
35  public function offsetUnset($offset)
36  {
37  unset($this->streams[$offset]);
38  }
39 
40  public function offsetGet($offset)
41  {
42  if (!isset($this->streams[$offset])) {
43  return null;
44  }
45  return $this->streams[$offset];
46  }
47 
48  public function offsetSet($offset, $value)
49  {
50  if (!is_resource($value) || 'stream' !== get_resource_type($value)) {
51  throw new \InvalidArgumentException('A stream was expected');
52  }
53 
54  // Wrap foreign streams
55  $meta = stream_get_meta_data($value);
56  $uri = isset($meta['uri']) ? $meta['uri'] : '';
57  if (strncasecmp(static::WRAPPER_NAME . ':', $uri, strlen(static::WRAPPER_NAME) + 1)) {
58  // We need to use "self" instead of "static" in various places here
59  // because that's what the stream wrapper will look for later on.
60  $options = stream_context_get_options($value);
61  if (false === $options) {
62  throw new \RuntimeException('Could not retrieve stream context options');
63  }
64 
65  $options[self::WRAPPER_NAME]['stream'] = $value;
66  if (!isset($options[self::WRAPPER_NAME]['readCallback'])) {
67  $options[self::WRAPPER_NAME]['readCallback'] = null;
68  }
69 
70  if (!isset($options[self::WRAPPER_NAME]['closeCallback'])) {
71  $options[self::WRAPPER_NAME]['closeCallback'] = null;
72  }
73 
74  $ctx = stream_context_create($options);
75  $value = fopen(static::WRAPPER_NAME . '://', 'r+b', false, $ctx);
76  if (false === $value) {
77  throw new \RuntimeException('Could not wrap the stream');
78  }
79 
80  // For some reason, PHP loses track of the options associated
81  // with the stream during the call to fopen().
82  // Therefore, we bind the options back manually.
83  //
84  // Also, HHVM has issues when setting an array of options
85  // on a stream rather than a stream context, hence these loops.
86  foreach ($options as $wrapper => $suboptions) {
87  foreach ($suboptions as $optname => $optvalue) {
88  stream_context_set_option($value, $wrapper, $optname, $optvalue);
89  }
90  }
91  }
92 
93  if (!is_string($offset) || is_numeric($offset)) {
94  throw new \InvalidArgumentException('You must assign a name to the stream');
95  }
96  $this->streams[$offset] = $value;
97  }
98 
99  public function loopOnce()
100  {
101  do {
102  $r = array();
103  $w = array();
104  $e = array();
105 
106  // Save the current streams in case a callback updates the manager.
107  $streams = $this->streams;
108 
109  // Prepare the streams select()ion.
110  foreach ($this->streams as $name => $stream) {
111  $ctx = stream_context_get_options($stream);
112  $rawStream = $ctx[self::WRAPPER_NAME]['stream'];
113 
114  if (!array_key_exists('readCallback', $ctx[self::WRAPPER_NAME])) {
115  throw new \RuntimeException('Invalid read callback');
116  }
117 
118  if (null !== $ctx[self::WRAPPER_NAME]['readCallback']) {
119  $r[$name] = $rawStream;
120  }
121 
122  // HACK: fstat() is used to measure the output buffer's size.
123  $stat = fstat($stream);
124  if ($stat['size'] > 0) {
125  $w[$name] = $rawStream;
126  }
127  }
128 
129  if (!count($r + $w + $e)) {
130  return false;
131  }
132 
133  $nb = @stream_select($r, $w, $e, null, null);
134 
135  if ($this->hasDispatch) {
136  pcntl_signal_dispatch();
137  }
138 
139  if (false === $nb) {
140  // The call has been interrupted, try again.
141  continue;
142  }
143 
144  if (0 === $nb) {
145  // This should never happen since $tv_sec & $tc_usec
146  // are both null, ie. stream_select() should wait forever.
147  throw new \RuntimeException('Invalid return value from stream_select()');
148  }
149 
150  foreach ($r as $name => $stream) {
151  $ctx = stream_context_get_options($streams[$name]);
152 
153  // If the event supposedly says there is
154  // incoming data to read, try to do so.
155  if (!feof($stream)) {
156  if (!array_key_exists('readCallback', $ctx[self::WRAPPER_NAME])) {
157  throw new \RuntimeException("Undefined 'readCallback'");
158  }
159 
160  if (null !== $ctx[self::WRAPPER_NAME]['readCallback']) {
161  if (!is_callable($ctx[self::WRAPPER_NAME]['readCallback'])) {
162  throw new \RuntimeException("Invalid 'readCallback'");
163  }
164 
165  try {
166  call_user_func($ctx[self::WRAPPER_NAME]['readCallback'], $this, $streams[$name], $name);
167  continue;
168  } catch (\fpoirotte\StreamManager\EOFException $e) {
169  // Catch the exception:
170  // the closeCallback will be called instead
171  }
172  }
173  }
174 
175  // If EOF had been signaled or no data could be read
176  // (meaning that EOF was reached), call 'closeCallback'.
177  if (!array_key_exists('closeCallback', $ctx[self::WRAPPER_NAME])) {
178  throw new \RuntimeException("Missing 'closeCallback'");
179  }
180 
181  if (null !== $ctx[self::WRAPPER_NAME]['closeCallback']) {
182  if (!is_callable($ctx[self::WRAPPER_NAME]['closeCallback'])) {
183  throw new \RuntimeException("Invalid 'closeCallback'");
184  }
185 
186  call_user_func($ctx[self::WRAPPER_NAME]['closeCallback'], $this, $streams[$name], $name);
187  } else {
188  // The default behaviour is to simply close the stream
189  fclose($streams[$name]);
190  }
191 
192  // Make sure the stream is not managed anymore
193  unset($this->streams[$name]);
194  }
195 
196  foreach (array_keys($w) as $name) {
197  // Try to flush the stream's output buffer.
198  stream_set_timeout($streams[$name], -1, -1);
199  }
200 
201  return true;
202  } while (true);
203  }
204 
205  public function loop($iter = 0)
206  {
207  if (!is_int($iter) || $iter < 0) {
208  throw new \InvalidArgumentException('Invalid iteration count');
209  }
210 
211  if (0 === $iter) {
212  do {
213  $continue = $this->loopOnce();
214  } while ($continue);
215  return;
216  }
217 
218  for ($i = 0; $i < $iter; $i++) {
219  if (!$this->loopOnce()) {
220  break;
221  }
222  }
223  }
224 }
Interface to provide accessing objects as arrays.
Definition: ArrayAccess.php:10
offsetSet($offset, $value)
Classes implementing Countable can be used with the count() function.
Definition: Countable.php:11