storage test
authorMichael Wallner <mike@php.net>
Wed, 23 Sep 2015 10:51:39 +0000 (12:51 +0200)
committerMichael Wallner <mike@php.net>
Wed, 23 Sep 2015 10:51:39 +0000 (12:51 +0200)
lib/pq/Mapper/Storage.php
lib/pq/Mapper/StorageInterface.php
lib/pq/Query/AsyncExecutor.php
lib/pq/Query/Executor.php
lib/pq/Query/ExecutorInterface.php
tests/lib/pq/Gateway/RowTest.php
tests/lib/pq/Mapper/StorageTest.php
tests/setup.inc

index 8016ef8b8cc13693ce368241f4c3f199d860ef6a..4882eda881fb512232d6eb85b4455a92ad6e4afc 100644 (file)
@@ -2,7 +2,10 @@
 
 namespace pq\Mapper;
 
+use InvalidArgumentException;
+use pq\Connection;
 use pq\Gateway\Table;
+use pq\Transaction;
 
 class Storage implements StorageInterface
 {
@@ -18,6 +21,12 @@ class Storage implements StorageInterface
         */
        private $gateway;
 
+       /**
+        * Buffered transaction
+        * @var Transaction
+        */
+       private $xaction;
+
        /**
         * Create a storage for $map
         * @param MapInterface $map
@@ -27,6 +36,30 @@ class Storage implements StorageInterface
                $this->gateway = $map->getGateway();
        }
 
+       /**
+        * Find by PK
+        * @param mixed $pk
+        * @return object
+        */
+       function get($pk) {
+               $id = $this->gateway->getIdentity();
+               if (count($id) == 1 && is_scalar($pk)) {
+                       $pk = [current($id->getColumns()) => $pk];
+               } elseif (!is_array($pk) || count($pk) !== count($id)) {
+                       throw InvalidArgumentException(
+                               "Insufficient identity provided; not all fields of %s are provided in %s",
+                               json_encode($id->getColumns()), json_encode($pk));
+               }
+
+               $where = [];
+               foreach ($pk as $k => $v) {
+                       $where["$k="] = $v;
+               }
+               $rowset = $this->gateway->find($where);
+               
+               return $this->map->map($rowset->current());
+       }
+
        /**
         * Find
         * @param array $where
@@ -59,4 +92,41 @@ class Storage implements StorageInterface
        function save($object) {
                $this->map->unmap($object);
        }
+
+       /**
+        * Buffer in a transaction
+        */
+       function buffer() {
+               switch ($this->gateway->getConnection()->transactionStatus) {
+               case Connection::TRANS_INTRANS:
+                       break;
+               default:
+                       $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("START TRANSACTION"));
+               }
+       }
+
+       /**
+        * Commit
+        */
+       function flush() {
+               switch ($this->gateway->getConnection()->transactionStatus) {
+               case Connection::TRANS_IDLE:
+                       break;
+               default:
+                       $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("COMMIT"));
+               }
+       }
+
+       /**
+        * Rollback
+        */
+       function discard() {
+               switch ($this->gateway->getConnection()->transactionStatus) {
+               case Connection::TRANS_IDLE:
+                       break;
+               default:
+                       $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("ROLLBACK"));
+               }
+               $this->map->getObjects()->reset();
+       }
 }
index bb5c94a0f02207d9adf1172a5163eb86f5f6d4a1..eb75f633b63fdfe9d2ce375a71ec60d719c4b5b2 100644 (file)
@@ -4,6 +4,13 @@ namespace pq\Mapper;
 
 interface StorageInterface
 {
+       /**
+        * Find by PK
+        * @param mixed $pk
+        * @return object
+        */
+       function get($pk);
+       
        /**
         * Find
         * @param array $where
@@ -25,4 +32,19 @@ interface StorageInterface
         * @param object $object
         */
        function save($object);
+
+       /**
+        * Buffer in a transaction
+        */
+       function buffer();
+
+       /**
+        * Commit
+        */
+       function flush();
+
+       /**
+        * Rollback
+        */
+       function discard();
 }
