add support for nonblocking writes
[m6w6/ext-pq] / src / php_pqconn.c
index 7d066c59df5056807e6bd50cb7a80e4e5e75d759..c2cd40316289a2fd3b718c1f229c465d9090b5ed 100644 (file)
@@ -78,6 +78,7 @@ static void php_pqconn_object_free(void *o TSRMLS_DC)
                php_resource_factory_handle_dtor(&obj->intern->factory, obj->intern->conn TSRMLS_CC);
                php_resource_factory_dtor(&obj->intern->factory);
                zend_hash_destroy(&obj->intern->listeners);
+               zend_hash_destroy(&obj->intern->statements);
                zend_hash_destroy(&obj->intern->converters);
                zend_hash_destroy(&obj->intern->eventhandlers);
                efree(obj->intern);
@@ -241,6 +242,20 @@ static void php_pqconn_object_write_unbuffered(zval *object, void *o, zval *valu
        obj->intern->unbuffered = z_is_true(value);
 }
 
+static void php_pqconn_object_read_nonblocking(zval *object, void *o, zval *return_value TSRMLS_DC)
+{
+       php_pqconn_object_t *obj = o;
+
+       RETVAL_BOOL(PQisnonblocking(obj->intern->conn));
+}
+
+static void php_pqconn_object_write_nonblocking(zval *object, void *o, zval *value TSRMLS_DC)
+{
+       php_pqconn_object_t *obj = o;
+
+       PQsetnonblocking(obj->intern->conn, z_is_true(value));
+}
+
 static void php_pqconn_object_read_db(zval *object, void *o, zval *return_value TSRMLS_DC)
 {
        php_pqconn_object_t *obj = o;
@@ -486,11 +501,11 @@ static void php_pqconn_object_write_def_auto_conv(zval*object, void *o, zval *va
        }
 }
 
