add support for nonblocking writes
authorMichael Wallner <mike@php.net>
Wed, 18 May 2016 12:20:21 +0000 (14:20 +0200)
committerMichael Wallner <mike@php.net>
Wed, 18 May 2016 12:20:21 +0000 (14:20 +0200)
added pq\Connection::$nonblocking and pq\Connection::flush()

Closes gh-issue #16

package.xml
src/php_pqconn.c
tests/flush001.phpt [new file with mode: 0644]

index 917e7ebbfd1ff9cb5ddfa6e9787b49215dcda10f..5efced2bdada8e3291308179203f7f9b725af3eb 100644 (file)
@@ -48,6 +48,7 @@
  <notes><![CDATA[
 * Added public readonly array pq\Result::$diag property, listing PQresultErrorField details (gh-issue #14)
 * Restore listeners and prepared statements after a connection reset (gh-issue #15)
+* Added pq\Connection::$nonblocking and pq\Connection::flush() to support non-blocking writes (gh-issue #16)
 ]]></notes>
  <contents>
   <dir name="/">
     <file role="test" name="exceptions001.phpt" />
     <file role="test" name="exceptions002.phpt" />
     <file role="test" name="fetch001.phpt" />
+    <file role="test" name="flush001.phpt" />
     <file role="test" name="gh-issue015_listeners.phpt" />
     <file role="test" name="gh-issue015_statements.phpt" />
     <file role="test" name="info001.phpt" />
index ed7cbe180195ea67aaa2840e60a1e253efe116d8..c2cd40316289a2fd3b718c1f229c465d9090b5ed 100644 (file)
@@ -242,6 +242,20 @@ static void php_pqconn_object_write_unbuffered(zval *object, void *o, zval *valu
        obj->intern->unbuffered = z_is_true(value);
 }
 
+static void php_pqconn_object_read_nonblocking(zval *object, void *o, zval *return_value TSRMLS_DC)
+{
+       php_pqconn_object_t *obj = o;
+
+       RETVAL_BOOL(PQisnonblocking(obj->intern->conn));
+}
+
+static void php_pqconn_object_write_nonblocking(zval *object, void *o, zval *value TSRMLS_DC)
+{
+       php_pqconn_object_t *obj = o;
+
+       PQsetnonblocking(obj->intern->conn, z_is_true(value));
+}
+
 static void php_pqconn_object_read_db(zval *object, void *o, zval *return_value TSRMLS_DC)
 {
        php_pqconn_object_t *obj = o;
@@ -1050,6 +1064,40 @@ static PHP_METHOD(pqconn, poll) {
        }
 }
 
+ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_flush, 0, 0, 0)
+ZEND_END_ARG_INFO();
+static PHP_METHOD(pqconn, flush) {
+       zend_error_handling zeh;
+       ZEND_RESULT_CODE rv;
+
+       zend_replace_error_handling(EH_THROW, exce(EX_INVALID_ARGUMENT), &zeh TSRMLS_CC);
+       rv = zend_parse_parameters_none();
+       zend_restore_error_handling(&zeh TSRMLS_CC);
+
+       if (SUCCESS == rv) {
+               php_pqconn_object_t *obj = zend_object_store_get_object(getThis() TSRMLS_CC);
+
+               if (!obj->intern) {
+                       throw_exce(EX_UNINITIALIZED TSRMLS_CC, "pq\\Connection not initialized");
+               } else if (!obj->intern->poller) {
+                       throw_exce(EX_RUNTIME TSRMLS_CC, "No asynchronous operation active");
+               } else {
+                       switch (PQflush(obj->intern->conn)) {
+                       case -1:
+                       default:
+                               throw_exce(EX_RUNTIME TSRMLS_CC, "Failed to flush connection: %s", PHP_PQerrorMessage(obj->intern->conn));
+                               break;
+                       case 0:
+                               RETVAL_TRUE;
+                               break;
+                       case 1:
+                               RETVAL_FALSE;
+                               break;
+                       }
+               }
+       }
+}
+
 ZEND_BEGIN_ARG_INFO_EX(ai_pqconn_exec, 0, 0, 1)
        ZEND_ARG_INFO(0, query)
 ZEND_END_ARG_INFO();
@@ -1918,6 +1966,7 @@ static zend_function_entry php_pqconn_methods[] = {
        PHP_ME(pqconn, reset, ai_pqconn_reset, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, resetAsync, ai_pqconn_reset_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, poll, ai_pqconn_poll, ZEND_ACC_PUBLIC)
+       PHP_ME(pqconn, flush, ai_pqconn_flush, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, exec, ai_pqconn_exec, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, execAsync, ai_pqconn_exec_async, ZEND_ACC_PUBLIC)
        PHP_ME(pqconn, execParams, ai_pqconn_exec_params, ZEND_ACC_PUBLIC)
@@ -1971,7 +2020,7 @@ PHP_MINIT_FUNCTION(pqconn)
        php_pqconn_object_handlers.get_properties = php_pq_object_properties;
        php_pqconn_object_handlers.get_debug_info = php_pq_object_debug_info;
 
-       zend_hash_init(&php_pqconn_object_prophandlers, 20, NULL, NULL, 1);
+       zend_hash_init(&php_pqconn_object_prophandlers, 21, NULL, NULL, 1);
 
        zend_declare_property_long(php_pqconn_class_entry, ZEND_STRL("status"), CONNECTION_BAD, ZEND_ACC_PUBLIC TSRMLS_CC);
        ph.read = php_pqconn_object_read_status;
@@ -2005,6 +2054,12 @@ PHP_MINIT_FUNCTION(pqconn)
        zend_hash_add(&php_pqconn_object_prophandlers, "unbuffered", sizeof("unbuffered"), (void *) &ph, sizeof(ph), NULL);
        ph.write = NULL;
 
+       zend_declare_property_bool(php_pqconn_class_entry, ZEND_STRL("nonblocking"), 0, ZEND_ACC_PUBLIC TSRMLS_CC);
+       ph.read = php_pqconn_object_read_nonblocking;
+       ph.write = php_pqconn_object_write_nonblocking;
+       zend_hash_add(&php_pqconn_object_prophandlers, "nonblocking", sizeof("nonblocking"), (void *) &ph, sizeof(ph), NULL);
+       ph.write = NULL;
+
        zend_declare_property_null(php_pqconn_class_entry, ZEND_STRL("db"), ZEND_ACC_PUBLIC TSRMLS_CC);
        ph.read = php_pqconn_object_read_db;
        zend_hash_add(&php_pqconn_object_prophandlers, "db", sizeof("db"), (void *) &ph, sizeof(ph), NULL);
diff --git a/tests/flush001.phpt b/tests/flush001.phpt
new file mode 100644 (file)
index 0000000..c5a40e3
--- /dev/null
@@ -0,0 +1,44 @@
+--TEST--
+flush
+--SKIPIF--
+<?php include "_skipif.inc"; ?>
+--FILE--
+<?php
+echo "Test\n";
+
+include "_setup.inc";
+
+$c = new pq\Connection(PQ_DSN);
+$c->nonblocking = true;
+var_dump($c->nonblocking);
+$c->execAsync("SELECT '".str_repeat("a", 6e7)."'", function($r) {
+       $r->fetchCol($s);
+       var_dump(strlen($s));
+});
+var_dump($flushed = $c->flush());
+do {
+       while (!$flushed || $c->busy) {
+               $r = $c->busy ? [$c->socket] : null;
+               $w = !$flushed ?[$c->socket] : null; 
+               
+               if (stream_select($r, $w, $e, null)) {
+                       if ($r) {
+                               printf("P%d", $c->poll());
+                       }
+                       if ($w) {
+                               printf("F%d", $flushed = $c->flush());
+                       }
+               }
+       }
+       echo "\n";
+} while ($c->getResult());
+?>
+===DONE===
+--EXPECTF--
+Test
+bool(true)
+bool(%s)
+%r(F0)*(F1)*(P3)+%r
+int(60000000)
+
+===DONE===