From 0fe5cdcdf5b8b20c80ff592de3187f2200321d6a Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Tue, 5 Feb 2013 21:59:10 +0100 Subject: [PATCH] async listen/notify --- src/php_pq.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/src/php_pq.c b/src/php_pq.c index bae80fc..4e7275a 100644 --- a/src/php_pq.c +++ b/src/php_pq.c @@ -1905,6 +1905,51 @@ static PHP_METHOD(pqconn, listen) { } } +ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen_async, 0, 0, 0) + ZEND_ARG_INFO(0, channel) + ZEND_ARG_INFO(0, callable) +ZEND_END_ARG_INFO(); +static PHP_METHOD(pqconn, listenAsync) { + char *channel_str = NULL; + int channel_len = 0; + php_pq_callback_t listener; + + if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc)) { + php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC); + + obj->intern->poller = PQconsumeInput; + + if (obj->intern) { + char *quoted_channel = PQescapeIdentifier(obj->intern->conn, channel_str, channel_len); + + if (quoted_channel) { + char *cmd; + + obj->intern->poller = PQconsumeInput; + + spprintf(&cmd, 0, "LISTEN %s", channel_str); + if (PQsendQuery(obj->intern->conn, cmd)) { + php_pqconn_add_listener(obj, channel_str, channel_len, &listener TSRMLS_CC); + RETVAL_TRUE; + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not install listener (%s)", PHP_PQerrorMessage(obj->intern->conn)); + RETVAL_FALSE; + } + + efree(cmd); + PQfreemem(quoted_channel); + + php_pqconn_notify_listeners(obj TSRMLS_CC); + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not escape channel identifier (%s)", PHP_PQerrorMessage(obj->intern->conn)); + } + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "pq\\Connection not initialized"); + RETVAL_FALSE; + } + } +} + ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify, 0, 0, 2) ZEND_ARG_INFO(0, channel) ZEND_ARG_INFO(0, message) @@ -1943,6 +1988,38 @@ static PHP_METHOD(pqconn, notify) { } } +ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify_async, 0, 0, 2) + ZEND_ARG_INFO(0, channel) + ZEND_ARG_INFO(0, message) +ZEND_END_ARG_INFO(); +static PHP_METHOD(pqconn, notifyAsync) { + char *channel_str, *message_str; + int channel_len, message_len; + + if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len)) { + php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC); + + if (obj->intern) { + char *params[2] = {channel_str, message_str}; + + obj->intern->poller = PQconsumeInput; + + if (PQsendQueryParams(obj->intern->conn, "select pg_notify($1, $2)", 2, NULL, (const char *const*) params, NULL, NULL, 0)) { + RETVAL_TRUE; + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not notify listeners (%s)", PHP_PQerrorMessage(obj->intern->conn)); + RETVAL_FALSE; + } + + php_pqconn_notify_listeners(obj TSRMLS_CC); + + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "pq\\Connection not initialized"); + RETVAL_FALSE; + } + } +} + ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_poll, 0, 0, 0) ZEND_END_ARG_INFO(); static PHP_METHOD(pqconn, poll) { @@ -2714,7 +2791,9 @@ static zend_function_entry php_pqconn_methods[] = { PHP_ME(pqconn, prepare, ai_pqconn_prepare, ZEND_ACC_PUBLIC) PHP_ME(pqconn, prepareAsync, ai_pqconn_prepare_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, listen, ai_pqconn_listen, ZEND_ACC_PUBLIC) + PHP_ME(pqconn, listenAsync, ai_pqconn_listen_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, notify, ai_pqconn_notify, ZEND_ACC_PUBLIC) + PHP_ME(pqconn, notifyAsync, ai_pqconn_notify_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, getResult, ai_pqconn_get_result, ZEND_ACC_PUBLIC) PHP_ME(pqconn, quote, ai_pqconn_quote, ZEND_ACC_PUBLIC) PHP_ME(pqconn, quoteName, ai_pqconn_quote_name, ZEND_ACC_PUBLIC) -- 2.30.2