-static STATUS php_pqconn_update_socket(zval *this_ptr, php_pqconn_object_t *obj TSRMLS_DC)
+static ZEND_RESULT_CODE php_pqconn_update_socket(zval *this_ptr, php_pqconn_object_t *obj TSRMLS_DC)
 {
        zval *zsocket, zmember;
        php_stream *stream;
-       STATUS retval;
+       ZEND_RESULT_CODE retval;
        int socket;
 
        if (!obj) {
@@ -563,7 +578,7 @@ php_resource_factory_ops_t *php_pqconn_get_resource_factory_ops(void)
 static void php_pqconn_wakeup(php_persistent_handle_factory_t *f, void **handle TSRMLS_DC)
 {
        PGresult *res = PQexec(*handle, "");
-       PHP_PQclear(res);
+       php_pqres_clear(res);
 
        if (CONNECTION_OK != PQstatus(*handle)) {
                PQreset(*handle);
@@ -597,7 +612,7 @@ static int apply_unlisten(void *p TSRMLS_DC, int argc, va_list argv, zend_hash_k
        PGresult *res = unlisten(obj->intern->conn, key->arKey, key->nKeyLength - 1 TSRMLS_CC);
 
        if (res) {
-               PHP_PQclear(res);
+               php_pqres_clear(res);
        }
 
        return ZEND_HASH_APPLY_REMOVE;
@@ -624,7 +639,7 @@ static void php_pqconn_retire(php_persistent_handle_factory_t *f, void **handle
        }
        /* clean up async results */
        while ((res = PQgetResult(*handle))) {
-               PHP_PQclear(res);
+               php_pqres_clear(res);
        }
 
        /* clean up transaction & session */
@@ -638,7 +653,7 @@ static void php_pqconn_retire(php_persistent_handle_factory_t *f, void **handle
        }
 
        if (res) {
-               PHP_PQclear(res);
+               php_pqres_clear(res);
        }
 
        if (evdata) {
@@ -652,14 +667,14 @@ static void php_pqconn_retire(php_persistent_handle_factory_t *f, void **handle
 
 ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_construct, 0, 0, 1)
        ZEND_ARG_INFO(0, dsn)
-       ZEND_ARG_INFO(0, async)
+       ZEND_ARG_INFO(0, flags)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, __construct) {
        zend_error_handling zeh;
        char *dsn_str = "";
        int dsn_len = 0;
        long flags = 0;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|sl", &dsn_str, &dsn_len, &flags);
@@ -679,12 +694,13 @@ static PHP_METHOD(pqconn, __construct) {
                        obj->intern->default_auto_convert = PHP_PQRES_CONV_ALL;
 
                        zend_hash_init(&obj->intern->listeners, 0, NULL, (dtor_func_t) zend_hash_destroy, 0);
+                       zend_hash_init(&obj->intern->statements, 0, NULL, NULL, 0);
                        zend_hash_init(&obj->intern->converters, 0, NULL, ZVAL_PTR_DTOR, 0);
                        zend_hash_init(&obj->intern->eventhandlers, 0, NULL, (dtor_func_t) zend_hash_destroy, 0);
 
                        if (flags & PHP_PQCONN_PERSISTENT) {
                                php_persistent_handle_factory_t *phf = php_persistent_handle_concede(NULL, ZEND_STRL("pq\\Connection"), dsn_str, dsn_len, php_pqconn_wakeup, php_pqconn_retire TSRMLS_CC);
-                               php_resource_factory_init(&obj->intern->factory, php_persistent_handle_get_resource_factory_ops(), phf, (void (*)(void*)) php_persistent_handle_abandon);
+                               php_persistent_handle_resource_factory_init(&obj->intern->factory, phf);
                        } else {
                                php_resource_factory_init(&obj->intern->factory, &php_pqconn_resource_factory_ops, NULL, NULL);
                        }
@@ -709,7 +725,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_reset, 0, 0, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, reset) {
        zend_error_handling zeh;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters_none();
@@ -736,7 +752,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_reset_async, 0, 0, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, resetAsync) {
        zend_error_handling zeh;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters_none();
@@ -767,7 +783,7 @@ static PHP_METHOD(pqconn, unlisten)
        zend_error_handling zeh;
        char *channel_str;
        int channel_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len);
@@ -783,7 +799,7 @@ static PHP_METHOD(pqconn, unlisten)
 
                        if (res) {
                                php_pqres_success(res TSRMLS_CC);
-                               PHP_PQclear(res);
+                               php_pqres_clear(res);
                        }
                }
        }
@@ -796,7 +812,7 @@ static PHP_METHOD(pqconn, unlistenAsync) {
        zend_error_handling zeh;
        char *channel_str;
        int channel_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &channel_str, &channel_len);
@@ -858,7 +874,7 @@ static PHP_METHOD(pqconn, listen) {
        char *channel_str = NULL;
        int channel_len = 0;
        php_pq_callback_t listener = {{0}};
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc);
@@ -882,7 +898,7 @@ static PHP_METHOD(pqconn, listen) {
                                smart_str_appends(&cmd, quoted_channel);
                                smart_str_0(&cmd);
 
-                               res = PQexec(obj->intern->conn, cmd.c);
+                               res = php_pq_exec(obj->intern->conn, cmd.c);
 
                                smart_str_free(&cmd);
                                PQfreemem(quoted_channel);
@@ -894,7 +910,7 @@ static PHP_METHOD(pqconn, listen) {
                                                obj->intern->poller = PQconsumeInput;
                                                php_pqconn_add_listener(obj, channel_str, channel_len, &listener TSRMLS_CC);
                                        }
-                                       PHP_PQclear(res);
+                                       php_pqres_clear(res);
                                }
 
                                php_pqconn_notify_listeners(obj TSRMLS_CC);
@@ -912,7 +928,7 @@ static PHP_METHOD(pqconn, listenAsync) {
        char *channel_str = NULL;
        int channel_len = 0;
        php_pq_callback_t listener = {{0}};
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &channel_str, &channel_len, &listener.fci, &listener.fcc);
@@ -958,7 +974,7 @@ static PHP_METHOD(pqconn, notify) {
        zend_error_handling zeh;
        char *channel_str, *message_str;
        int channel_len, message_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len);
@@ -979,7 +995,7 @@ static PHP_METHOD(pqconn, notify) {
                                throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to notify listeners (%s)", PHP_PQerrorMessage(obj->intern->conn));
                        } else {
                                php_pqres_success(res TSRMLS_CC);
-                               PHP_PQclear(res);
+                               php_pqres_clear(res);
                        }
 
                        php_pqconn_notify_listeners(obj TSRMLS_CC);
@@ -995,7 +1011,7 @@ static PHP_METHOD(pqconn, notifyAsync) {
        zend_error_handling zeh;
        char *channel_str, *message_str;
        int channel_len, message_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &channel_str, &channel_len, &message_str, &message_len);
@@ -1024,7 +1040,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_poll, 0, 0, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, poll) {
        zend_error_handling zeh;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters_none();
@@ -1048,6 +1064,40 @@ static PHP_METHOD(pqconn, poll) {
        }
 }
 
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_flush, 0, 0, 0)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, flush) {
+       zend_error_handling zeh;
+       ZEND_RESULT_CODE rv;
+
+       zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
+       rv = zend_parse_parameters_none();
+       zend_restore_error_handling(&zeh TSRMLS_CC);
+
+       if (SUCCESS == rv) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               if (!obj->intern) {
+                       throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
+               } else if (!obj->intern->poller) {
+                       throw_exce(EX_RUNTIME TSRMLS_CC, "No asynchronous operation active");
+               } else {
+                       switch (PQflush(obj->intern->conn)) {
+                       case -1:
+                       default:
+                               throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to flush connection: %s", PHP_PQerrorMessage(obj->intern->conn));
+                               break;
+                       case 0:
+                               RETVAL_TRUE;
+                               break;
+                       case 1:
+                               RETVAL_FALSE;
+                               break;
+                       }
+               }
+       }
+}
+
 ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_exec, 0, 0, 1)
        ZEND_ARG_INFO(0, query)
 ZEND_END_ARG_INFO();
