// Result of stream_socket_pair() as assigned in stream_open(): the two connected
// socket streams (indexes 0 and 1) that data is written into and read back from.
12 protected $filteredStream;
// Bytes read back from the socket pair in stream_write() that sendOutput() has
// not yet written to the raw stream. Starts out empty.
13 protected $outputBuffer =
'';
// Opens the wrapper: validates the stream context, creates the internal socket
// pair, validates the optional callbacks and records the wrapped raw stream.
//
// Options are read from the context entry keyed by StreamManager::WRAPPER_NAME:
//   'stream'        - required, must satisfy self::isStream()
//   'readCallback'  - optional, must be null or callable
//   'closeCallback' - optional, must be null or callable
//
// Throws \InvalidArgumentException when the context, the stream or a callback
// is invalid, and \RuntimeException when the socket pair cannot be created or
// cannot be switched to non-blocking mode.
//
// NOTE(review): $path, $mode, $options and $opened_path are not used by the
// lines visible here, and no return statement is visible - confirm the method
// ends by returning a boolean.
16 public function stream_open($path, $mode, $options, &$opened_path)
// All configuration arrives through the stream context, so it is mandatory.
19 if (null === $this->context) {
20 throw new \InvalidArgumentException(
'A valid stream context is required');
23 $ctxOptions = stream_context_get_options($this->context);
24 $wrapper = StreamManager::WRAPPER_NAME;
// The stream to wrap must be present and be a stream resource.
25 if (!isset($ctxOptions[$wrapper][
'stream']) || !self::isStream($ctxOptions[$wrapper][
'stream'])) {
26 throw new \InvalidArgumentException(
'No stream specified in context');
// Internal socket pair: stream_read() and stream_write() write into one end and
// read from the other.
29 $this->filteredStream = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
30 if (
false === $this->filteredStream) {
31 throw new \RuntimeException(
'Could not create a wrapper');
// Both ends are switched to non-blocking mode.
34 if (
false === stream_set_blocking($this->filteredStream[0],
false) ||
35 false === stream_set_blocking($this->filteredStream[1],
false)) {
36 throw new \RuntimeException(
'Could not set streams into non-blocking mode');
// Callbacks are optional; a key that is present and not null must be callable.
// NOTE(review): the visible lines only validate the callbacks, they do not
// store them - confirm where they are consumed.
39 if (array_key_exists(
'readCallback', $ctxOptions[$wrapper]) &&
40 null !== $ctxOptions[$wrapper][
'readCallback'] &&
41 !is_callable($ctxOptions[$wrapper][
'readCallback'])) {
42 throw new \InvalidArgumentException(
'Invalid read callback');
45 if (array_key_exists(
'closeCallback', $ctxOptions[$wrapper]) &&
46 null !== $ctxOptions[$wrapper][
'closeCallback'] &&
47 !is_callable($ctxOptions[$wrapper][
'closeCallback'])) {
48 throw new \InvalidArgumentException(
'Invalid close callback');
// Remember the wrapped stream and start with an empty filter list.
51 $this->rawStream = $ctxOptions[$wrapper][
'stream'];
52 $this->filters = array();
// Returns the byte length of the pending output buffer.
// NOTE(review): the signature of the enclosing method is not visible here;
// presumably this is the body of count() - confirm.
59 return strlen($this->outputBuffer);
/**
 * Tells whether a value is a resource of type 'stream'.
 *
 * @param mixed $value Value to inspect.
 *
 * @return bool True only for a resource whose type is exactly 'stream'.
 */
protected static function isStream($value)
{
    if (!is_resource($value)) {
        return false;
    }

    return 'stream' === get_resource_type($value);
}
/**
 * Closes both ends of the internal socket pair, then the wrapped raw stream.
 */
public function stream_close()
{
    // Same order as before: end 0, end 1, and the raw stream last.
    foreach (array(0, 1) as $end) {
        fclose($this->filteredStream[$end]);
    }

    fclose($this->rawStream);
}
/**
 * Reports whether the wrapped raw stream has reached end-of-file.
 *
 * @return bool Result of feof() on the raw stream.
 */
public function stream_eof()
{
    $atEnd = feof($this->rawStream);

    return $atEnd;
}
/**
 * Sends any pending output to the raw stream, then flushes the raw stream.
 *
 * @return bool False when sendOutput() reports failure (fflush() is then not
 *              attempted); otherwise the result of fflush().
 */
public function stream_flush()
{
    if (!$this->sendOutput()) {
        return false;
    }

    return (bool) fflush($this->rawStream);
}
/**
 * Forwards an advisory lock request to the wrapped raw stream.
 *
 * @param int $operation Lock operation, passed to flock() unchanged.
 *
 * @return bool Result of flock() on the raw stream.
 */
public function stream_lock($operation)
{
    $locked = flock($this->rawStream, $operation);

    return $locked;
}
/**
 * Moves the position of the wrapped raw stream.
 *
 * @param int $offset Offset passed to fseek().
 * @param int $whence Reference point passed to fseek(); defaults to SEEK_SET.
 *
 * @return bool Negation of fseek()'s result, so fseek()'s 0 (success) becomes true.
 */
public function stream_seek($offset, $whence = SEEK_SET)
{
    $status = fseek($this->rawStream, $offset, $whence);

    return !$status;
}
// Applies a stream option to the wrapped raw stream.
//
// $option selects the operation; $arg1 and $arg2 are forwarded as shown below.
// Throws \RuntimeException('Invalid option') from the final statement.
//
// NOTE(review): the switch header and the label guarding the final throw are
// not visible here - presumably `switch ($option)` with a `default:` branch.
105 public function stream_set_option($option, $arg1, $arg2)
// Blocking mode: $arg1 is passed straight to stream_set_blocking().
109 case STREAM_OPTION_BLOCKING:
110 return stream_set_blocking($this->rawStream, $arg1);
112 case STREAM_OPTION_READ_TIMEOUT:
// The pair (-1, -1) triggers sendOutput() instead of setting a timeout.
114 if (-1 === $arg1 && -1 === $arg2) {
115 return $this->sendOutput();
117 return stream_set_timeout($this->rawStream, $arg1, $arg2);
119 case STREAM_OPTION_WRITE_BUFFER:
// NOTE(review): stream_set_write_buffer() returns 0 on success, so this branch
// returns a falsy value when the call succeeds - confirm whether
// `0 === stream_set_write_buffer(...)` was intended.
120 return stream_set_write_buffer($this->rawStream, $arg2);
123 throw new \RuntimeException(
'Invalid option');
/**
 * Returns stat information for the wrapper.
 *
 * @return array Array with a single 'size' entry: the byte length of the
 *               pending output buffer.
 */
public function stream_stat()
{
    $stat = array();
    $stat['size'] = strlen($this->outputBuffer);

    return $stat;
}
/**
 * Reports the current position of the wrapped raw stream.
 *
 * @return int|false Result of ftell() on the raw stream.
 */
public function stream_tell()
{
    $position = ftell($this->rawStream);

    return $position;
}
/**
 * Truncates the wrapped raw stream to the requested size.
 *
 * @param int $new_size Target size in bytes.
 *
 * @return bool Result of ftruncate() on the raw stream.
 */
public function stream_truncate($new_size)
{
    // Use the size supplied by the caller; an undefined variable here would
    // reach ftruncate() as null instead of the requested size.
    return ftruncate($this->rawStream, $new_size);
}
// Reads up to $count bytes from the raw stream, writes them into end 1 of the
// socket pair and reads the result back from end 0.
//
// NOTE(review): the bodies of the guards below and the final return are not
// visible here - confirm what each failure path returns and that $res is
// returned.
151 public function stream_read($count)
154 $data = fread($this->rawStream, $count);
// Read failure on the raw stream.
155 if (
false === $data) {
// Nothing was read and the raw stream is at end-of-file.
159 if (
'' === $data && feof($this->rawStream)) {
// Write everything that was read into end 1, retrying after partial writes.
163 while (strlen($data) > 0) {
164 $written = fwrite($this->filteredStream[1], $data);
165 if (
false === $written) {
// Drop the bytes already written; the cast keeps $data a string for strlen().
168 $data = (string) substr($data, $written);
// NOTE(review): this asks for up to 2 * $count bytes although the caller
// requested $count - confirm the wrapper may return more than requested.
171 $res = fread($this->filteredStream[0], 2 * $count);
// Writes $data into end 0 of the socket pair, appends whatever can be read back
// from end 1 to the output buffer, then calls sendOutput().
//
// Throws \RuntimeException when writing into the socket pair fails or when
// sendOutput() returns false.
//
// NOTE(review): the exit condition of the read-back loop and the return value
// are not visible here - confirm both.
176 public function stream_write($data)
// Write all of $data into end 0, retrying after partial writes.
180 while (strlen($data) > 0) {
181 $written = fwrite($this->filteredStream[0], $data);
182 if (
false === $written) {
183 throw new \RuntimeException(
'Error during write');
// Drop the bytes already written; the cast keeps $data a string for strlen().
185 $data = (string) substr($data, $written);
// Read back from end 1 in chunks of up to 8192 bytes.
188 while (
false !== ($read = fread($this->filteredStream[1], 8192))) {
192 $this->outputBuffer .= $read;
// Send the buffered output on to the raw stream.
200 if (
false === $this->sendOutput()) {
201 throw new \RuntimeException(
'Error while sending output');
// Writes the pending output buffer to the raw stream, removing the bytes that
// were written after each fwrite() until the buffer is empty.
//
// NOTE(review): the handling of a failed fwrite() and the return value are not
// visible here; callers compare the result against false - confirm.
206 public function sendOutput()
208 while (strlen($this->outputBuffer) > 0) {
209 $written = fwrite($this->rawStream, $this->outputBuffer);
210 if (
false === $written) {
// Keep only the unwritten remainder; the cast keeps the buffer a string.
218 $this->outputBuffer = (string) substr($this->outputBuffer, $written);
Classes implementing Countable can be used with the count() function.