/* $Id$ */
#define HTTP_WANT_CURL
+#define HTTP_WANT_EVENT
#include "php_http.h"
#if defined(ZEND_ENGINE_2) && defined(HTTP_HAVE_CURL)
# define HTTP_DEBUG_REQPOOLS 0
#endif
+#ifdef HTTP_HAVE_EVENT
+typedef struct _http_request_pool_event_t {
+ struct event evnt;
+ http_request_pool *pool;
+} http_request_pool_event;
+
+static int http_request_pool_socket_callback(CURL *easy, curl_socket_t s, int action, void *, void *);
+#endif
+
static int http_request_pool_compare_handles(void *h1, void *h2);
PHP_MINIT_FUNCTION(http_request_pool)
return SUCCESS;
}
+#ifdef HTTP_HAVE_EVENT
+PHP_RINIT_FUNCTION(http_request_pool)
+{
+ if (!HTTP_G->request.pool.event.base && !(HTTP_G->request.pool.event.base = event_init())) {
+ return FAILURE;
+ }
+
+ return SUCCESS;
+}
+#endif
+
/* {{{ http_request_pool *http_request_pool_init(http_request_pool *) */
PHP_HTTP_API http_request_pool *_http_request_pool_init(http_request_pool *pool TSRMLS_DC)
{
}
return NULL;
}
-
+
+ TSRMLS_SET_CTX(pool->tsrm_ls);
+
+#if HTTP_HAVE_EVENT
+ curl_multi_setopt(pool->ch, CURLMOPT_SOCKETDATA, pool);
+ curl_multi_setopt(pool->ch, CURLMOPT_SOCKETFUNCTION, http_request_pool_socket_callback);
+#endif
+
pool->unfinished = 0;
zend_llist_init(&pool->finished, sizeof(zval *), (llist_dtor_func_t) ZVAL_PTR_DTOR, 0);
zend_llist_init(&pool->handles, sizeof(zval *), (llist_dtor_func_t) ZVAL_PTR_DTOR, 0);
/* }}} */
/* {{{ STATUS http_request_pool_attach(http_request_pool *, zval *) */
-PHP_HTTP_API STATUS _http_request_pool_attach(http_request_pool *pool, zval *request TSRMLS_DC)
+PHP_HTTP_API STATUS _http_request_pool_attach(http_request_pool *pool, zval *request)
{
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
getObjectEx(http_request_object, req, request);
#if HTTP_DEBUG_REQPOOLS
/* }}} */
/* {{{ STATUS http_request_pool_detach(http_request_pool *, zval *) */
-PHP_HTTP_API STATUS _http_request_pool_detach(http_request_pool *pool, zval *request TSRMLS_DC)
+PHP_HTTP_API STATUS _http_request_pool_detach(http_request_pool *pool, zval *request)
{
CURLMcode code;
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
getObjectEx(http_request_object, req, request);
#if HTTP_DEBUG_REQPOOLS
/* }}} */
/* {{{ void http_request_pool_apply(http_request_pool *, http_request_pool_apply_func) */
-PHP_HTTP_API void _http_request_pool_apply(http_request_pool *pool, http_request_pool_apply_func cb TSRMLS_DC)
+PHP_HTTP_API void _http_request_pool_apply(http_request_pool *pool, http_request_pool_apply_func cb)
{
int count = zend_llist_count(&pool->handles);
}
for (i = 0; i < count; ++i) {
- if (cb(pool, handles[i] TSRMLS_CC)) {
+ if (cb(pool, handles[i])) {
break;
}
}
/* }}} */
/* {{{ void http_request_pool_apply_with_arg(http_request_pool *, http_request_pool_apply_with_arg_func, void *) */
-PHP_HTTP_API void _http_request_pool_apply_with_arg(http_request_pool *pool, http_request_pool_apply_with_arg_func cb, void *arg TSRMLS_DC)
+PHP_HTTP_API void _http_request_pool_apply_with_arg(http_request_pool *pool, http_request_pool_apply_with_arg_func cb, void *arg)
{
int count = zend_llist_count(&pool->handles);
}
for (i = 0; i < count; ++i) {
- if (cb(pool, handles[i], arg TSRMLS_CC)) {
+ if (cb(pool, handles[i], arg)) {
break;
}
}
/* }}} */
/* {{{ void http_request_pool_detach_all(http_request_pool *) */
-PHP_HTTP_API void _http_request_pool_detach_all(http_request_pool *pool TSRMLS_DC)
+PHP_HTTP_API void _http_request_pool_detach_all(http_request_pool *pool)
{
#if HTTP_DEBUG_REQPOOLS
fprintf(stderr, "Detaching %d requests from pool %p\n", zend_llist_count(&pool->handles), pool);
/* }}} */
/* {{{ STATUS http_request_pool_send(http_request_pool *) */
-PHP_HTTP_API STATUS _http_request_pool_send(http_request_pool *pool TSRMLS_DC)
+PHP_HTTP_API STATUS _http_request_pool_send(http_request_pool *pool)
{
-#if HTTP_DEBUG_REQPOOLS
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
+#ifdef HTTP_HAVE_EVENT
+ CURLMcode rc;
+
+ do {
+ rc = curl_multi_socket_all(pool->ch, &pool->unfinished);
+ } while (CURLM_CALL_MULTI_PERFORM == rc);
+
+ if (CURLM_OK != rc) {
+ http_error(HE_WARNING, HTTP_E_SOCKET, curl_multi_strerror(rc));
+ return FAILURE;
+ }
+
+ event_base_dispatch(HTTP_G->request.pool.event.base);
+
+ return SUCCESS;
+#else
+
+# if HTTP_DEBUG_REQPOOLS
fprintf(stderr, "Attempt to send %d requests of pool %p\n", zend_llist_count(&pool->handles), pool);
-#endif
+# endif
while (http_request_pool_perform(pool)) {
if (SUCCESS != http_request_pool_select(pool)) {
-#ifdef PHP_WIN32
+# ifdef PHP_WIN32
/* see http://msdn.microsoft.com/library/en-us/winsock/winsock/windows_sockets_error_codes_2.asp */
http_error_ex(HE_WARNING, HTTP_E_SOCKET, "WinSock error: %d", WSAGetLastError());
-#else
+# else
http_error(HE_WARNING, HTTP_E_SOCKET, strerror(errno));
-#endif
+# endif
return FAILURE;
}
}
-#if HTTP_DEBUG_REQPOOLS
+# if HTTP_DEBUG_REQPOOLS
fprintf(stderr, "Finished sending %d HttpRequests of pool %p (still unfinished: %d)\n", zend_llist_count(&pool->handles), pool, pool->unfinished);
-#endif
+# endif
return SUCCESS;
+#endif
}
/* }}} */
/* {{{ void http_request_pool_dtor(http_request_pool *) */
-PHP_HTTP_API void _http_request_pool_dtor(http_request_pool *pool TSRMLS_DC)
+PHP_HTTP_API void _http_request_pool_dtor(http_request_pool *pool)
{
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
+
#if HTTP_DEBUG_REQPOOLS
fprintf(stderr, "Destructing request pool %p\n", pool);
#endif
/* {{{ STATUS http_request_pool_select(http_request_pool *) */
PHP_HTTP_API STATUS _http_request_pool_select(http_request_pool *pool)
{
+#ifdef HTTP_HAVE_EVENT
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
+ http_error(HE_WARNING, HTTP_E_RUNTIME, "not implemented");
+ return FAILURE;
+#else
int MAX;
fd_set R, W, E;
struct timeval timeout = {1, 0};
-#ifdef HAVE_CURL_MULTI_TIMEOUT
+# ifdef HAVE_CURL_MULTI_TIMEOUT
long max_tout = 1000;
if ((CURLM_OK == curl_multi_timeout(pool->ch, &max_tout)) && (max_tout != -1)) {
timeout.tv_sec = max_tout / 1000;
timeout.tv_usec = (max_tout % 1000) * 1000;
}
-#endif
+# endif
FD_ZERO(&R);
FD_ZERO(&W);
}
}
return FAILURE;
+#endif
}
/* }}} */
/* {{{ int http_request_pool_perform(http_request_pool *) */
-PHP_HTTP_API int _http_request_pool_perform(http_request_pool *pool TSRMLS_DC)
+PHP_HTTP_API int _http_request_pool_perform(http_request_pool *pool)
{
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
+#ifdef HTTP_HAVE_EVENT
+ http_error(HE_WARNING, HTTP_E_RUNTIME, "not implemented");
+ return FAILURE;
+#else
CURLMsg *msg;
int remaining = 0;
}
return pool->unfinished;
+#endif
}
/* }}} */
/* {{{ void http_request_pool_responsehandler(http_request_pool *, zval *, void *) */
-int _http_request_pool_responsehandler(http_request_pool *pool, zval *req, void *ch TSRMLS_DC)
+int _http_request_pool_responsehandler(http_request_pool *pool, zval *req, void *ch)
{
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
getObjectEx(http_request_object, obj, req);
if ((!ch) || obj->request->ch == (CURL *) ch) {
}
/* }}} */
+#ifdef HTTP_HAVE_EVENT
+static void http_request_pool_event_callback(int socket, short action, void *event_data)
+{
+ CURLMcode rc;
+ CURLMsg *msg;
+ int remaining;
+ http_request_pool_event *ev = event_data;
+ http_request_pool *pool = ev->pool;
+ TSRMLS_FETCH_FROM_CTX(ev->pool->tsrm_ls);
+
+#if HTTP_DEBUG_REQPOOLS
+ {
+ static const char event_strings[][20] = {"TIMEOUT","READ","TIMEOUT|READ","WRITE","TIMEOUT|WRITE","READ|WRITE","TIMEOUT|READ|WRITE","SIGNAL"};
+ fprintf(stderr, "Event on socket %d (%s) event %p of pool %p\n", socket, event_strings[action], ev, pool);
+ }
+#endif
+
+ do {
+#ifdef HAVE_CURL_MULTI_SOCKET_ACTION
+ switch (action & (EV_READ|EV_WRITE)) {
+ case EV_READ:
+ rc = curl_multi_socket_action(pool->ch, socket, CURL_CSELECT_IN, &pool->unfinished);
+ break;
+ case EV_WRITE:
+ rc = curl_multi_socket_action(pool->ch, socket, CURL_CSELECT_OUT, &pool->unfinished);
+ break;
+ case EV_READ|EV_WRITE:
+ rc = curl_multi_socket_action(pool->chm socket, CURL_CSELECT_IN|CURL_CSELECT_OUT, &pool->unfinished);
+ break;
+ default:
+ http_error(HE_WARNING, HTTP_E_SOCKET, "Unknown event %d", (int) action);
+ return;
+ }
+#else
+ rc = curl_multi_socket(pool->ch, socket, &pool->unfinished);
+#endif
+ } while (CURLM_CALL_MULTI_PERFORM == rc);
+
+ /* don't use 'ev' below here, as it might 've been freed in the socket callback */
+
+ if (CURLM_OK != rc) {
+ http_error(HE_WARNING, HTTP_E_SOCKET, curl_multi_strerror(rc));
+ }
+
+ while ((msg = curl_multi_info_read(pool->ch, &remaining))) {
+ if (CURLMSG_DONE == msg->msg) {
+ if (CURLE_OK != msg->data.result) {
+ http_request *r = NULL;
+ curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &r);
+ http_error_ex(HE_WARNING, HTTP_E_REQUEST, "%s; %s (%s)", curl_easy_strerror(msg->data.result), r?r->_error:"", r?r->url:"");
+ }
+ http_request_pool_apply_with_arg(pool, _http_request_pool_responsehandler, msg->easy_handle);
+ }
+ }
+}
+
+static int http_request_pool_socket_callback(CURL *easy, curl_socket_t sock, int action, void *socket_data, void *assign_data)
+{
+ int events = EV_PERSIST;
+ http_request_pool *pool = socket_data;
+ http_request_pool_event *ev = assign_data;
+ TSRMLS_FETCH_FROM_CTX(pool->tsrm_ls);
+
+ if (!ev) {
+ ev = ecalloc(1, sizeof(http_request_pool_event));
+ ev->pool = pool;
+ curl_multi_assign(pool->ch, sock, ev);
+ } else {
+ event_del(&ev->evnt);
+ }
+
+#if HTTP_DEBUG_REQPOOLS
+ {
+ static const char action_strings[][8] = {"NONE", "IN", "OUT", "INOUT", "REMOVE"};
+ fprintf(stderr, "Callback on socket %d (%s) event %p of pool %p\n", (int) sock, action_strings[action], ev, pool);
+ }
+#endif
+
+ switch (action) {
+ case CURL_POLL_IN:
+ events |= EV_READ;
+ break;
+ case CURL_POLL_OUT:
+ events |= EV_WRITE;
+ break;
+ case CURL_POLL_INOUT:
+ events |= EV_READ|EV_WRITE;
+ break;
+
+ case CURL_POLL_REMOVE:
+ efree(ev);
+ case CURL_POLL_NONE:
+ return 0;
+
+ default:
+ http_error_ex(HE_WARNING, HTTP_E_SOCKET, "Unknown socket action %d", action);
+ return -1;
+ }
+
+ event_set(&ev->evnt, sock, events, http_request_pool_event_callback, ev);
+ event_base_set(HTTP_G->request.pool.event.base, &ev->evnt);
+ event_add(&ev->evnt, NULL);
+
+ return 0;
+}
+#endif /* HTTP_HAVE_EVENT */
+
#endif /* ZEND_ENGINE_2 && HTTP_HAVE_CURL */
zend_llist finished;
zend_llist handles;
int unfinished;
+#ifdef ZTS
+ void ***tsrm_ls;
+#endif
} http_request_pool;
-typedef int (*http_request_pool_apply_func)(http_request_pool *pool, zval *request TSRMLS_DC);
-typedef int (*http_request_pool_apply_with_arg_func)(http_request_pool *pool, zval *request, void *arg TSRMLS_DC);
+typedef int (*http_request_pool_apply_func)(http_request_pool *pool, zval *request);
+typedef int (*http_request_pool_apply_with_arg_func)(http_request_pool *pool, zval *request, void *arg);
PHP_MINIT_FUNCTION(http_request_pool);
+#ifdef HTTP_HAVE_EVENT
+PHP_RINIT_FUNCTION(http_request_pool);
+#endif
-#define http_request_pool_responsehandler(p, r, c) _http_request_pool_responsehandler((p), (r), (c) TSRMLS_CC)
-extern int _http_request_pool_responsehandler(http_request_pool *pool, zval *req, void *ch TSRMLS_DC);
+#define http_request_pool_responsehandler(p, r, c) _http_request_pool_responsehandler((p), (r), (c))
+extern int _http_request_pool_responsehandler(http_request_pool *pool, zval *req, void *ch);
#define http_request_pool_init(p) _http_request_pool_init((p) TSRMLS_CC)
PHP_HTTP_API http_request_pool *_http_request_pool_init(http_request_pool *pool TSRMLS_DC);
-#define http_request_pool_attach(p, r) _http_request_pool_attach((p), (r) TSRMLS_CC)
-PHP_HTTP_API STATUS _http_request_pool_attach(http_request_pool *pool, zval *request TSRMLS_DC);
+#define http_request_pool_attach(p, r) _http_request_pool_attach((p), (r))
+PHP_HTTP_API STATUS _http_request_pool_attach(http_request_pool *pool, zval *request);
-#define http_request_pool_detach(p, r) _http_request_pool_detach((p), (r) TSRMLS_CC)
-PHP_HTTP_API STATUS _http_request_pool_detach(http_request_pool *pool, zval *request TSRMLS_DC);
+#define http_request_pool_detach(p, r) _http_request_pool_detach((p), (r))
+PHP_HTTP_API STATUS _http_request_pool_detach(http_request_pool *pool, zval *request);
-#define http_request_pool_apply(p, f) _http_request_pool_apply((p), (f) TSRMLS_CC)
-PHP_HTTP_API void _http_request_pool_apply(http_request_pool *pool, http_request_pool_apply_func cb TSRMLS_DC);
+#define http_request_pool_apply(p, f) _http_request_pool_apply((p), (f))
+PHP_HTTP_API void _http_request_pool_apply(http_request_pool *pool, http_request_pool_apply_func cb);
-#define http_request_pool_apply_with_arg(p, f, a) _http_request_pool_apply_with_arg((p), (f), (a) TSRMLS_CC)
-PHP_HTTP_API void _http_request_pool_apply_with_arg(http_request_pool *pool, http_request_pool_apply_with_arg_func cb, void *arg TSRMLS_DC);
+#define http_request_pool_apply_with_arg(p, f, a) _http_request_pool_apply_with_arg((p), (f), (a))
+PHP_HTTP_API void _http_request_pool_apply_with_arg(http_request_pool *pool, http_request_pool_apply_with_arg_func cb, void *arg);
-#define http_request_pool_detach_all(p) _http_request_pool_detach_all((p) TSRMLS_CC)
-PHP_HTTP_API void _http_request_pool_detach_all(http_request_pool *pool TSRMLS_DC);
+#define http_request_pool_detach_all(p) _http_request_pool_detach_all((p))
+PHP_HTTP_API void _http_request_pool_detach_all(http_request_pool *pool);
-#define http_request_pool_send(p) _http_request_pool_send((p) TSRMLS_CC)
-PHP_HTTP_API STATUS _http_request_pool_send(http_request_pool *pool TSRMLS_DC);
+#define http_request_pool_send(p) _http_request_pool_send((p))
+PHP_HTTP_API STATUS _http_request_pool_send(http_request_pool *pool);
#define http_request_pool_select _http_request_pool_select
PHP_HTTP_API STATUS _http_request_pool_select(http_request_pool *pool);
-#define http_request_pool_perform(p) _http_request_pool_perform((p) TSRMLS_CC)
-PHP_HTTP_API int _http_request_pool_perform(http_request_pool *pool TSRMLS_DC);
+#define http_request_pool_perform(p) _http_request_pool_perform((p))
+PHP_HTTP_API int _http_request_pool_perform(http_request_pool *pool);
-#define http_request_pool_dtor(p) _http_request_pool_dtor((p) TSRMLS_CC)
-PHP_HTTP_API void _http_request_pool_dtor(http_request_pool *pool TSRMLS_DC);
+#define http_request_pool_dtor(p) _http_request_pool_dtor((p))
+PHP_HTTP_API void _http_request_pool_dtor(http_request_pool *pool);
#endif
#endif