@@ -1055,7 +1105,7 @@ static PHP_METHOD(pqconn, exec) {
        zend_error_handling zeh;
        char *query_str;
        int query_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &query_str, &query_len);
@@ -1067,14 +1117,14 @@ static PHP_METHOD(pqconn, exec) {
                if (!obj->intern) {
                        throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
                } else {
-                       PGresult *res = PQexec(obj->intern->conn, query_str);
+                       PGresult *res = php_pq_exec(obj->intern->conn, query_str);
 
                        if (!res) {
                                throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to execute query (%s)", PHP_PQerrorMessage(obj->intern->conn));
                        } else if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
                                php_pq_object_to_zval_no_addref(PQresultInstanceData(res, php_pqconn_event), &return_value TSRMLS_CC);
                        } else {
-                               PHP_PQclear(res);
+                               php_pqres_clear(res);
                        }
 
                        php_pqconn_notify_listeners(obj TSRMLS_CC);
@@ -1086,7 +1136,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_get_result, 0, 0, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, getResult) {
        zend_error_handling zeh;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters_none();
@@ -1120,7 +1170,7 @@ static PHP_METHOD(pqconn, execAsync) {
        php_pq_callback_t resolver = {{0}};
        char *query_str;
        int query_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|f", &query_str, &query_len, &resolver.fci, &resolver.fcc);
@@ -1156,7 +1206,7 @@ static PHP_METHOD(pqconn, execParams) {
        int query_len;
        zval *zparams;
        zval *ztypes = NULL;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa/|a/!", &query_str, &query_len, &zparams, &ztypes);
@@ -1181,7 +1231,7 @@ static PHP_METHOD(pqconn, execParams) {
                                if (SUCCESS == php_pqres_success(res TSRMLS_CC)) {
                                        php_pq_object_to_zval_no_addref(PQresultInstanceData(res, php_pqconn_event), &return_value TSRMLS_CC);
                                } else {
-                                       PHP_PQclear(res);
+                                       php_pqres_clear(res);
                                }
 
                                php_pqconn_notify_listeners(obj TSRMLS_CC);
@@ -1203,7 +1253,7 @@ static PHP_METHOD(pqconn, execParamsAsync) {
        int query_len;
        zval *zparams;
        zval *ztypes = NULL;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa/|a/!f", &query_str, &query_len, &zparams, &ztypes, &resolver.fci, &resolver.fcc);
@@ -1238,23 +1288,23 @@ static PHP_METHOD(pqconn, execParamsAsync) {
        zend_restore_error_handling(&zeh TSRMLS_CC);
 }
 
-STATUS php_pqconn_prepare(zval *object, php_pqconn_object_t *obj, const char *name, const char *query, php_pq_params_t *params TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_prepare(zval *object, php_pqconn_object_t *obj, const char *name, const char *query, php_pq_params_t *params TSRMLS_DC)
 {
        PGresult *res;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        if (!obj) {
                obj = zend_object_store_get_object(object TSRMLS_CC);
        }
 
-       res = PQprepare(obj->intern->conn, name, query, params->type.count, params->type.oids);
+       res = php_pq_prepare(obj->intern->conn, name, query, params->type.count, params->type.oids);
 
        if (!res) {
                rv = FAILURE;
                throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to prepare statement (%s)", PHP_PQerrorMessage(obj->intern->conn));
        } else {
                rv = php_pqres_success(res TSRMLS_CC);
-               PHP_PQclear(res);
+               php_pqres_clear(res);
                php_pqconn_notify_listeners(obj TSRMLS_CC);
        }
 
@@ -1271,7 +1321,7 @@ static PHP_METHOD(pqconn, prepare) {
        zval *ztypes = NULL;
        char *name_str, *query_str;
        int name_len, *query_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|a/!", &name_str, &name_len, &query_str, &query_len, &ztypes);
@@ -1297,9 +1347,9 @@ static PHP_METHOD(pqconn, prepare) {
        }
 }
 
-STATUS php_pqconn_prepare_async(zval *object, php_pqconn_object_t *obj, const char *name, const char *query, php_pq_params_t *params TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_prepare_async(zval *object, php_pqconn_object_t *obj, const char *name, const char *query, php_pq_params_t *params TSRMLS_DC)
 {
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        if (!obj) {
                obj = zend_object_store_get_object(object TSRMLS_CC);
@@ -1327,7 +1377,7 @@ static PHP_METHOD(pqconn, prepareAsync) {
        zval *ztypes = NULL;
        char *name_str, *query_str;
        int name_len, *query_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|a/!", &name_str, &name_len, &query_str, &query_len, &ztypes);
@@ -1353,23 +1403,23 @@ static PHP_METHOD(pqconn, prepareAsync) {
        }
 }
 
-STATUS php_pqconn_declare(zval *object, php_pqconn_object_t *obj, const char *decl TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_declare(zval *object, php_pqconn_object_t *obj, const char *decl TSRMLS_DC)
 {
        PGresult *res;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        if (!obj) {
                obj = zend_object_store_get_object(object TSRMLS_CC);
        }
 
-       res = PQexec(obj->intern->conn, decl);
+       res = php_pq_exec(obj->intern->conn, decl);
 
        if (!res) {
                rv = FAILURE;
                throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to declare cursor (%s)", PHP_PQerrorMessage(obj->intern->conn));
        } else {
                rv = php_pqres_success(res TSRMLS_CC);
-               PHP_PQclear(res);
+               php_pqres_clear(res);
                php_pqconn_notify_listeners(obj TSRMLS_CC);
        }
 
@@ -1386,7 +1436,7 @@ static PHP_METHOD(pqconn, declare) {
        char *name_str, *query_str;
        int name_len, query_len;
        long flags;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &name_str, &name_len, &flags, &query_str, &query_len);
@@ -1398,19 +1448,13 @@ static PHP_METHOD(pqconn, declare) {
                if (!obj->intern) {
                        throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
                } else {
-                       char *decl = php_pqcur_declare_str(name_str, name_len, flags, query_str, query_len);
+                       int query_offset;
+                       char *decl = php_pqcur_declare_str(name_str, name_len, flags, query_str, query_len, &query_offset);
 
                        if (SUCCESS != php_pqconn_declare(getThis(), obj, decl TSRMLS_CC)) {
                                efree(decl);
                        } else {
-                               php_pqcur_t *cur = ecalloc(1, sizeof(*cur));
-
-                               php_pq_object_addref(obj TSRMLS_CC);
-                               cur->conn = obj;
-                               cur->open = 1;
-                               cur->name = estrdup(name_str);
-                               cur->decl = decl;
-                               cur->flags = flags;
+                               php_pqcur_t *cur = php_pqcur_init(obj, name_str, decl, query_offset, flags TSRMLS_CC);
 
                                return_value->type = IS_OBJECT;
                                return_value->value.obj = php_pqcur_create_object_ex(php_pqcur_class_entry, cur, NULL TSRMLS_CC);
@@ -1419,9 +1463,9 @@ static PHP_METHOD(pqconn, declare) {
        }
 }
 
-STATUS php_pqconn_declare_async(zval *object, php_pqconn_object_t *obj, const char *decl TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_declare_async(zval *object, php_pqconn_object_t *obj, const char *decl TSRMLS_DC)
 {
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        if (!obj) {
                obj = zend_object_store_get_object(object TSRMLS_CC);
@@ -1449,7 +1493,7 @@ static PHP_METHOD(pqconn, declareAsync) {
        char *name_str, *query_str;
        int name_len, query_len;
        long flags;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &name_str, &name_len, &flags, &query_str, &query_len);
@@ -1461,18 +1505,13 @@ static PHP_METHOD(pqconn, declareAsync) {
                if (!obj->intern) {
                        throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
                } else {
-                       char *decl = php_pqcur_declare_str(name_str, name_len, flags, query_str, query_len);
+                       int query_offset;
+                       char *decl = php_pqcur_declare_str(name_str, name_len, flags, query_str, query_len, &query_offset);
 
                        if (SUCCESS != php_pqconn_declare_async(getThis(), obj, decl TSRMLS_CC)) {
                                efree(decl);
                        } else {
-                               php_pqcur_t *cur = ecalloc(1, sizeof(*cur));
-
-                               php_pq_object_addref(obj TSRMLS_CC);
-                               cur->conn = obj;
-                               cur->open = 1;
-                               cur->name = estrdup(name_str);
-                               cur->decl = decl;
+                               php_pqcur_t *cur = php_pqcur_init(obj, name_str, decl, query_offset, flags TSRMLS_CC);
 
                                return_value->type = IS_OBJECT;
                                return_value->value.obj = php_pqcur_create_object_ex(php_pqcur_class_entry, cur, NULL TSRMLS_CC);
@@ -1587,9 +1626,9 @@ static PHP_METHOD(pqconn, unescapeBytea) {
        }
 }
 
-STATUS php_pqconn_start_transaction(zval *zconn, php_pqconn_object_t *conn_obj, long isolation, zend_bool readonly, zend_bool deferrable TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_start_transaction(zval *zconn, php_pqconn_object_t *conn_obj, long isolation, zend_bool readonly, zend_bool deferrable TSRMLS_DC)
 {
-       STATUS rv = FAILURE;
+       ZEND_RESULT_CODE rv = FAILURE;
 
        if (!conn_obj) {
                conn_obj = zend_object_store_get_object(zconn TSRMLS_CC);
@@ -1611,13 +1650,13 @@ STATUS php_pqconn_start_transaction(zval *zconn, php_pqconn_object_t *conn_obj,
                smart_str_appends(&cmd, " DEFERRABLE");
                smart_str_0(&cmd);
 
-               res = PQexec(conn_obj->intern->conn, cmd.c);
+               res = php_pq_exec(conn_obj->intern->conn, cmd.c);
 
                if (!res) {
                        throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to start transaction (%s)", PHP_PQerrorMessage(conn_obj->intern->conn));
                } else {
                        rv = php_pqres_success(res TSRMLS_CC);
-                       PHP_PQclear(res);
+                       php_pqres_clear(res);
                        php_pqconn_notify_listeners(conn_obj TSRMLS_CC);
                }
 
@@ -1627,9 +1666,9 @@ STATUS php_pqconn_start_transaction(zval *zconn, php_pqconn_object_t *conn_obj,
        return rv;
 }
 
-STATUS php_pqconn_start_transaction_async(zval *zconn, php_pqconn_object_t *conn_obj, long isolation, zend_bool readonly, zend_bool deferrable TSRMLS_DC)
+ZEND_RESULT_CODE php_pqconn_start_transaction_async(zval *zconn, php_pqconn_object_t *conn_obj, long isolation, zend_bool readonly, zend_bool deferrable TSRMLS_DC)
 {
-       STATUS rv = FAILURE;
+       ZEND_RESULT_CODE rv = FAILURE;
 
        if (!conn_obj) {
                conn_obj = zend_object_store_get_object(zconn TSRMLS_CC);
@@ -1675,7 +1714,7 @@ static PHP_METHOD(pqconn, startTransaction) {
        long isolation = obj->intern ? obj->intern->default_txn_isolation : PHP_PQTXN_READ_COMMITTED;
        zend_bool readonly = obj->intern ? obj->intern->default_txn_readonly : 0;
        zend_bool deferrable = obj->intern ? obj->intern->default_txn_deferrable : 0;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|lbb", &isolation, &readonly, &deferrable);
@@ -1711,7 +1750,7 @@ static PHP_METHOD(pqconn, startTransactionAsync) {
        long isolation = obj->intern ? obj->intern->default_txn_isolation : PHP_PQTXN_READ_COMMITTED;
        zend_bool readonly = obj->intern ? obj->intern->default_txn_readonly : 0;
        zend_bool deferrable = obj->intern ? obj->intern->default_txn_deferrable : 0;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|lbb", &isolation, &readonly, &deferrable);
@@ -1776,7 +1815,7 @@ static PHP_METHOD(pqconn, off) {
        zend_error_handling zeh;
        char *type_str;
        int type_len;
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &type_str, &type_len);
@@ -1802,7 +1841,7 @@ static PHP_METHOD(pqconn, on) {
        char *type_str;
        int type_len;
        php_pq_callback_t cb = {{0}};
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
 
        zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
        rv = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sf", &type_str, &type_len, &cb.fci, &cb.fcc);
@@ -1850,7 +1889,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_set_converter, 0, 0, 1)
        ZEND_ARG_OBJ_INFO(0, converter, pq\\Converter, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, setConverter) {
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
        zend_error_handling zeh;
        zval *zcnv;
 
@@ -1888,7 +1927,7 @@ ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_unset_converter, 0, 0, 1)
        ZEND_ARG_OBJ_INFO(0, converter, pq\\Converter, 0)
 ZEND_END_ARG_INFO();
 static PHP_METHOD(pqconn, unsetConverter) {
-       STATUS rv;
+       ZEND_RESULT_CODE rv;
        zend_error_handling zeh;
        zval *zcnv;
 
@@ -1927,6 +1966,7 @@ static zend_function_entry php_pqconn_methods[] = {
        PHP_ME(pqconn, reset, ai_pqconn_reset, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, resetAsync, ai_pqconn_reset_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, poll, ai_pqconn_poll, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, flush, ai_pqconn_flush, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, exec, ai_pqconn_exec, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, execAsync, ai_pqconn_exec_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, execParams, ai_pqconn_exec_params, ZEND_ACC_PUBLIC)
@@ -1980,7 +2020,7 @@ PHP_MINIT_FUNCTION(pqconn)
        php_pqconn_object_handlers.get_properties = php_pq_object_properties;
        php_pqconn_object_handlers.get_debug_info = php_pq_object_debug_info;
 
-       zend_hash_init(&php_pqconn_object_prophandlers, 20, NULL, NULL, 1);
+       zend_hash_init(&php_pqconn_object_prophandlers, 21, NULL, NULL, 1);
 
        zend_declare_property_long(php_pqconn_class_entry, ZEND_STRL("status"), CONNECTION_BAD, ZEND_ACC_PUBLIC TSRMLS_CC);
        ph.read = php_pqconn_object_read_status;
@@ -2014,6 +2054,12 @@ PHP_MINIT_FUNCTION(pqconn)
        zend_hash_add(&php_pqconn_object_prophandlers, "unbuffered", sizeof("unbuffered"), (void *) &ph, sizeof(ph), NULL);
        ph.write = NULL;
 
+       zend_declare_property_bool(php_pqconn_class_entry, ZEND_STRL("nonblocking"), 0, ZEND_ACC_PUBLIC TSRMLS_CC);
+       ph.read = php_pqconn_object_read_nonblocking;
+       ph.write = php_pqconn_object_write_nonblocking;
+       zend_hash_add(&php_pqconn_object_prophandlers, "nonblocking", sizeof("nonblocking"), (void *) &ph, sizeof(ph), NULL);
+       ph.write = NULL;
+
        zend_declare_property_null(php_pqconn_class_entry, ZEND_STRL("db"), ZEND_ACC_PUBLIC TSRMLS_CC);
        ph.read = php_pqconn_object_read_db;
        zend_hash_add(&php_pqconn_object_prophandlers, "db", sizeof("db"), (void *) &ph, sizeof(ph), NULL);