From: Michael Wallner Date: Mon, 14 Jul 2014 16:02:57 +0000 (+0200) Subject: unlisten support X-Git-Tag: v0.5.0~40 X-Git-Url: https://git.m6w6.name/?p=m6w6%2Fext-pq;a=commitdiff_plain;h=7e633cc0ddf139abbfc591b1e27d0bf9f17bebbf unlisten support --- diff --git a/TODO b/TODO index cdacee6..b05f093 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,6 @@ * COPY: getAsync & putAsync * CURSOR: *Async() * fetchInto/fetchCtor? -* unlisten? * unregister event handler? * binary protocol? * parse explicit array dimension information in front of arrays diff --git a/src/php_pqconn.c b/src/php_pqconn.c index 66b5d9f..916acdc 100644 --- a/src/php_pqconn.c +++ b/src/php_pqconn.c @@ -428,24 +428,36 @@ static void php_pqconn_wakeup(php_persistent_handle_factory_t *f, void **handle // FIXME: ping server } -static int apply_unlisten(void *p TSRMLS_DC, int argc, va_list argv, zend_hash_key *key) +static inline PGresult *unlisten(php_pqconn_t *conn, const char *channel_str, size_t channel_len TSRMLS_DC) { - php_pqconn_object_t *obj = va_arg(argv, php_pqconn_object_t *); - char *quoted_channel = PQescapeIdentifier(obj->intern->conn, key->arKey, key->nKeyLength - 1); + char *quoted_channel = PQescapeIdentifier(conn, channel_str, channel_len); + PGresult *res = NULL; if (quoted_channel) { - PGresult *res; - char *cmd; + smart_str cmd = {0}; - spprintf(&cmd, 0, "UNLISTEN %s", quoted_channel); - if ((res = PQexec(obj->intern->conn, cmd))) { - PHP_PQclear(res); - } + smart_str_appends(&cmd, "UNLISTEN "); + smart_str_appends(&cmd, quoted_channel); + smart_str_0(&cmd); + + res = PQexec(conn, cmd); - efree(cmd); + smart_str_free(&cmd); PQfreemem(quoted_channel); } + return res; +} + +static int apply_unlisten(void *p TSRMLS_DC, int argc, va_list argv, zend_hash_key *key) +{ + php_pqconn_object_t *obj = va_arg(argv, php_pqconn_object_t *); + PGresult *res = unlisten(obj->intern->conn, key->arKey, key->nKeyLength - 1); + + if (res) { + PHP_PQclear(res); + } + return ZEND_HASH_APPLY_REMOVE; } @@ -604,6 +616,81 @@ static PHP_METHOD(pqconn, resetAsync) { } } +ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_unlisten, 0, 0, 1) + ZEND_ARG_INFO(0, channel) +ZEND_END_ARG_INFO(); +static PHP_METHOD(pqconn, unlisten) +{ + zend_error_handling zeh; + char *channel_str; + int channel_len; + STATUS rv; + + zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC); + rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len); + zend_restore_error_handling(&zeh TSRMLS_CC); + + if (SUCCESS == rv) { + php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC); + + if (!obj->intern) { + throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized"); + } else if (SUCCESS == zend_hash_del(&obj->intern->listeners, channel_str, channel_len + 1)) { + PGresult *res = unlisten(obj->intern->conn, channel_str, channel_len); + + if (res) { + php_pqres_success(res TSRMLS_CC); + PHP_PQclear(res); + } + } + } +} + +ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_unlisten_async, 0, 0, 1) + ZEND_ARG_INFO(0, channel) +ZEND_END_ARG_INFO(); +static PHP_METHOD(pqconn, unlistenAsync) { + zend_error_handling zeh; + char *channel_str; + int channel_len; + STATUS rv; + + zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC); + rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len); + zend_restore_error_handling(&zeh TSRMLS_CC); + + if (SUCCESS == rv) { + php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC); + + if (!obj->intern) { + throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized"); + } else { + char *quoted_channel = PQescapeIdentifier(obj->intern->conn, channel_str, channel_len); + + if (!quoted_channel) { + throw_exce(EX_ESCAPE TSRMLS_CC, "Failed to escape channel identifier (%s)", PHP_PQerrorMessage(obj->intern->conn)); + } else { + smart_str cmd = {0}; + + smart_str_appends(&cmd, "UNLISTEN "); + smart_str_appends(&cmd, quoted_channel); + smart_str_0(&cmd); + + if (!PQsendQuery(obj->intern->conn, cmd.c)) { + throw_exce(EX_IO TSRMLS_CC, "Failed to uninstall listener (%s)", PHP_PQerrorMessage(obj->intern->conn)); + } else { + obj->intern->poller = PQconsumeInput; + zend_hash_del(&obj->intern->listeners, channel_str, channel_len + 1); + } + + smart_str_free(&cmd); + PQfreemem(quoted_channel); + php_pqconn_notify_listeners(obj TSRMLS_CC); + } + } + } +} + static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channel_str, size_t channel_len, php_pq_callback_t *listener TSRMLS_DC) { HashTable ht, *existing_listeners; @@ -619,7 +706,7 @@ static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channe } } -ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 0) +ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 2) ZEND_ARG_INFO(0, channel) ZEND_ARG_INFO(0, callable) ZEND_END_ARG_INFO(); @@ -1657,6 +1744,8 @@ static zend_function_entry php_pqconn_methods[] = { PHP_ME(pqconn, prepareAsync, ai_pqconn_prepare_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, declare, ai_pqconn_declare, ZEND_ACC_PUBLIC) PHP_ME(pqconn, declareAsync, ai_pqconn_declare_async, ZEND_ACC_PUBLIC) + PHP_ME(pqconn, unlisten, ai_pqconn_unlisten, ZEND_ACC_PUBLIC) + PHP_ME(pqconn, unlistenAsync, ai_pqconn_unlisten_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, listen, ai_pqconn_listen, ZEND_ACC_PUBLIC) PHP_ME(pqconn, listenAsync, ai_pqconn_listen_async, ZEND_ACC_PUBLIC) PHP_ME(pqconn, notify, ai_pqconn_notify, ZEND_ACC_PUBLIC) diff --git a/tests/notify001.phpt b/tests/notify001.phpt index 3f596a3..bb2b7c2 100644 --- a/tests/notify001.phpt +++ b/tests/notify001.phpt @@ -22,7 +22,7 @@ $producer->notify("test", "this is an async test"); $r = array($consumer->socket); $w = null; $e = null; -var_dump(stream_select($r, $w, $e, NULL)); +var_dump(stream_select($r, $w, $e, 0)); $consumer->poll(); $producer->notify("other", "this should not show up"); @@ -34,7 +34,16 @@ $producer->notify("test", "just to be sure"); $r = array($consumer->socket); $w = null; $e = null; -var_dump(stream_select($r, $w, $e, NULL)); +var_dump(stream_select($r, $w, $e, 0)); +$consumer->poll(); + +$consumer->unlisten("test"); + +$producer->notify("test", "this shouldn't show up either"); + +$r = array($consumer->socket); +$w = null; $e = null; +var_dump(stream_select($r, $w, $e, 0)); $consumer->poll(); ?> @@ -47,4 +56,5 @@ test(%d): this is an async test int(0) int(1) test(%d): just to be sure +int(0) DONE