index a61f87a868cca08df0729562ba35cd43a29ad6b3..4b58a995e7ad942fe73fee0d4c7621e845e773df 100644 (file)
@@ -103,7 +103,9 @@ class AsyncExecutor extends Executor
                
                $context = $init();
                foreach (func_get_args() as $cb) {
-                       $then($context, $cb);
+                       if (is_callable($callback)) {
+                               $then($context, $cb);
+                       }
                }
                
                return array($context, function($result) use ($context, $done) {
@@ -117,7 +119,7 @@ class AsyncExecutor extends Executor
         * @param callable $callback result callback
         * @return mixed context created by the init callback
         */
-       function execute(WriterInterface $query, callable $callback) {
+       function execute(WriterInterface $query, callable $callback = null) {
                $this->result = null;
                $this->query = $query;
                $this->notify();
index e0bb169f3d191ad8a68fdd48e62472e72a12d294..3b97a6f2e6984f766efb03bd8db9ac1eabb2c0c2 100644 (file)
@@ -76,13 +76,13 @@ class Executor implements ExecutorInterface
         * @param callable $callback
         * @return mixed
         */
-       function execute(WriterInterface $query, callable $callback) {
+       function execute(WriterInterface $query, callable $callback = null) {
                $this->result = null;
                $this->query = $query;
                $this->notify();
                $this->result = $this->getConnection()->execParams($query, $query->getParams(), $query->getTypes());
                $this->notify();
-               return $callback($this->result);
+               return $callback ? $callback($this->result) : $this->result;
        }
        
        /**
index f3bc0be971b89d3c19092b98066f88c0cd7c3d3f..6c1bed140c4ed98cf03dac71b44555c3c958e889 100644 (file)
@@ -27,7 +27,7 @@ interface ExecutorInterface extends \SplSubject
         * @param callable $callback
         * @return mixed the result of the callback
         */
-       function execute(WriterInterface $query, callable $callback);
+       function execute(WriterInterface $query, callable $callback = null);
        
        /**
         * @return \pq\Query\WriterInterface
index 4d539a054d9ba5ae116080134a1e867e042758c0..a8c5bc1d4c189ad43e2e3a5f347eda80ca0f60d3 100644 (file)
@@ -70,7 +70,7 @@ class RowTest extends \PHPUnit_Framework_TestCase {
                executeInConcurrentTransaction(
                        $this->table->getQueryExecutor(),
                        "UPDATE {$this->table->getName()} SET data='bar' WHERE id=\$1", 
-                       array($row->id->get()));
+                       array($row->id->get()))->commit();
                $this->setExpectedException("\\UnexpectedValueException", "Row has already been modified");
                $row->update();
                $txn->commit();
@@ -93,7 +93,7 @@ class RowTest extends \PHPUnit_Framework_TestCase {
                executeInConcurrentTransaction(
                        $this->table->getQueryExecutor(), 
                        "UPDATE {$this->table->getName()} SET counter = 10 WHERE id=\$1", 
-                       array($row->id->get()));
+                       array($row->id->get()))->commit();
                $this->setExpectedException("\\UnexpectedValueException", "No row updated");
                $row->update();
        }
index fc63f18325ae7f176c49b23dc4de1ab08f9b6811..c57716269528a0741a5c9593615ef0303f0f5191 100644 (file)
@@ -68,4 +68,16 @@ class StorageTest extends PHPUnit_Framework_TestCase
                $this->mapper->mapOf(TestModel::class)->getObjects()->resetRow($obj);
                $this->assertCount(0, $this->storage->find(["id="=>$obj->id]));
        }
+
+       function testBuffer() {
+               $this->storage->buffer();
+               $this->assertEquals("yesterday", $this->storage->get(1)->data);
+               $this->mapper->mapOf(TestModel::class)->getObjects()->reset();
+               $exec = $this->mapper->mapOf(TestModel::class)->getGateway()->getQueryExecutor();
+               $xact = executeInConcurrentTransaction($exec, "UPDATE test SET data=\$2 WHERE id=\$1", [1, "the day before"]);
+               $this->assertEquals("yesterday", $this->storage->get(1)->data);
+               $xact->commit();
+               $this->storage->discard();
+               $this->assertEquals("the day before", $this->storage->get(1)->data);
+       }
 }
index 1dfa3dae41478d13cbaf5f090b042397dd093e73..753cb370c50b052f31820e7ac3020da48a2be31d 100644 (file)
@@ -50,11 +50,13 @@ SQL;
 
 require_once __DIR__ . "/../vendor/autoload.php";
 
-function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = null) {
+function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = array()) {
        $conn = $exec->getConnection();
-       $exec->setConnection(new Connection(PQ_TEST_DSN));
-       $exec->execute(new Writer($sql, $params), function(){});
+       $xact = (new Connection(PQ_TEST_DSN))->startTransaction();
+       $exec->setConnection($xact->connection);
+       $exec->execute(new Writer($sql, $params));
        $exec->setConnection($conn);
+       return $xact;
 }
 
 class QueryLogger implements SplObserver
@@ -83,10 +85,12 @@ class QueryLogger implements SplObserver
                                date_create()->format("Y-m-d H:i:s"),
                                json_encode($result));
                } elseif (($query = $executor->getQuery())) {
-                       fprintf($this->fp, "[%s] Q %s %% %s\n", 
+                       $executor->getConnection()->exec("SELECT pg_backend_pid()")->fetchCol($pid);
+                       fprintf($this->fp, "[%s] Q %s %% %s @%d\n",
                                date_create()->format("Y-m-d H:i:s"),
                                preg_replace("/\s+/", " ", $query), 
-                               json_encode($query->getParams()));
+                               json_encode($query->getParams()),
+                               $pid);
                }
        }
 }