X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fatick;a=blobdiff_plain;f=lib%2Fatick%2FIO%2FFilter.php;fp=lib%2Fatick%2FIO%2FFilter.php;h=bb28c0722179ca30e052a4368c978cfb20f635ae;hp=0000000000000000000000000000000000000000;hb=363315d4d8cbca1850183e27243e317359e261e5;hpb=35f7f2f1715347ce20976dcf6747047e2396b35e diff --git a/lib/atick/IO/Filter.php b/lib/atick/IO/Filter.php new file mode 100644 index 0000000..bb28c07 --- /dev/null +++ b/lib/atick/IO/Filter.php @@ -0,0 +1,103 @@ +input, $this->output) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + stream_filter_append($this->input, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE, compact("func", "ctor", "dtor")); + stream_set_blocking($this->output, false); + } + + /** + * Cleanup socketpair(2) resources + */ + function __destruct() { + if (is_resource($this->input)) { + fclose($this->input); + } + if (is_resource($this->output)) { + fclose($this->output); + } + } + + /** + * @inheritdoc + * @return resource + */ + function getOutput() { + return $this->output; + } + + /** + * @inheritdoc + * @return resource + */ + function getInput() { + return $this->input; + } + + /** + * @inheritdoc + * @param resource $fd + * @return resource + */ + function __invoke($fd = null) { + if ($fd) { + copy($fd, $this->getInput()); + } + return $this->getOutput(); + } +} + +class StreamFilter extends \php_user_filter +{ + public $filtername = "atick\\IO\\Func"; + public $params; + + function filter($in, $out, &$consumed, $closing) { + while ($bucket = stream_bucket_make_writeable($in)) { + $consumed += $bucket->datalen; + $bucket->data = call_user_func($this->params["func"], $this, $bucket->data, $closing); + stream_bucket_append($out, $bucket); + } + return PSFS_PASS_ON; + } + + function onClose() { + if (!empty($this->params["dtor"])) { + call_user_func($this->params["dtor"], $this); + } + } + + function onCreate() { + if (!empty($this->params["ctor"])) { + call_user_func($this->params["ctor"], $this); + } + } +} + +stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter");