unlisten support
authorMichael Wallner <mike@php.net>
Mon, 14 Jul 2014 16:02:57 +0000 (18:02 +0200)
committerMichael Wallner <mike@php.net>
Mon, 14 Jul 2014 16:02:57 +0000 (18:02 +0200)
TODO
src/php_pqconn.c
tests/notify001.phpt

diff --git a/TODO b/TODO
index cdacee6daa20012115f1c129eea93e6d60db1fd8..b05f093401d4752a625ec9a3253b388621207477 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,7 +1,6 @@
 * COPY: getAsync & putAsync
 * CURSOR: *Async()
 * fetchInto/fetchCtor?
-* unlisten?
 * unregister event handler?
 * binary protocol?
 * parse explicit array dimension information in front of arrays
index 66b5d9f63e106eda534ed1cd41acf39c33c4c2c0..916acdcbd3232345e26e1fd12c378a8458a4b19d 100644 (file)
@@ -428,24 +428,36 @@ static void php_pqconn_wakeup(php_persistent_handle_factory_t *f, void **handle
        // FIXME: ping server
 }
 
-static int apply_unlisten(void *p TSRMLS_DC, int argc, va_list argv, zend_hash_key *key)
+static inline PGresult *unlisten(php_pqconn_t *conn, const char *channel_str, size_t channel_len TSRMLS_DC)
 {
-       php_pqconn_object_t *obj = va_arg(argv, php_pqconn_object_t *);
-       char *quoted_channel = PQescapeIdentifier(obj->intern->conn, key->arKey, key->nKeyLength - 1);
+       char *quoted_channel = PQescapeIdentifier(conn, channel_str, channel_len);
+       PGresult *res = NULL;
 
        if (quoted_channel) {
-               PGresult *res;
-               char *cmd;
+               smart_str cmd = {0};
 
-               spprintf(&cmd, 0, "UNLISTEN %s", quoted_channel);
-               if ((res = PQexec(obj->intern->conn, cmd))) {
-                       PHP_PQclear(res);
-               }
+               smart_str_appends(&cmd, "UNLISTEN ");
+               smart_str_appends(&cmd, quoted_channel);
+               smart_str_0(&cmd);
+
+               res = PQexec(conn, cmd);
 
-               efree(cmd);
+               smart_str_free(&cmd);
                PQfreemem(quoted_channel);
        }
 
+       return res;
+}
+
+static int apply_unlisten(void *p TSRMLS_DC, int argc, va_list argv, zend_hash_key *key)
+{
+       php_pqconn_object_t *obj = va_arg(argv, php_pqconn_object_t *);
+       PGresult *res = unlisten(obj->intern->conn, key->arKey, key->nKeyLength - 1);
+
+       if (res) {
+               PHP_PQclear(res);
+       }
+
        return ZEND_HASH_APPLY_REMOVE;
 }
 
@@ -604,6 +616,81 @@ static PHP_METHOD(pqconn, resetAsync) {
        }
 }
 
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_unlisten, 0, 0, 1)
+       ZEND_ARG_INFO(0, channel)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, unlisten)
+{
+       zend_error_handling zeh;
+       char *channel_str;
+       int channel_len;
+       STATUS rv;
+
+       zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
+       rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len);
+       zend_restore_error_handling(&zeh TSRMLS_CC);
+
+       if (SUCCESS == rv) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               if (!obj->intern) {
+                       throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
+               } else if (SUCCESS == zend_hash_del(&obj->intern->listeners, channel_str, channel_len + 1)) {
+                       PGresult *res = unlisten(obj->intern->conn, channel_str, channel_len);
+
+                       if (res) {
+                               php_pqres_success(res TSRMLS_CC);
+                               PHP_PQclear(res);
+                       }
+               }
+       }
+}
+
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_unlisten_async, 0, 0, 1)
+       ZEND_ARG_INFO(0, channel)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, unlistenAsync) {
+       zend_error_handling zeh;
+       char *channel_str;
+       int channel_len;
+       STATUS rv;
+
+       zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
+       rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len);
+       zend_restore_error_handling(&zeh TSRMLS_CC);
+
+       if (SUCCESS == rv) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               if (!obj->intern) {
+                       throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
+               } else {
+                       char *quoted_channel = PQescapeIdentifier(obj->intern->conn, channel_str, channel_len);
+
+                       if (!quoted_channel) {
+                               throw_exce(EX_ESCAPE TSRMLS_CC, "Failed to escape channel identifier (%s)", PHP_PQerrorMessage(obj->intern->conn));
+                       } else {
+                               smart_str cmd = {0};
+
+                               smart_str_appends(&cmd, "UNLISTEN ");
+                               smart_str_appends(&cmd, quoted_channel);
+                               smart_str_0(&cmd);
+
+                               if (!PQsendQuery(obj->intern->conn, cmd.c)) {
+                                       throw_exce(EX_IO TSRMLS_CC, "Failed to uninstall listener (%s)", PHP_PQerrorMessage(obj->intern->conn));
+                               } else {
+                                       obj->intern->poller = PQconsumeInput;
+                                       zend_hash_del(&obj->intern->listeners, channel_str, channel_len + 1);
+                               }
+
+                               smart_str_free(&cmd);
+                               PQfreemem(quoted_channel);
+                               php_pqconn_notify_listeners(obj TSRMLS_CC);
+                       }
+               }
+       }
+}
+
 static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channel_str, size_t channel_len, php_pq_callback_t *listener TSRMLS_DC)
 {
        HashTable ht, *existing_listeners;
@@ -619,7 +706,7 @@ static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channe
        }
 }
 
-ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 0)
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 2)
        ZEND_ARG_INFO(0, channel)
        ZEND_ARG_INFO(0, callable)
 ZEND_END_ARG_INFO();
@@ -1657,6 +1744,8 @@ static zend_function_entry php_pqconn_methods[] = {
        PHP_ME(pqconn, prepareAsync, ai_pqconn_prepare_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, declare, ai_pqconn_declare, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, declareAsync, ai_pqconn_declare_async, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, unlisten, ai_pqconn_unlisten, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, unlistenAsync, ai_pqconn_unlisten_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, listen, ai_pqconn_listen, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, listenAsync, ai_pqconn_listen_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, notify, ai_pqconn_notify, ZEND_ACC_PUBLIC)
index 3f596a314465606acd3018e781ed4331cc682590..bb2b7c2ea5cb4bd754423d38120c29ee74b0f9da 100644 (file)
@@ -22,7 +22,7 @@ $producer->notify("test", "this is an async test");
 
 $r = array($consumer->socket);
 $w = null; $e = null;
-var_dump(stream_select($r, $w, $e, NULL));
+var_dump(stream_select($r, $w, $e, 0));
 $consumer->poll();
 
 $producer->notify("other", "this should not show up");
@@ -34,7 +34,16 @@ $producer->notify("test", "just to be sure");
 
 $r = array($consumer->socket);
 $w = null; $e = null;
-var_dump(stream_select($r, $w, $e, NULL));
+var_dump(stream_select($r, $w, $e, 0));
+$consumer->poll();
+
+$consumer->unlisten("test");
+
+$producer->notify("test", "this shouldn't show up either");
+
+$r = array($consumer->socket);
+$w = null; $e = null;
+var_dump(stream_select($r, $w, $e, 0));
 $consumer->poll();
 
 ?>
@@ -47,4 +56,5 @@ test(%d): this is an async test
 int(0)
 int(1)
 test(%d): just to be sure
+int(0)
 DONE