From 5c080bb03b5fee867da94cb3c1108d59529f741a Mon Sep 17 00:00:00 2001 From: Michael Wallner Date: Wed, 23 Sep 2015 12:51:39 +0200 Subject: [PATCH] storage test --- lib/pq/Mapper/Storage.php | 70 +++++++++++++++++++++++++++++ lib/pq/Mapper/StorageInterface.php | 22 +++++++++ lib/pq/Query/AsyncExecutor.php | 6 ++- lib/pq/Query/Executor.php | 4 +- lib/pq/Query/ExecutorInterface.php | 2 +- tests/lib/pq/Gateway/RowTest.php | 4 +- tests/lib/pq/Mapper/StorageTest.php | 12 +++++ tests/setup.inc | 14 +++--- 8 files changed, 122 insertions(+), 12 deletions(-) diff --git a/lib/pq/Mapper/Storage.php b/lib/pq/Mapper/Storage.php index 8016ef8..4882eda 100644 --- a/lib/pq/Mapper/Storage.php +++ b/lib/pq/Mapper/Storage.php @@ -2,7 +2,10 @@ namespace pq\Mapper; +use InvalidArgumentException; +use pq\Connection; use pq\Gateway\Table; +use pq\Transaction; class Storage implements StorageInterface { @@ -18,6 +21,12 @@ class Storage implements StorageInterface */ private $gateway; + /** + * Buffered transaction + * @var Transaction + */ + private $xaction; + /** * Create a storage for $map * @param MapInterface $map @@ -27,6 +36,30 @@ class Storage implements StorageInterface $this->gateway = $map->getGateway(); } + /** + * Find by PK + * @param mixed $pk + * @return object + */ + function get($pk) { + $id = $this->gateway->getIdentity(); + if (count($id) == 1 && is_scalar($pk)) { + $pk = [current($id->getColumns()) => $pk]; + } elseif (!is_array($pk) || count($pk) !== count($id)) { + throw InvalidArgumentException( + "Insufficient identity provided; not all fields of %s are provided in %s", + json_encode($id->getColumns()), json_encode($pk)); + } + + $where = []; + foreach ($pk as $k => $v) { + $where["$k="] = $v; + } + $rowset = $this->gateway->find($where); + + return $this->map->map($rowset->current()); + } + /** * Find * @param array $where @@ -59,4 +92,41 @@ class Storage implements StorageInterface function save($object) { $this->map->unmap($object); } + + /** + * Buffer in a transaction + */ + function buffer() { + switch ($this->gateway->getConnection()->transactionStatus) { + case Connection::TRANS_INTRANS: + break; + default: + $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("START TRANSACTION")); + } + } + + /** + * Commit + */ + function flush() { + switch ($this->gateway->getConnection()->transactionStatus) { + case Connection::TRANS_IDLE: + break; + default: + $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("COMMIT")); + } + } + + /** + * Rollback + */ + function discard() { + switch ($this->gateway->getConnection()->transactionStatus) { + case Connection::TRANS_IDLE: + break; + default: + $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("ROLLBACK")); + } + $this->map->getObjects()->reset(); + } } diff --git a/lib/pq/Mapper/StorageInterface.php b/lib/pq/Mapper/StorageInterface.php index bb5c94a..eb75f63 100644 --- a/lib/pq/Mapper/StorageInterface.php +++ b/lib/pq/Mapper/StorageInterface.php @@ -4,6 +4,13 @@ namespace pq\Mapper; interface StorageInterface { + /** + * Find by PK + * @param mixed $pk + * @return object + */ + function get($pk); + /** * Find * @param array $where @@ -25,4 +32,19 @@ interface StorageInterface * @param object $object */ function save($object); + + /** + * Buffer in a transaction + */ + function buffer(); + + /** + * Commit + */ + function flush(); + + /** + * Rollback + */ + function discard(); } diff --git a/lib/pq/Query/AsyncExecutor.php b/lib/pq/Query/AsyncExecutor.php index a61f87a..4b58a99 100644 --- a/lib/pq/Query/AsyncExecutor.php +++ b/lib/pq/Query/AsyncExecutor.php @@ -103,7 +103,9 @@ class AsyncExecutor extends Executor $context = $init(); foreach (func_get_args() as $cb) { - $then($context, $cb); + if (is_callable($callback)) { + $then($context, $cb); + } } return array($context, function($result) use ($context, $done) { @@ -117,7 +119,7 @@ class AsyncExecutor extends Executor * @param callable $callback result callback * @return mixed context created by the init callback */ - function execute(WriterInterface $query, callable $callback) { + function execute(WriterInterface $query, callable $callback = null) { $this->result = null; $this->query = $query; $this->notify(); diff --git a/lib/pq/Query/Executor.php b/lib/pq/Query/Executor.php index e0bb169..3b97a6f 100644 --- a/lib/pq/Query/Executor.php +++ b/lib/pq/Query/Executor.php @@ -76,13 +76,13 @@ class Executor implements ExecutorInterface * @param callable $callback * @return mixed */ - function execute(WriterInterface $query, callable $callback) { + function execute(WriterInterface $query, callable $callback = null) { $this->result = null; $this->query = $query; $this->notify(); $this->result = $this->getConnection()->execParams($query, $query->getParams(), $query->getTypes()); $this->notify(); - return $callback($this->result); + return $callback ? $callback($this->result) : $this->result; } /** diff --git a/lib/pq/Query/ExecutorInterface.php b/lib/pq/Query/ExecutorInterface.php index f3bc0be..6c1bed1 100644 --- a/lib/pq/Query/ExecutorInterface.php +++ b/lib/pq/Query/ExecutorInterface.php @@ -27,7 +27,7 @@ interface ExecutorInterface extends \SplSubject * @param callable $callback * @return mixed the result of the callback */ - function execute(WriterInterface $query, callable $callback); + function execute(WriterInterface $query, callable $callback = null); /** * @return \pq\Query\WriterInterface diff --git a/tests/lib/pq/Gateway/RowTest.php b/tests/lib/pq/Gateway/RowTest.php index 4d539a0..a8c5bc1 100644 --- a/tests/lib/pq/Gateway/RowTest.php +++ b/tests/lib/pq/Gateway/RowTest.php @@ -70,7 +70,7 @@ class RowTest extends \PHPUnit_Framework_TestCase { executeInConcurrentTransaction( $this->table->getQueryExecutor(), "UPDATE {$this->table->getName()} SET data='bar' WHERE id=\$1", - array($row->id->get())); + array($row->id->get()))->commit(); $this->setExpectedException("\\UnexpectedValueException", "Row has already been modified"); $row->update(); $txn->commit(); @@ -93,7 +93,7 @@ class RowTest extends \PHPUnit_Framework_TestCase { executeInConcurrentTransaction( $this->table->getQueryExecutor(), "UPDATE {$this->table->getName()} SET counter = 10 WHERE id=\$1", - array($row->id->get())); + array($row->id->get()))->commit(); $this->setExpectedException("\\UnexpectedValueException", "No row updated"); $row->update(); } diff --git a/tests/lib/pq/Mapper/StorageTest.php b/tests/lib/pq/Mapper/StorageTest.php index fc63f18..c577162 100644 --- a/tests/lib/pq/Mapper/StorageTest.php +++ b/tests/lib/pq/Mapper/StorageTest.php @@ -68,4 +68,16 @@ class StorageTest extends PHPUnit_Framework_TestCase $this->mapper->mapOf(TestModel::class)->getObjects()->resetRow($obj); $this->assertCount(0, $this->storage->find(["id="=>$obj->id])); } + + function testBuffer() { + $this->storage->buffer(); + $this->assertEquals("yesterday", $this->storage->get(1)->data); + $this->mapper->mapOf(TestModel::class)->getObjects()->reset(); + $exec = $this->mapper->mapOf(TestModel::class)->getGateway()->getQueryExecutor(); + $xact = executeInConcurrentTransaction($exec, "UPDATE test SET data=\$2 WHERE id=\$1", [1, "the day before"]); + $this->assertEquals("yesterday", $this->storage->get(1)->data); + $xact->commit(); + $this->storage->discard(); + $this->assertEquals("the day before", $this->storage->get(1)->data); + } } diff --git a/tests/setup.inc b/tests/setup.inc index 1dfa3da..753cb37 100644 --- a/tests/setup.inc +++ b/tests/setup.inc @@ -50,11 +50,13 @@ SQL; require_once __DIR__ . "/../vendor/autoload.php"; -function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = null) { +function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = array()) { $conn = $exec->getConnection(); - $exec->setConnection(new Connection(PQ_TEST_DSN)); - $exec->execute(new Writer($sql, $params), function(){}); + $xact = (new Connection(PQ_TEST_DSN))->startTransaction(); + $exec->setConnection($xact->connection); + $exec->execute(new Writer($sql, $params)); $exec->setConnection($conn); + return $xact; } class QueryLogger implements SplObserver @@ -83,10 +85,12 @@ class QueryLogger implements SplObserver date_create()->format("Y-m-d H:i:s"), json_encode($result)); } elseif (($query = $executor->getQuery())) { - fprintf($this->fp, "[%s] Q %s %% %s\n", + $executor->getConnection()->exec("SELECT pg_backend_pid()")->fetchCol($pid); + fprintf($this->fp, "[%s] Q %s %% %s @%d\n", date_create()->format("Y-m-d H:i:s"), preg_replace("/\s+/", " ", $query), - json_encode($query->getParams())); + json_encode($query->getParams()), + $pid); } } } -- 2.30.2