async listen/notify
authorMichael Wallner <mike@php.net>
Tue, 5 Feb 2013 20:59:10 +0000 (21:59 +0100)
committerMichael Wallner <mike@php.net>
Tue, 5 Feb 2013 20:59:10 +0000 (21:59 +0100)
src/php_pq.c

index bae80fcf245f4215cfaf0be8f59a620926d33265..4e7275a5a6adb31e8511c5a3c5fd93e87d78c3bd 100644 (file)
@@ -1905,6 +1905,51 @@ static PHP_METHOD(pqconn, listen) {
        }
 }
 
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen_async, 0, 0, 0)
+       ZEND_ARG_INFO(0, channel)
+       ZEND_ARG_INFO(0, callable)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, listenAsync) {
+       char *channel_str = NULL;
+       int channel_len = 0;
+       php_pq_callback_t listener;
+
+       if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc)) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               obj->intern->poller = PQconsumeInput;
+
+               if (obj->intern) {
+                       char *quoted_channel = PQescapeIdentifier(obj->intern->conn, channel_str, channel_len);
+
+                       if (quoted_channel) {
+                               char *cmd;
+
+                               obj->intern->poller = PQconsumeInput;
+
+                               spprintf(&cmd, 0, "LISTEN %s", channel_str);
+                               if (PQsendQuery(obj->intern->conn, cmd)) {
+                                       php_pqconn_add_listener(obj, channel_str, channel_len, &listener TSRMLS_CC);
+                                       RETVAL_TRUE;
+                               } else {
+                                       php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not install listener (%s)", PHP_PQerrorMessage(obj->intern->conn));
+                                       RETVAL_FALSE;
+                               }
+
+                               efree(cmd);
+                               PQfreemem(quoted_channel);
+
+                               php_pqconn_notify_listeners(obj TSRMLS_CC);
+                       } else {
+                               php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not escape channel identifier (%s)", PHP_PQerrorMessage(obj->intern->conn));
+                       }
+               } else {
+                       php_error_docref(NULL TSRMLS_CC, E_WARNING, "pq\\Connection not initialized");
+                       RETVAL_FALSE;
+               }
+       }
+}
+
 ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify, 0, 0, 2)
        ZEND_ARG_INFO(0, channel)
        ZEND_ARG_INFO(0, message)
@@ -1943,6 +1988,38 @@ static PHP_METHOD(pqconn, notify) {
        }
 }
 
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify_async, 0, 0, 2)
+       ZEND_ARG_INFO(0, channel)
+       ZEND_ARG_INFO(0, message)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, notifyAsync) {
+       char *channel_str, *message_str;
+       int channel_len, message_len;
+
+       if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len)) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               if (obj->intern) {
+                       char *params[2] = {channel_str, message_str};
+
+                       obj->intern->poller = PQconsumeInput;
+
+                       if (PQsendQueryParams(obj->intern->conn, "select pg_notify($1, $2)", 2, NULL, (const char *const*) params, NULL, NULL, 0)) {
+                               RETVAL_TRUE;
+                       } else {
+                               php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not notify listeners (%s)", PHP_PQerrorMessage(obj->intern->conn));
+                               RETVAL_FALSE;
+                       }
+
+                       php_pqconn_notify_listeners(obj TSRMLS_CC);
+
+               } else {
+                       php_error_docref(NULL TSRMLS_CC, E_WARNING, "pq\\Connection not initialized");
+                       RETVAL_FALSE;
+               }
+       }
+}
+
 ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_poll, 0, 0, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, poll) {
@@ -2714,7 +2791,9 @@ static zend_function_entry php_pqconn_methods[] = {
        PHP_ME(pqconn, prepare, ai_pqconn_prepare, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, prepareAsync, ai_pqconn_prepare_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, listen, ai_pqconn_listen, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, listenAsync, ai_pqconn_listen_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, notify, ai_pqconn_notify, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, notifyAsync, ai_pqconn_notify_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, getResult, ai_pqconn_get_result, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, quote, ai_pqconn_quote, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, quoteName, ai_pqconn_quote_name, ZEND_ACC_PUBLIC)