+static void listener_dtor(void *l) {
+ php_pqconn_listener_t *listener = l;
+
+ zend_fcall_info_args_clear(&listener->fci, 1);
+
+ zval_ptr_dtor(&listener->fci.function_name);
+ if (listener->fci.object_ptr) {
+ zval_ptr_dtor(&listener->fci.object_ptr);
+ }
+}
+
+static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channel_str, size_t channel_len, php_pqconn_listener_t *listener TSRMLS_DC)
+{
+ HashTable ht, *existing_listeners;
+
+ Z_ADDREF_P(listener->fci.function_name);
+ if (listener->fci.object_ptr) {
+ Z_ADDREF_P(listener->fci.object_ptr);
+ }
+ if (SUCCESS == zend_hash_find(&obj->listeners, channel_str, channel_len + 1, (void *) &existing_listeners)) {
+ zend_hash_next_index_insert(existing_listeners, (void *) listener, sizeof(*listener), NULL);
+ } else {
+ zend_hash_init(&ht, 1, NULL, (dtor_func_t) listener_dtor, 0);
+ zend_hash_next_index_insert(&ht, (void *) listener, sizeof(*listener), NULL);
+ zend_hash_add(&obj->listeners, channel_str, channel_len + 1, (void *) &ht, sizeof(HashTable), NULL);
+ }
+}
+
+static STATUS php_pqres_success(PGresult *res TSRMLS_DC)
+{
+ switch (PQresultStatus(res)) {
+ case PGRES_BAD_RESPONSE:
+ case PGRES_NONFATAL_ERROR:
+ case PGRES_FATAL_ERROR:
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "%s", PQresultErrorMessage(res));
+ return FAILURE;
+ default:
+ return SUCCESS;
+ }
+}
+
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 0)
+ ZEND_ARG_INFO(0, channel)
+ ZEND_ARG_INFO(0, callable)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, listen) {
+ char *channel_str = NULL;
+ int channel_len = 0;
+ php_pqconn_listener_t listener;
+
+ if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc)) {
+ php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+ obj->poller = PQconsumeInput;
+
+ if (obj->conn) {
+ PGresult *res;
+ char cmd[1024];
+
+ slprintf(cmd, sizeof(cmd), "LISTEN %s", channel_str);
+ res = PQexec(obj->conn, cmd);
+
+ if (res) {
+ if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
+ php_pqconn_add_listener(obj, channel_str, channel_len, &listener TSRMLS_CC);
+ RETVAL_TRUE;
+ } else {
+ RETVAL_FALSE;
+ }
+ PQclear(res);
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not install listener: %s", PQerrorMessage(obj->conn));
+ RETVAL_FALSE;
+ }
+
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Connection not initialized");
+ RETVAL_FALSE;
+ }
+ }
+
+}
+
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify, 0, 0, 2)
+ ZEND_ARG_INFO(0, channel)
+ ZEND_ARG_INFO(0, message)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, notify) {
+ char *channel_str, *message_str;
+ int channel_len, message_len;
+
+ if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len)) {
+ php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+ if (obj->conn) {
+ PGresult *res;
+ char *params[2] = {channel_str, message_str};
+
+ res = PQexecParams(obj->conn, "select pg_notify($1, $2)", 2, NULL, (const char *const*) params, NULL, NULL, 0);
+
+ if (res) {
+ if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
+ RETVAL_TRUE;
+ } else {
+ RETVAL_FALSE;
+ }
+ PQclear(res);
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not notify listeners: %s", PQerrorMessage(obj->conn));
+ RETVAL_FALSE;
+ }
+
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Connection not initialized");
+ RETVAL_FALSE;
+ }
+ }
+}
+