8 protected $hasDispatch;
10 const WRAPPER_NAME =
'streammanager.wrapper';
11 const WRAPPER_CLASS =
'\\fpoirotte\\StreamManager\\StreamWrapper';
13 public function __construct()
15 if (!in_array(static::WRAPPER_NAME, stream_get_wrappers())) {
16 if (
false === stream_wrapper_register(static::WRAPPER_NAME, static::WRAPPER_CLASS)) {
17 throw new \RuntimeException(
'Could not register stream wrapper');
21 $this->streams = array();
22 $this->hasDispatch = function_exists(
'pcntl_signal_dispatch');
27 return count($this->streams);
32 return isset($this->streams[$offset]);
37 unset($this->streams[$offset]);
42 if (!isset($this->streams[$offset])) {
45 return $this->streams[$offset];
50 if (!is_resource($value) ||
'stream' !== get_resource_type($value)) {
51 throw new \InvalidArgumentException(
'A stream was expected');
55 $meta = stream_get_meta_data($value);
56 $uri = isset($meta[
'uri']) ? $meta[
'uri'] :
'';
57 if (strncasecmp(static::WRAPPER_NAME .
':', $uri, strlen(static::WRAPPER_NAME) + 1)) {
60 $options = stream_context_get_options($value);
61 if (
false === $options) {
62 throw new \RuntimeException(
'Could not retrieve stream context options');
65 $options[self::WRAPPER_NAME][
'stream'] = $value;
66 if (!isset($options[self::WRAPPER_NAME][
'readCallback'])) {
67 $options[self::WRAPPER_NAME][
'readCallback'] = null;
70 if (!isset($options[self::WRAPPER_NAME][
'closeCallback'])) {
71 $options[self::WRAPPER_NAME][
'closeCallback'] = null;
74 $ctx = stream_context_create($options);
75 $value = fopen(static::WRAPPER_NAME .
'://',
'r+b',
false, $ctx);
76 if (
false === $value) {
77 throw new \RuntimeException(
'Could not wrap the stream');
86 foreach ($options as $wrapper => $suboptions) {
87 foreach ($suboptions as $optname => $optvalue) {
88 stream_context_set_option($value, $wrapper, $optname, $optvalue);
93 if (!is_string($offset) || is_numeric($offset)) {
94 throw new \InvalidArgumentException(
'You must assign a name to the stream');
96 $this->streams[$offset] = $value;
99 public function loopOnce()
107 $streams = $this->streams;
110 foreach ($this->streams as $name => $stream) {
111 $ctx = stream_context_get_options($stream);
112 $rawStream = $ctx[self::WRAPPER_NAME][
'stream'];
114 if (!array_key_exists(
'readCallback', $ctx[self::WRAPPER_NAME])) {
115 throw new \RuntimeException(
'Invalid read callback');
118 if (null !== $ctx[self::WRAPPER_NAME][
'readCallback']) {
119 $r[$name] = $rawStream;
123 $stat = fstat($stream);
124 if ($stat[
'size'] > 0) {
125 $w[$name] = $rawStream;
129 if (!count($r + $w + $e)) {
133 $nb = @stream_select($r, $w, $e, null, null);
135 if ($this->hasDispatch) {
136 pcntl_signal_dispatch();
147 throw new \RuntimeException(
'Invalid return value from stream_select()');
150 foreach ($r as $name => $stream) {
151 $ctx = stream_context_get_options($streams[$name]);
155 if (!feof($stream)) {
156 if (!array_key_exists(
'readCallback', $ctx[self::WRAPPER_NAME])) {
157 throw new \RuntimeException(
"Undefined 'readCallback'");
160 if (null !== $ctx[self::WRAPPER_NAME][
'readCallback']) {
161 if (!is_callable($ctx[self::WRAPPER_NAME][
'readCallback'])) {
162 throw new \RuntimeException(
"Invalid 'readCallback'");
166 call_user_func($ctx[self::WRAPPER_NAME][
'readCallback'], $this, $streams[$name], $name);
177 if (!array_key_exists(
'closeCallback', $ctx[self::WRAPPER_NAME])) {
178 throw new \RuntimeException(
"Missing 'closeCallback'");
181 if (null !== $ctx[self::WRAPPER_NAME][
'closeCallback']) {
182 if (!is_callable($ctx[self::WRAPPER_NAME][
'closeCallback'])) {
183 throw new \RuntimeException(
"Invalid 'closeCallback'");
186 call_user_func($ctx[self::WRAPPER_NAME][
'closeCallback'], $this, $streams[$name], $name);
189 fclose($streams[$name]);
193 unset($this->streams[$name]);
196 foreach (array_keys($w) as $name) {
198 stream_set_timeout($streams[$name], -1, -1);
205 public function loop($iter = 0)
207 if (!is_int($iter) || $iter < 0) {
208 throw new \InvalidArgumentException(
'Invalid iteration count');
213 $continue = $this->loopOnce();
218 for ($i = 0; $i < $iter; $i++) {
219 if (!$this->loopOnce()) {
Interface to provide accessing objects as arrays.
offsetSet($offset, $value)
Classes implementing Countable can be used with the count() function.