static zend_object_handlers php_pqres_object_handlers;
static zend_object_handlers php_pqstm_object_handlers;
+typedef struct php_pqconn_listener {
+ zend_fcall_info fci;
+ zend_fcall_info_cache fcc;
+} php_pqconn_listener_t;
+
typedef struct php_pqconn_object {
zend_object zo;
PGconn *conn;
int (*poller)(PGconn *);
+ HashTable listeners;
unsigned async:1;
} php_pqconn_object_t;
php_pqres_iterator_t *iter;
} php_pqres_object_t;
-typedef struct php_pqstm_intern {
- char *name;
- zval *conn;
-} php_pqstm_intern_t;
-
typedef struct php_pqstm_object {
zend_object zo;
char *name;
PQfinish(obj->conn);
obj->conn = NULL;
}
+ zend_hash_destroy(&obj->listeners);
zend_object_std_dtor((zend_object *) o TSRMLS_CC);
efree(obj);
}
o->async = !PQisnonblocking(o->conn);
}
+ zend_hash_init(&o->listeners, 0, NULL, (dtor_func_t) zend_hash_destroy, 0);
+
ov.handle = zend_objects_store_put((zend_object *) o, NULL, php_pqconn_object_free, NULL TSRMLS_CC);
ov.handlers = &php_pqconn_object_handlers;
static HashTable php_pqres_object_prophandlers;
static HashTable php_pqstm_object_prophandlers;
-typedef void (*php_pq_object_prophandler_func_t)(void *o, zval *return_value TSRMLS_DC);
+typedef void (*php_pq_object_prophandler_func_t)(zval *object, void *o, zval *return_value TSRMLS_DC);
typedef struct php_pq_object_prophandler {
php_pq_object_prophandler_func_t read;
php_pq_object_prophandler_func_t write;
} php_pq_object_prophandler_t;
-static void php_pqconn_object_read_status(void *o, zval *return_value TSRMLS_DC)
+static void php_pqconn_object_read_status(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;
RETVAL_LONG(PQstatus(obj->conn));
}
-static void php_pqconn_object_read_transaction_status(void *o, zval *return_value TSRMLS_DC)
+static void php_pqconn_object_read_transaction_status(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;
RETVAL_LONG(PQtransactionStatus(obj->conn));
}
-static void php_pqconn_object_read_error_message(void *o, zval *return_value TSRMLS_DC)
+static void php_pqconn_object_read_error_message(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;
char *error = PQerrorMessage(obj->conn);
}
}
+static int apply_notify_listener(void *p, void *arg TSRMLS_DC)
+{
+ php_pqconn_listener_t *listener = p;
+ PGnotify *nfy = arg;
+ zval *zpid, *zchannel, *zmessage;
+
+ MAKE_STD_ZVAL(zpid);
+ ZVAL_LONG(zpid, nfy->be_pid);
+ MAKE_STD_ZVAL(zchannel);
+ ZVAL_STRING(zchannel, nfy->relname, 1);
+ MAKE_STD_ZVAL(zmessage);
+ ZVAL_STRING(zmessage, nfy->extra, 1);
+
+ zend_fcall_info_argn(&listener->fci TSRMLS_CC, 3, &zchannel, &zmessage, &zpid);
+ zend_fcall_info_call(&listener->fci, &listener->fcc, NULL, NULL TSRMLS_CC);
+
+ zval_ptr_dtor(&zchannel);
+ zval_ptr_dtor(&zmessage);
+ zval_ptr_dtor(&zpid);
+
+ return ZEND_HASH_APPLY_KEEP;
+}
+
+static int apply_notify_listeners(void *p, void *arg TSRMLS_DC)
+{
+ HashTable *listeners = p;
+
+ zend_hash_apply_with_argument(listeners, apply_notify_listener, arg TSRMLS_CC);
+
+ return ZEND_HASH_APPLY_KEEP;
+}
+
+static void php_pqconn_notify_listeners(zval *this_ptr, php_pqconn_object_t *obj TSRMLS_DC)
+{
+ PGnotify *nfy;
+
+ if (!obj) {
+ obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+ }
+
+ while ((nfy = PQnotifies(obj->conn))) {
+ zend_hash_apply_with_argument(&obj->listeners, apply_notify_listeners, nfy TSRMLS_CC);
+ PQfreemem(nfy);
+ }
+}
+
/* FIXME: extend to types->nspname->typname */
#define PHP_PQ_TYPES_QUERY \
"select t.oid, t.* " \
"where typisdefined " \
"and typrelid=0 " \
"and nspname in ('public', 'pg_catalog')"
-static void php_pqconn_object_read_types(void *o, zval *return_value TSRMLS_DC)
+static void php_pqconn_object_read_types(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqconn_object_t *obj = o;
PGresult *res = PQexec(obj->conn, PHP_PQ_TYPES_QUERY);
+ php_pqconn_notify_listeners(object, obj TSRMLS_CC);
+
/* FIXME: cache that */
if (res) {
if (PGRES_TUPLES_OK == PQresultStatus(res)) {
}
}
-static void php_pqres_object_read_status(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_status(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
RETVAL_LONG(PQresultStatus(obj->res));
}
-static void php_pqres_object_read_error_message(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_error_message(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
char *error = PQresultErrorMessage(obj->res);
}
}
-static void php_pqres_object_read_num_rows(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_num_rows(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
RETVAL_LONG(PQntuples(obj->res));
}
-static void php_pqres_object_read_num_cols(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_num_cols(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
RETVAL_LONG(PQnfields(obj->res));
}
-static void php_pqres_object_read_affected_rows(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_affected_rows(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
RETVAL_LONG(atoi(PQcmdTuples(obj->res)));
}
-static void php_pqres_object_read_fetch_type(void *o, zval *return_value TSRMLS_DC)
+static void php_pqres_object_read_fetch_type(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
}
}
-static void php_pqres_object_write_fetch_type(void *o, zval *value TSRMLS_DC)
+static void php_pqres_object_write_fetch_type(zval *object, void *o, zval *value TSRMLS_DC)
{
php_pqres_object_t *obj = o;
zval *zfetch_type = value;
}
}
-static void php_pqstm_object_read_name(void *o, zval *return_value TSRMLS_DC)
+static void php_pqstm_object_read_name(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqstm_object_t *obj = o;
RETVAL_STRING(obj->name, 1);
}
-static void php_pqstm_object_read_connection(void *o, zval *return_value TSRMLS_DC)
+static void php_pqstm_object_read_connection(zval *object, void *o, zval *return_value TSRMLS_DC)
{
php_pqstm_object_t *obj = o;
Z_SET_REFCOUNT_P(return_value, 0);
Z_UNSET_ISREF_P(return_value);
- handler->read(obj, return_value TSRMLS_CC);
+ handler->read(object, obj, return_value TSRMLS_CC);
} else {
zend_error(E_ERROR, "Cannot access pq\\Connection properties by reference or array key/index");
return_value = NULL;
if (SUCCESS == zend_hash_find(&php_pqconn_object_prophandlers, Z_STRVAL_P(member), Z_STRLEN_P(member)+1, (void *) &handler)) {
if (handler->write) {
- handler->write(obj, value TSRMLS_CC);
+ handler->write(object, obj, value TSRMLS_CC);
}
} else {
zend_get_std_object_handlers()->write_property(object, member, value, key TSRMLS_CC);
Z_SET_REFCOUNT_P(return_value, 0);
Z_UNSET_ISREF_P(return_value);
- handler->read(obj, return_value TSRMLS_CC);
+ handler->read(object, obj, return_value TSRMLS_CC);
} else {
zend_error(E_ERROR, "Cannot access pq\\Result properties by reference or array key/index");
return_value = NULL;
obj->iter = (php_pqres_iterator_t *) php_pqres_iterator_init(Z_OBJCE_P(object), object, 0 TSRMLS_CC);
obj->iter->zi.funcs->rewind((zend_object_iterator *) obj->iter TSRMLS_CC);
}
- handler->write(obj, value TSRMLS_CC);
+ handler->write(object, obj, value TSRMLS_CC);
}
} else {
zend_get_std_object_handlers()->write_property(object, member, value, key TSRMLS_CC);
Z_SET_REFCOUNT_P(return_value, 0);
Z_UNSET_ISREF_P(return_value);
- handler->read(obj, return_value TSRMLS_CC);
+ handler->read(object, obj, return_value TSRMLS_CC);
} else {
zend_error(E_ERROR, "Cannot access pq\\Statement properties by reference or array key/index");
return_value = NULL;
zend_error(E_WARNING, "Result not initialized");
} else if (SUCCESS == zend_hash_find(&php_pqstm_object_prophandlers, Z_STRVAL_P(member), Z_STRLEN_P(member)+1, (void *) &handler)) {
if (handler->write) {
- handler->write(obj, value TSRMLS_CC);
+ handler->write(object, obj, value TSRMLS_CC);
}
} else {
zend_get_std_object_handlers()->write_property(object, member, value, key TSRMLS_CC);
}
}
+static void listener_dtor(void *l) {
+ php_pqconn_listener_t *listener = l;
+
+ zend_fcall_info_args_clear(&listener->fci, 1);
+
+ zval_ptr_dtor(&listener->fci.function_name);
+ if (listener->fci.object_ptr) {
+ zval_ptr_dtor(&listener->fci.object_ptr);
+ }
+}
+
+static void php_pqconn_add_listener(php_pqconn_object_t *obj, const char *channel_str, size_t channel_len, php_pqconn_listener_t *listener TSRMLS_DC)
+{
+ HashTable ht, *existing_listeners;
+
+ Z_ADDREF_P(listener->fci.function_name);
+ if (listener->fci.object_ptr) {
+ Z_ADDREF_P(listener->fci.object_ptr);
+ }
+ if (SUCCESS == zend_hash_find(&obj->listeners, channel_str, channel_len + 1, (void *) &existing_listeners)) {
+ zend_hash_next_index_insert(existing_listeners, (void *) listener, sizeof(*listener), NULL);
+ } else {
+ zend_hash_init(&ht, 1, NULL, (dtor_func_t) listener_dtor, 0);
+ zend_hash_next_index_insert(&ht, (void *) listener, sizeof(*listener), NULL);
+ zend_hash_add(&obj->listeners, channel_str, channel_len + 1, (void *) &ht, sizeof(HashTable), NULL);
+ }
+}
+
+static STATUS php_pqres_success(PGresult *res TSRMLS_DC)
+{
+ switch (PQresultStatus(res)) {
+ case PGRES_BAD_RESPONSE:
+ case PGRES_NONFATAL_ERROR:
+ case PGRES_FATAL_ERROR:
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "%s", PQresultErrorMessage(res));
+ return FAILURE;
+ default:
+ return SUCCESS;
+ }
+}
+
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_listen, 0, 0, 0)
+ ZEND_ARG_INFO(0, channel)
+ ZEND_ARG_INFO(0, callable)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, listen) {
+ char *channel_str = NULL;
+ int channel_len = 0;
+ php_pqconn_listener_t listener;
+
+ if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc)) {
+ php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+ obj->poller = PQconsumeInput;
+
+ if (obj->conn) {
+ PGresult *res;
+ char cmd[1024];
+
+ slprintf(cmd, sizeof(cmd), "LISTEN %s", channel_str);
+ res = PQexec(obj->conn, cmd);
+
+ if (res) {
+ if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
+ php_pqconn_add_listener(obj, channel_str, channel_len, &listener TSRMLS_CC);
+ RETVAL_TRUE;
+ } else {
+ RETVAL_FALSE;
+ }
+ PQclear(res);
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not install listener: %s", PQerrorMessage(obj->conn));
+ RETVAL_FALSE;
+ }
+
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Connection not initialized");
+ RETVAL_FALSE;
+ }
+ }
+
+}
+
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_notify, 0, 0, 2)
+ ZEND_ARG_INFO(0, channel)
+ ZEND_ARG_INFO(0, message)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, notify) {
+ char *channel_str, *message_str;
+ int channel_len, message_len;
+
+ if (SUCCESS == zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len)) {
+ php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+ if (obj->conn) {
+ PGresult *res;
+ char *params[2] = {channel_str, message_str};
+
+ res = PQexecParams(obj->conn, "select pg_notify($1, $2)", 2, NULL, (const char *const*) params, NULL, NULL, 0);
+
+ if (res) {
+ if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
+ RETVAL_TRUE;
+ } else {
+ RETVAL_FALSE;
+ }
+ PQclear(res);
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Could not notify listeners: %s", PQerrorMessage(obj->conn));
+ RETVAL_FALSE;
+ }
+
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
+ } else {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING, "Connection not initialized");
+ RETVAL_FALSE;
+ }
+ }
+}
+
ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_poll, 0, 0, 0)
ZEND_END_ARG_INFO();
static PHP_METHOD(pqconn, poll) {
if (obj->conn) {
if (obj->poller) {
if (obj->poller == PQconsumeInput) {
- RETURN_LONG(obj->poller(obj->conn) * PGRES_POLLING_OK);
+ RETVAL_LONG(obj->poller(obj->conn) * PGRES_POLLING_OK);
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
} else {
RETURN_LONG(obj->poller(obj->conn));
}
}
}
-static STATUS php_pqres_success(PGresult *res TSRMLS_DC)
-{
- switch (PQresultStatus(res)) {
- case PGRES_BAD_RESPONSE:
- case PGRES_NONFATAL_ERROR:
- case PGRES_FATAL_ERROR:
- php_error_docref(NULL TSRMLS_CC, E_WARNING, "%s", PQresultErrorMessage(res));
- return FAILURE;
- default:
- return SUCCESS;
- }
-}
-
ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_exec, 0, 0, 1)
ZEND_ARG_INFO(0, query)
ZEND_END_ARG_INFO();
if (obj->conn) {
PGresult *res = PQexec(obj->conn, query_str);
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
if (res) {
if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
return_value->type = IS_OBJECT;
efree(params);
}
+ php_pqconn_notify_listeners(getThis(), obj TSRMLS_CC);
+
if (res) {
if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
return_value->type = IS_OBJECT;
PHP_ME(pqconn, exec, ai_pqconn_exec, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, execParams, ai_pqconn_exec_params, ZEND_ACC_PUBLIC)
PHP_ME(pqconn, prepare, ai_pqconn_prepare, ZEND_ACC_PUBLIC)
+ PHP_ME(pqconn, listen, ai_pqconn_listen, ZEND_ACC_PUBLIC)
+ PHP_ME(pqconn, notify, ai_pqconn_notify, ZEND_ACC_PUBLIC)
{0}
};
memcpy(&php_pqconn_object_handlers, zend_get_std_object_handlers(), sizeof(zend_object_handlers));
php_pqconn_object_handlers.read_property = php_pqconn_object_read_prop;
php_pqconn_object_handlers.write_property = php_pqconn_object_write_prop;
+ php_pqconn_object_handlers.clone_obj = NULL;
+ php_pqconn_object_handlers.get_property_ptr_ptr = NULL;
zend_declare_property_long(php_pqconn_class_entry, ZEND_STRL("status"), CONNECTION_BAD, ZEND_ACC_PUBLIC TSRMLS_CC);
ph.read = php_pqconn_object_read_status;