From: Michael Wallner Date: Mon, 13 Jun 2016 05:19:23 +0000 (+0200) Subject: user event loop support for the curl client X-Git-Tag: RELEASE_2_6_0_BETA1~14 X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fext-http;a=commitdiff_plain;h=55d74c2d07edcf6d51fe62bc257c3bde0f7f25d4 user event loop support for the curl client --- diff --git a/.gitignore b/.gitignore index fc8d761..33cd7c6 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ lcov_data *~ *.phar vendor/ +tests/helper/server.log diff --git a/src/php_http.c b/src/php_http.c index 2ff20f1..abb8377 100644 --- a/src/php_http.c +++ b/src/php_http.c @@ -150,6 +150,7 @@ PHP_MINIT_FUNCTION(http) #if PHP_HTTP_HAVE_CURL || SUCCESS != PHP_MINIT_CALL(http_curl) || SUCCESS != PHP_MINIT_CALL(http_client_curl) + || SUCCESS != PHP_MINIT_CALL(http_client_curl_user) #endif || SUCCESS != PHP_MINIT_CALL(http_url) || SUCCESS != PHP_MINIT_CALL(http_env) diff --git a/src/php_http_client_curl.c b/src/php_http_client_curl.c index d3f5200..863b342 100644 --- a/src/php_http_client_curl.c +++ b/src/php_http_client_curl.c @@ -13,6 +13,7 @@ #include "php_http_api.h" #include "php_http_client.h" #include "php_http_client_curl_event.h" +#include "php_http_client_curl_user.h" #if PHP_HTTP_HAVE_CURL @@ -667,6 +668,27 @@ void php_http_client_curl_responsehandler(php_http_client_t *context) } } +void php_http_client_curl_loop(php_http_client_t *client, curl_socket_t s, int curl_action) +{ + CURLMcode rc; + php_http_client_curl_t *curl = client->ctx; + TSRMLS_FETCH_FROM_CTX(client->ts); + +#if DBG_EVENTS + fprintf(stderr, "H"); +#endif + + do { + rc = curl_multi_socket_action(curl->handle->multi, s, curl_action, &curl->unfinished); + } while (CURLM_CALL_MULTI_PERFORM == rc); + + if (CURLM_OK != rc) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "%s", curl_multi_strerror(rc)); + } + + php_http_client_curl_responsehandler(client); +} + /* curl options */ static php_http_options_t php_http_curle_options, php_http_curlm_options; @@ -1534,22 +1556,17 @@ static ZEND_RESULT_CODE php_http_curlm_option_set_pipelining_bl(php_http_option_ } #endif -#if PHP_HTTP_HAVE_EVENT -static inline ZEND_RESULT_CODE php_http_curlm_use_eventloop(php_http_client_t *h, zend_bool enable) +static inline ZEND_RESULT_CODE php_http_curlm_use_eventloop(php_http_client_t *h, php_http_client_curl_ops_t *ev_ops, zval *init_data) { php_http_client_curl_t *curl = h->ctx; + void *ev_ctx; - if (enable) { - if (!curl->ev_ops) { - if (!(curl->ev_ops = php_http_client_curl_event_ops_get())) { - return FAILURE; - } - } - if (curl->ev_ops && !curl->ev_ctx) { - if (!(curl->ev_ctx = curl->ev_ops->init(h))) { - return FAILURE; - } + if (ev_ops) { + if (!(ev_ctx = ev_ops->init(h, init_data))) { + return FAILURE; } + curl->ev_ctx = ev_ctx; + curl->ev_ops = ev_ops; } else { if (curl->ev_ops) { if (curl->ev_ctx) { @@ -1565,13 +1582,19 @@ static inline ZEND_RESULT_CODE php_http_curlm_use_eventloop(php_http_client_t *h static ZEND_RESULT_CODE php_http_curlm_option_set_use_eventloop(php_http_option_t *opt, zval *value, void *userdata) { php_http_client_t *client = userdata; + php_http_client_curl_ops_t *ev_ops = NULL; + TSRMLS_FETCH_FROM_CTX(client->ts); - if (Z_TYPE_P(value) == IS_OBJECT /* && instanceof_function */) { - abort(); + if (Z_TYPE_P(value) == IS_OBJECT && instanceof_function(Z_OBJCE_P(value), php_http_client_curl_user_class_entry TSRMLS_CC)) { + ev_ops = php_http_client_curl_user_ops_get(); +#if PHP_HTTP_HAVE_EVENT + } else if (value && z_is_true(value)) { + ev_ops = php_http_client_curl_event_ops_get(); +#endif } - return php_http_curlm_use_eventloop(client, value && z_is_true(value)); + + return php_http_curlm_use_eventloop(client, ev_ops, value); } -#endif static ZEND_RESULT_CODE php_http_curlm_option_set_share_cookies(php_http_option_t *opt, zval *value, void *userdata) { @@ -1661,11 +1684,9 @@ static void php_http_curlm_options_init(php_http_options_t *registry TSRMLS_DC) } #endif /* events */ -#if PHP_HTTP_HAVE_EVENT - if ((opt = php_http_option_register(registry, ZEND_STRL("use_eventloop"), 0, IS_BOOL))) { + if ((opt = php_http_option_register(registry, ZEND_STRL("use_eventloop"), 0, 0))) { opt->setter = php_http_curlm_option_set_use_eventloop; } -#endif /* share */ if ((opt = php_http_option_register(registry, ZEND_STRL("share_cookies"), 0, IS_BOOL))) { opt->setter = php_http_curlm_option_set_share_cookies; @@ -2238,7 +2259,9 @@ static ZEND_RESULT_CODE php_http_client_curl_setopt(php_http_client_t *h, php_ht case PHP_HTTP_CLIENT_OPT_USE_EVENTS: #if PHP_HTTP_HAVE_EVENT - return php_http_curlm_use_eventloop(h, *(zend_bool *) arg); + return php_http_curlm_use_eventloop(h, (*(zend_bool *) arg) + ? php_http_client_curl_event_ops_get() + : NULL, NULL); break; #endif diff --git a/src/php_http_client_curl.h b/src/php_http_client_curl.h index f84107f..beeb8df 100644 --- a/src/php_http_client_curl.h +++ b/src/php_http_client_curl.h @@ -49,6 +49,7 @@ static inline void php_http_client_curl_get_timeout(php_http_client_curl_t *curl } PHP_HTTP_API void php_http_client_curl_responsehandler(php_http_client_t *client); +PHP_HTTP_API void php_http_client_curl_loop(php_http_client_t *client, curl_socket_t s, int curl_action); PHP_MINIT_FUNCTION(http_client_curl); PHP_MSHUTDOWN_FUNCTION(http_client_curl); diff --git a/src/php_http_client_curl_user.c b/src/php_http_client_curl_user.c new file mode 100644 index 0000000..94d7f41 --- /dev/null +++ b/src/php_http_client_curl_user.c @@ -0,0 +1,348 @@ +/* + +--------------------------------------------------------------------+ + | PECL :: http | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted provided that the conditions mentioned | + | in the accompanying LICENSE file are met. | + +--------------------------------------------------------------------+ + | Copyright (c) 2004-2014, Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#include "php_http_api.h" +#include "php_http_client.h" +#include "php_http_client_curl.h" +#include "php_http_client_curl_user.h" + +#include "php_network.h" +#include "zend_closures.h" + +#if PHP_HTTP_HAVE_CURL + +typedef struct php_http_client_curl_user_context { + php_http_client_t *client; + zval *user; + zend_function closure; + php_http_object_method_t timer; + php_http_object_method_t socket; + php_http_object_method_t once; + php_http_object_method_t wait; + php_http_object_method_t send; +} php_http_client_curl_user_context_t; + +typedef struct php_http_client_curl_user_ev { + php_stream *socket; + php_http_client_curl_user_context_t *context; +} php_http_client_curl_user_ev_t; + +static void php_http_client_curl_user_handler(INTERNAL_FUNCTION_PARAMETERS) +{ + zval *zstream = NULL, *zclient = NULL; + php_stream *stream = NULL; + long action = 0; + php_socket_t fd = CURL_SOCKET_TIMEOUT; + php_http_client_object_t *client = NULL; + + if (SUCCESS != zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|rl", &zclient, php_http_client_class_entry, &zstream, &action)) { + return; + } + + client = zend_object_store_get_object(zclient TSRMLS_CC); + if (zstream) { + php_stream_from_zval(stream, &zstream); + + if (SUCCESS != php_stream_cast(stream, PHP_STREAM_AS_SOCKETD, (void *) &fd, 1)) { + return; + } + } + php_http_client_curl_loop(client->client, fd, action); +} + +static void php_http_client_curl_user_timer(CURLM *multi, long timeout_ms, void *timer_data) +{ + php_http_client_curl_user_context_t *context = timer_data; + +#if DBG_EVENTS + fprintf(stderr, "\ntimer <- timeout_ms: %ld\n", timeout_ms); +#endif + + if (timeout_ms <= 0) { + php_http_client_curl_loop(context->client, CURL_SOCKET_TIMEOUT, 0); + } else if (timeout_ms > 0) { + zval **args[1], *ztimeout; + TSRMLS_FETCH_FROM_CTX(context->client->ts); + + MAKE_STD_ZVAL(ztimeout); + ZVAL_LONG(ztimeout, timeout_ms); + args[0] = &ztimeout; + php_http_object_method_call(&context->timer, context->user, NULL, 1, args TSRMLS_CC); + zval_ptr_dtor(&ztimeout); + } +} + +static int php_http_client_curl_user_socket(CURL *easy, curl_socket_t sock, int action, void *socket_data, void *assign_data) +{ + php_http_client_curl_user_context_t *ctx = socket_data; + php_http_client_curl_t *curl = ctx->client->ctx; + php_http_client_curl_user_ev_t *ev = assign_data; + zval **args[2], *zaction, *zsocket; + TSRMLS_FETCH_FROM_CTX(ctx->client->ts); + +#if DBG_EVENTS + fprintf(stderr, "S"); +#endif + + if (!ev) { + ev = ecalloc(1, sizeof(*ev)); + ev->context = ctx; + ev->socket = php_stream_sock_open_from_socket(sock, NULL); + + curl_multi_assign(curl->handle->multi, sock, ev); + } + + switch (action) { + case CURL_POLL_IN: + case CURL_POLL_OUT: + case CURL_POLL_INOUT: + case CURL_POLL_REMOVE: + case CURL_POLL_NONE: + MAKE_STD_ZVAL(zsocket); + php_stream_to_zval(ev->socket, zsocket); + args[0] = &zsocket; + MAKE_STD_ZVAL(zaction); + ZVAL_LONG(zaction, action); + args[1] = &zaction; + php_http_object_method_call(&ctx->socket, ctx->user, NULL, 2, args TSRMLS_CC); + zval_ptr_dtor(&zsocket); + zval_ptr_dtor(&zaction); + break; + + default: + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Unknown socket action %d", action); + return -1; + } + + if (action == CURL_POLL_REMOVE && ev) { + efree(ev); + } + return 0; +} + +static ZEND_RESULT_CODE php_http_client_curl_user_once(void *context) +{ + php_http_client_curl_user_context_t *ctx = context; + TSRMLS_FETCH_FROM_CTX(ctx->client->ts); + +#if DBG_EVENTS + fprintf(stderr, "O"); +#endif + + return php_http_object_method_call(&ctx->once, ctx->user, NULL, 0, NULL TSRMLS_CC); +} + +static ZEND_RESULT_CODE php_http_client_curl_user_wait(void *context, struct timeval *custom_timeout) +{ + php_http_client_curl_user_context_t *ctx = context; + struct timeval timeout; + zval **args[1], *ztimeout; + ZEND_RESULT_CODE rv; + TSRMLS_FETCH_FROM_CTX(ctx->client->ts); + +#if DBG_EVENTS + fprintf(stderr, "W"); +#endif + + if (!custom_timeout || !timerisset(custom_timeout)) { + php_http_client_curl_get_timeout(ctx->client->ctx, 1000, &timeout); + custom_timeout = &timeout; + } + + MAKE_STD_ZVAL(ztimeout); + ZVAL_LONG(ztimeout, custom_timeout->tv_sec * 1000 + custom_timeout->tv_usec / 1000); + args[0] = &ztimeout; + rv = php_http_object_method_call(&ctx->wait, ctx->user, NULL, 1, args TSRMLS_CC); + zval_ptr_dtor(&ztimeout); + + return rv; +} + +static ZEND_RESULT_CODE php_http_client_curl_user_exec(void *context) +{ + php_http_client_curl_user_context_t *ctx = context; + php_http_client_curl_t *curl = ctx->client->ctx; + TSRMLS_FETCH_FROM_CTX(ctx->client->ts); + +#if DBG_EVENTS + fprintf(stderr, "E"); +#endif + + /* kickstart */ + php_http_client_curl_loop(ctx->client, CURL_SOCKET_TIMEOUT, 0); + + do { + if (SUCCESS != php_http_object_method_call(&ctx->send, ctx->user, NULL, 0, NULL TSRMLS_CC)) { + return FAILURE; + } + } while (curl->unfinished && !EG(exception)); + + return SUCCESS; +} + +static void *php_http_client_curl_user_init(php_http_client_t *client, void *user_data) +{ + php_http_client_curl_t *curl = client->ctx; + php_http_client_curl_user_context_t *ctx; + php_http_object_method_t init; + zval *zclosure, **args[1]; + TSRMLS_FETCH_FROM_CTX(client->ts); + +#if DBG_EVENTS + fprintf(stderr, "I"); +#endif + + ctx = ecalloc(1, sizeof(*ctx)); + ctx->client = client; + ctx->user = user_data; + Z_ADDREF_P(ctx->user); + + memset(&ctx->closure, 0, sizeof(ctx->closure)); + ctx->closure.common.type = ZEND_INTERNAL_FUNCTION; + ctx->closure.common.function_name = "php_http_client_curl_user_handler"; + ctx->closure.internal_function.handler = php_http_client_curl_user_handler; + + MAKE_STD_ZVAL(zclosure); + zend_create_closure(zclosure, &ctx->closure, NULL, NULL TSRMLS_CC); + args[0] = &zclosure; + + php_http_object_method_init(&init, ctx->user, ZEND_STRL("init") TSRMLS_CC); + php_http_object_method_call(&init, ctx->user, NULL, 1, args TSRMLS_CC); + php_http_object_method_dtor(&init); + zval_ptr_dtor(&zclosure); + + php_http_object_method_init(&ctx->timer, ctx->user, ZEND_STRL("timer") TSRMLS_CC); + php_http_object_method_init(&ctx->socket, ctx->user, ZEND_STRL("socket") TSRMLS_CC); + php_http_object_method_init(&ctx->once, ctx->user, ZEND_STRL("once") TSRMLS_CC); + php_http_object_method_init(&ctx->wait, ctx->user, ZEND_STRL("wait") TSRMLS_CC); + php_http_object_method_init(&ctx->send, ctx->user, ZEND_STRL("send") TSRMLS_CC); + + curl_multi_setopt(curl->handle->multi, CURLMOPT_SOCKETDATA, ctx); + curl_multi_setopt(curl->handle->multi, CURLMOPT_SOCKETFUNCTION, php_http_client_curl_user_socket); + curl_multi_setopt(curl->handle->multi, CURLMOPT_TIMERDATA, ctx); + curl_multi_setopt(curl->handle->multi, CURLMOPT_TIMERFUNCTION, php_http_client_curl_user_timer); + + return ctx; +} + +static void php_http_client_curl_user_dtor(void **context) +{ + php_http_client_curl_user_context_t *ctx = *context; + php_http_client_curl_t *curl; + +#if DBG_EVENTS + fprintf(stderr, "D"); +#endif + + ZEND_ASSERT(ctx); + + curl = ctx->client->ctx; + + curl_multi_setopt(curl->handle->multi, CURLMOPT_SOCKETDATA, NULL); + curl_multi_setopt(curl->handle->multi, CURLMOPT_SOCKETFUNCTION, NULL); + curl_multi_setopt(curl->handle->multi, CURLMOPT_TIMERDATA, NULL); + curl_multi_setopt(curl->handle->multi, CURLMOPT_TIMERFUNCTION, NULL); + + php_http_object_method_dtor(&ctx->timer); + php_http_object_method_dtor(&ctx->socket); + php_http_object_method_dtor(&ctx->once); + php_http_object_method_dtor(&ctx->wait); + php_http_object_method_dtor(&ctx->send); + + zval_ptr_dtor(&ctx->user); + + efree(ctx); + *context = NULL; +} + +static php_http_client_curl_ops_t php_http_client_curl_user_ops = { + &php_http_client_curl_user_init, + &php_http_client_curl_user_dtor, + &php_http_client_curl_user_once, + &php_http_client_curl_user_wait, + &php_http_client_curl_user_exec, +}; + +php_http_client_curl_ops_t *php_http_client_curl_user_ops_get() +{ + return &php_http_client_curl_user_ops; +} + +zend_class_entry *php_http_client_curl_user_class_entry; + +ZEND_BEGIN_ARG_INFO_EX(ai_init, 0, 0, 1) + ZEND_ARG_TYPE_INFO(0, run, IS_CALLABLE, 0) +ZEND_END_ARG_INFO(); +ZEND_BEGIN_ARG_INFO_EX(ai_timer, 0, 0, 1) +#if PHP_VERSION_ID >= 70000 + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +#else + ZEND_ARG_INFO(0, timeout_ms) +#endif +ZEND_END_ARG_INFO(); +ZEND_BEGIN_ARG_INFO_EX(ai_socket, 0, 0, 2) +#if PHP_VERSION_ID >= 70000 + ZEND_ARG_TYPE_INFO(0, socket, IS_RESOURCE, 0) + ZEND_ARG_TYPE_INFO(0, action, IS_LONG, 0) +#else + ZEND_ARG_INFO(0, socket) + ZEND_ARG_INFO(0, action) +#endif +ZEND_END_ARG_INFO(); +ZEND_BEGIN_ARG_INFO_EX(ai_once, 0, 0, 0) +ZEND_END_ARG_INFO(); +ZEND_BEGIN_ARG_INFO_EX(ai_wait, 0, 0, 0) +#if PHP_VERSION_ID >= 70000 + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +#else + ZEND_ARG_INFO(0, timeout_ms) +#endif +ZEND_END_ARG_INFO(); +ZEND_BEGIN_ARG_INFO_EX(ai_send, 0, 0, 0) +ZEND_END_ARG_INFO(); + +static zend_function_entry php_http_client_curl_user_methods[] = { + PHP_ABSTRACT_ME(HttpClientCurlUser, init, ai_init) + PHP_ABSTRACT_ME(HttpClientCurlUser, timer, ai_timer) + PHP_ABSTRACT_ME(HttpClientCurlUser, socket, ai_socket) + PHP_ABSTRACT_ME(HttpClientCurlUser, once, ai_once) + PHP_ABSTRACT_ME(HttpClientCurlUser, wait, ai_wait) + PHP_ABSTRACT_ME(HttpClientCulrUser, send, ai_send) + {0} +}; + +PHP_MINIT_FUNCTION(http_client_curl_user) +{ + zend_class_entry ce = {0}; + + INIT_NS_CLASS_ENTRY(ce, "http\\Client\\Curl", "User", php_http_client_curl_user_methods); + php_http_client_curl_user_class_entry = zend_register_internal_interface(&ce TSRMLS_CC); + + zend_declare_class_constant_long(php_http_client_curl_user_class_entry, ZEND_STRL("POLL_NONE"), CURL_POLL_NONE TSRMLS_CC); + zend_declare_class_constant_long(php_http_client_curl_user_class_entry, ZEND_STRL("POLL_IN"), CURL_POLL_IN TSRMLS_CC); + zend_declare_class_constant_long(php_http_client_curl_user_class_entry, ZEND_STRL("POLL_OUT"), CURL_POLL_OUT TSRMLS_CC); + zend_declare_class_constant_long(php_http_client_curl_user_class_entry, ZEND_STRL("POLL_INOUT"), CURL_POLL_INOUT TSRMLS_CC); + zend_declare_class_constant_long(php_http_client_curl_user_class_entry, ZEND_STRL("POLL_REMOVE"), CURL_POLL_REMOVE TSRMLS_CC); + + return SUCCESS; +} + +#endif /* PHP_HTTP_HAVE_CURL */ + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: noet sw=4 ts=4 fdm=marker + * vim<600: noet sw=4 ts=4 + */ diff --git a/src/php_http_client_curl_user.h b/src/php_http_client_curl_user.h new file mode 100644 index 0000000..8a6778f --- /dev/null +++ b/src/php_http_client_curl_user.h @@ -0,0 +1,89 @@ +/* + +--------------------------------------------------------------------+ + | PECL :: http | + +--------------------------------------------------------------------+ + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted provided that the conditions mentioned | + | in the accompanying LICENSE file are met. | + +--------------------------------------------------------------------+ + | Copyright (c) 2004-2014, Michael Wallner | + +--------------------------------------------------------------------+ +*/ + +#if PHP_HTTP_HAVE_CURL + +PHP_HTTP_API zend_class_entry *php_http_client_curl_user_class_entry; +PHP_HTTP_API php_http_client_curl_ops_t *php_http_client_curl_user_ops_get(); +PHP_MINIT_FUNCTION(http_client_curl_user); + +#endif + +#if 0 + +--FILE-- + [], + "W" => [] + ]; + private $R = []; + private $W = []; + private $timeout = 1000; + + function __construct(http\Client $client) { + $this->client = $client; + } + + function init(callable $run) { + $this->run = $run; + } + + function timer($timeout_ms) { + echo "T"; + $this->timeout = $timeout_ms; + } + + function socket($socket, $action) { + echo "S"; + + switch ($action) { + case self::POLL_NONE: + break; + case self::POLL_REMOVE: + if (false !== ($r = array_search($socket, $this->fds["R"], true))) { + echo "U"; + unset($this->fds["R"][$r]); + } + + if (false !== ($w = array_search($socket, $this->fds["W"], true))) { + echo "U"; + unset($this->fds["W"][$w]); + } + + break; + + default: + if ($action & self::POLL_IN) { + if (!in_array($socket, $this->fds["R"], true)) { + $this->fds["R"][] = $socket; + } + } + if ($action & self::POLL_OUT) { + if (!in_array($socket, $this->fds["W"], true)) { + $this->fds["W"][] = $socket; + } + } + break; + } + } + + function once() { + echo "O"; + + foreach ($this->W as $w) { + call_user_func($this->run, $this->client, $w, self::POLL_OUT); + } + foreach ($this->R as $r) { + call_user_func($this->run, $this->client, $r, self::POLL_IN); + } + return count($this->client); + } + + function wait($timeout_ms = null) { + echo "W"; + + if ($timeout_ms === null) { + $timeout_ms = $this->timeout; + } + $ts = floor($timeout_ms / 1000); + $tu = ($timeout_ms % 1000) * 1000; + + extract($this->fds); + + if (($wfds = count($R) + count($W))) { + $nfds = stream_select($R, $W, $E, $ts, $tu); + } else { + $nfds = 0; + } + $this->R = (array) $R; + $this->W = (array) $W; + + if ($nfds === false) { + return false; + } + if (!$nfds) { + if (!$wfds) { + echo "S"; + time_nanosleep($ts, $tu*1000); + } + call_user_func($this->run, $this->client); + } + + return true; + } + + function send() { + $this->wait(); + $this->once(); + } +} + + +include "helper/server.inc"; + +server("proxy.inc", function($port) { + $client = new http\Client; + $client->configure([ + "use_eventloop" => new UserHandler($client) + ]); + $client->enqueue(new http\Client\Request("GET", "http://localhost:$port/"), function($r) { + var_dump($r->getResponseCode()); + }); + $client->send(); +}); + +?> +===DONE=== +--EXPECTREGEX-- +Test +T[WST]*(O[WST]*)+U+int\(200\) +===DONE===