bb28c0722179ca30e052a4368c978cfb20f635ae
[m6w6/atick] / lib / atick / IO / Filter.php
1 <?php
2
3 namespace atick\IO;
4
5 use atick\IO;
6
7 class Filter implements IO
8 {
9 /**
10 * Input stream
11 * @var resource
12 */
13 protected $input;
14
15 /**
16 * Output stream
17 * @var resource
18 */
19 protected $output;
20
21 /**
22 * @param callable $func filter proc
23 * @param callable $ctor constructor
24 * @param callable $dtor destructor
25 */
26 function __construct(callable $func, callable $ctor = null, callable $dtor = null) {
27 /*
28 * We don't have pipe(2) support, so we'll use socketpair(2) instead.
29 */
30 list($this->input, $this->output) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
31 stream_filter_append($this->input, "atick\\IO\\StreamFilter", STREAM_FILTER_WRITE, compact("func", "ctor", "dtor"));
32 stream_set_blocking($this->output, false);
33 }
34
35 /**
36 * Cleanup socketpair(2) resources
37 */
38 function __destruct() {
39 if (is_resource($this->input)) {
40 fclose($this->input);
41 }
42 if (is_resource($this->output)) {
43 fclose($this->output);
44 }
45 }
46
47 /**
48 * @inheritdoc
49 * @return resource
50 */
51 function getOutput() {
52 return $this->output;
53 }
54
55 /**
56 * @inheritdoc
57 * @return resource
58 */
59 function getInput() {
60 return $this->input;
61 }
62
63 /**
64 * @inheritdoc
65 * @param resource $fd
66 * @return resource
67 */
68 function __invoke($fd = null) {
69 if ($fd) {
70 copy($fd, $this->getInput());
71 }
72 return $this->getOutput();
73 }
74 }
75
76 class StreamFilter extends \php_user_filter
77 {
78 public $filtername = "atick\\IO\\Func";
79 public $params;
80
81 function filter($in, $out, &$consumed, $closing) {
82 while ($bucket = stream_bucket_make_writeable($in)) {
83 $consumed += $bucket->datalen;
84 $bucket->data = call_user_func($this->params["func"], $this, $bucket->data, $closing);
85 stream_bucket_append($out, $bucket);
86 }
87 return PSFS_PASS_ON;
88 }
89
90 function onClose() {
91 if (!empty($this->params["dtor"])) {
92 call_user_func($this->params["dtor"], $this);
93 }
94 }
95
96 function onCreate() {
97 if (!empty($this->params["ctor"])) {
98 call_user_func($this->params["ctor"], $this);
99 }
100 }
101 }
102
103 stream_filter_register("atick\\IO\\StreamFilter", "\\atick\\IO\\StreamFilter");