7 class Filter
implements IO
22 * @param callable $func filter proc
23 * @param callable $ctor constructor
24 * @param callable $dtor destructor
26 function __construct(callable
$func, callable
$ctor = null, callable
$dtor = null) {
28 * We don't have pipe(2) support, so we'll use socketpair(2) instead.
30 list($this->input
, $this->output
) = stream_socket_pair(STREAM_PF_UNIX
, STREAM_SOCK_STREAM
, STREAM_IPPROTO_IP
);
31 stream_filter_append($this->input
, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE
, compact("func", "ctor", "dtor"));
32 stream_set_blocking($this->output
, false);
36 * Cleanup socketpair(2) resources
38 function __destruct() {
39 if (is_resource($this->input
)) {
42 if (is_resource($this->output
)) {
43 fclose($this->output
);
51 function getOutput() {
68 function __invoke($fd = null) {
70 copy($fd, $this->getInput());
72 return $this->getOutput();
76 class StreamFilter
extends \php_user_filter
78 public $filtername = "atick\\IO\\Func";
81 function filter($in, $out, &$consumed, $closing) {
82 while ($bucket = stream_bucket_make_writeable($in)) {
83 $consumed +
= $bucket->datalen
;
84 $bucket->data
= call_user_func($this->params
["func"], $this, $bucket->data
, $closing);
85 stream_bucket_append($out, $bucket);
91 if (!empty($this->params
["dtor"])) {
92 call_user_func($this->params
["dtor"], $this);
97 if (!empty($this->params
["ctor"])) {
98 call_user_func($this->params
["ctor"], $this);
103 stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter");