From: Michael Wallner Date: Sat, 13 Apr 2013 11:47:42 +0000 (+0200) Subject: add identity and lock X-Git-Tag: v1.0.0~5 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=580991717f5e8bb237403757e2111a8d04aca616;p=m6w6%2Fpq-gateway add identity and lock --- diff --git a/composer.json b/composer.json index 84d86e1..8d6d33c 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,10 @@ "autoload": { "psr-0": { "pq\\Gateway": "lib", - "pq\\Query": "lib" + "pq\\Query": "lib" } - } + }, + "suggest": { + "reactphp/promise": "1.0.*" + } } diff --git a/lib/pq/Gateway/Row.php b/lib/pq/Gateway/Row.php index 2493373..bba7c09 100644 --- a/lib/pq/Gateway/Row.php +++ b/lib/pq/Gateway/Row.php @@ -2,6 +2,8 @@ namespace pq\Gateway; +use \pq\Query\Expr as QueryExpr; + class Row implements \JsonSerializable { /** @@ -93,6 +95,30 @@ class Row implements \JsonSerializable return $this->data; } + /** + * Get all column/value pairs to possibly uniquely identify this row + * @return array + * @throws \OutOfBoundsException if any primary key column is not present in the row + */ + function getIdentity() { + $cols = array(); + if (count($identity = $this->getTable()->getIdentity())) { + foreach ($identity as $col) { + if (!array_key_exists($col, $this->data)) { + throw new \OutOfBoundsException( + sprintf("Column '%s' does not exist in row of table '%s'", + $col, $this->getTable()->getName() + ) + ); + } + $cols[$col] = $this->data[$col]; + } + } else { + $cols = $this->data; + } + return $cols; + } + /** * Check whether the row contains modifications * @return boolean @@ -106,6 +132,10 @@ class Row implements \JsonSerializable return false; } + /** + * Refresh the rows data + * @return \pq\Gateway\Row + */ function refresh() { $this->data = $this->table->find($this->criteria(), null, 1, 0)->current()->data; $this->cell = array(); @@ -125,13 +155,21 @@ class Row implements \JsonSerializable } /** - * Transform data array to where criteria + * Transform the row's identity to where criteria * @return array */ protected function criteria() { $where = array(); - foreach($this->data as $k => $v) { - $where["$k="] = $v; + foreach ($this->getIdentity() as $col => $val) { + if (isset($val)) { + $where["$col="] = $val; + } else { + $where["$col IS"] = new QueryExpr("NULL"); + } + } + + if (($lock = $this->getTable()->getLock())) { + $lock->criteria($this, $where); } return $where; } @@ -210,7 +248,11 @@ class Row implements \JsonSerializable * @return \pq\Gateway\Row */ function create() { - $this->data = $this->table->create($this->changes())->current()->data; + $rowset = $this->table->create($this->changes()); + if (!count($rowset)) { + throw new \UnexpectedValueException("No row created"); + } + $this->data = $rowset->current()->data; $this->cell = array(); return $this; } @@ -220,7 +262,11 @@ class Row implements \JsonSerializable * @return \pq\Gateway\Row */ function update() { - $this->data = $this->table->update($this->criteria(), $this->changes())->current()->data; + $rowset = $this->table->update($this->criteria(), $this->changes()); + if (!count($rowset)) { + throw new \UnexpectedValueException("No row updated"); + } + $this->data = $rowset->current()->data; $this->cell = array(); return $this; } @@ -230,7 +276,11 @@ class Row implements \JsonSerializable * @return \pq\Gateway\Row */ function delete() { - $this->data = $this->table->delete($this->criteria(), "*")->current()->data; + $rowset = $this->table->delete($this->criteria(), "*"); + if (!count($rowset)) { + throw new \UnexpectedValueException("No row deleted"); + } + $this->data = $rowset->current()->data; return $this->prime(); } } diff --git a/lib/pq/Gateway/Table.php b/lib/pq/Gateway/Table.php index 8fc85d7..fe5efbd 100644 --- a/lib/pq/Gateway/Table.php +++ b/lib/pq/Gateway/Table.php @@ -47,6 +47,11 @@ class Table */ protected $exec; + /** + * @var \pq\Gateway\Table\Identity + */ + protected $identity; + /** * @var \pq\Gateway\Table\Relations */ @@ -57,6 +62,11 @@ class Table */ protected $metadataCache; + /** + * @var \pq\Gateway\Table\LockInterface + */ + protected $lock; + /** * @param string $table * @return \pq\Gateway\Table @@ -179,6 +189,17 @@ class Table return $this; } + /** + * Get the primary key + * @return \pq\Gateway\Table\Identity + */ + function getIdentity() { + if (!isset($this->identity)) { + $this->identity = new Table\Identity($this); + } + return $this->identity; + } + /** * Get foreign key relations * @param string $to fkey @@ -227,6 +248,24 @@ class Table return $this->name; } + /** + * Set a lock provider + * @param \pq\Gateway\Table\LockInterface $lock + * @return \pq\Gateway\Table + */ + function setLock(Table\LockInterface $lock) { + $this->lock = $lock; + return $this; + } + + /** + * Get any set lock provider + * @return \pq\Gateway\Table\LockIntferace + */ + function getLock() { + return $this->lock; + } + /** * Execute the query * @param \pq\Query\WriterInterface $query @@ -262,9 +301,10 @@ class Table * @param array|string $order * @param int $limit * @param int $offset + * @param string $lock * @return mixed */ - function find(array $where = null, $order = null, $limit = 0, $offset = 0) { + function find(array $where = null, $order = null, $limit = 0, $offset = 0, $lock = null) { $query = $this->getQueryWriter()->reset(); $query->write("SELECT * FROM", $this->conn->quoteName($this->name)); if ($where) { @@ -276,7 +316,12 @@ class Table if ($limit) { $query->write("LIMIT", $limit); } - $query->write("OFFSET", $offset); + if ($offset) { + $query->write("OFFSET", $offset); + } + if ($lock) { + $query->write("FOR", $lock); + } return $this->execute($query); } @@ -293,7 +338,7 @@ class Table // select * from $this where $this->$foreignColumn = $foreign->$referencedColumn if (!isset($name)) { - $name = $this->getName(); + $name = $foreign->getTable()->getName(); } if (!$foreign->getTable()->hasRelation($name, $this->getName())) { diff --git a/lib/pq/Gateway/Table/CacheInterface.php b/lib/pq/Gateway/Table/CacheInterface.php index c06b11b..5e30fac 100644 --- a/lib/pq/Gateway/Table/CacheInterface.php +++ b/lib/pq/Gateway/Table/CacheInterface.php @@ -2,6 +2,9 @@ namespace pq\Gateway\Table; +/** + * @codeCoverageIgnore + */ interface CacheInterface { /** diff --git a/lib/pq/Gateway/Table/Identity.php b/lib/pq/Gateway/Table/Identity.php new file mode 100644 index 0000000..fcc133b --- /dev/null +++ b/lib/pq/Gateway/Table/Identity.php @@ -0,0 +1,70 @@ +getMetadataCache(); + if (!($this->columns = $cache->get("$table#identity"))) { + $table->getQueryExecutor()->execute( + new \pq\Query\Writer(IDENTITY_SQL, array($table->getName())), + function($result) use($table, $cache) { + $this->columns = array_map("current", $result->fetchAll(\pq\Result::FETCH_ARRAY)); + $cache->set("$table#identity", $this->columns); + } + ); + } + } + + /** + * @implements \Countable + * @return int + */ + function count() { + return count($this->columns); + } + + /** + * @implements \IteratorAggregate + * @return \ArrayIterator + */ + function getIterator() { + return new \ArrayIterator($this->columns); + } + + /** + * Get the column names which the primary key contains + * @return array + */ + function getColumns() { + return $this->columns; + } +} diff --git a/lib/pq/Gateway/Table/LockInterface.php b/lib/pq/Gateway/Table/LockInterface.php new file mode 100644 index 0000000..ff8e858 --- /dev/null +++ b/lib/pq/Gateway/Table/LockInterface.php @@ -0,0 +1,10 @@ +column = $column; + } + + /** + * @implements LockInterface + * @param \pq\Gateway\Row $row + * @param array $where reference to the criteria + */ + function criteria(Row $row, array &$where) { + $where["{$this->column}="] = $row->getData()[$this->column]; + $row->{$this->column}->mod(+1); + } +} diff --git a/lib/pq/Gateway/Table/PessimisticLock.php b/lib/pq/Gateway/Table/PessimisticLock.php new file mode 100644 index 0000000..e711b46 --- /dev/null +++ b/lib/pq/Gateway/Table/PessimisticLock.php @@ -0,0 +1,35 @@ +getIdentity() as $col => $val) { + if (isset($val)) { + $where["$col="] = $val; + } else { + $where["$col IS"] = new QueryExpr("NULL"); + } + } + + if (1 != count($rowset = $row->getTable()->find($where, null, 0, 0, "update nowait"))) { + throw new \UnexpectedValueException("Failed to select a single row"); + } + if ($rowset->current()->getData() != $row->getData()) { + throw new \UnexpectedValueException("Row has already been modified"); + } + } +} \ No newline at end of file diff --git a/lib/pq/Gateway/Table/Relations.php b/lib/pq/Gateway/Table/Relations.php index 128dca7..ab74d40 100644 --- a/lib/pq/Gateway/Table/Relations.php +++ b/lib/pq/Gateway/Table/Relations.php @@ -30,17 +30,26 @@ order by ,att1.attnum SQL; +/** + * A foreighn key implementation + */ class Relations { - public $references; + /** + * @var array + */ + protected $references; function __construct(Table $table) { $cache = $table->getMetadataCache(); - if (!($this->references = $cache->get("$table:references"))) { - $this->references = $table->getConnection() - ->execParams(RELATION_SQL, array($table->getName())) - ->map(array(0,1), array(2,3,4), \pq\Result::FETCH_OBJECT); - $cache->set("$table:references", $this->references); + if (!($this->references = $cache->get("$table#relations"))) { + $table->getQueryExecutor()->execute( + new \pq\Query\Writer(RELATION_SQL, array($table->getName())), + function($result) use($table, $cache) { + $this->references = $result->map(array(0,1), array(2,3,4), \pq\Result::FETCH_OBJECT); + $cache->set("$table#relations", $this->references); + } + ); } } diff --git a/lib/pq/Query/AsyncExecutor.php b/lib/pq/Query/AsyncExecutor.php new file mode 100644 index 0000000..737c0eb --- /dev/null +++ b/lib/pq/Query/AsyncExecutor.php @@ -0,0 +1,38 @@ +result = null; + $this->query = $query; + $this->notify(); + + $deferred = new Deferred; + $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(), + array($deferred->resolver(), "resolve")); + + return $deferred->then(function($result) { + $this->result = $result; + $this->notify(); + })->then($callback); + } +} diff --git a/lib/pq/Query/Executor.php b/lib/pq/Query/Executor.php index a01580a..d1c7521 100644 --- a/lib/pq/Query/Executor.php +++ b/lib/pq/Query/Executor.php @@ -12,12 +12,28 @@ class Executor implements ExecutorInterface */ protected $conn; + /** + * @var \SplObjectStorage + */ + protected $observers; + + /** + * @var WriterInterface + */ + protected $query; + + /** + * @var \pq\Result + */ + protected $result; + /** * Create a synchronous query executor * @param \pq\Connection $conn */ function __construct(\pq\Connection $conn) { $this->conn = $conn; + $this->observers = new \SplObjectStorage; } /** @@ -38,6 +54,22 @@ class Executor implements ExecutorInterface return $this; } + /** + * @inheritdoc + * @return WriterInterface + */ + function getQuery() { + return $this->query; + } + + /** + * @inheritdoc + * @return \pq\Result + */ + function getResult() { + return $this->result; + } + /** * Execute the query synchronously through \pq\Connection::execParams() * @param \pq\Query\WriterInterface $query @@ -45,6 +77,36 @@ class Executor implements ExecutorInterface * @return mixed */ function execute(WriterInterface $query, callable $callback) { - return $callback($this->getConnection()->execParams($query, $query->getParams(), $query->getTypes())); + $this->result = null; + $this->query = $query; + $this->notify(); + $this->result = $this->getConnection()->execParams($query, $query->getParams(), $query->getTypes()); + $this->notify(); + return $callback($this->result); + } + + /** + * @implements \SplSubject + * @param \SplObserver $observer + */ + function attach(\SplObserver $observer) { + $this->observers->attach($observer); + } + + /** + * @implements \SplSubject + * @param \SplObserver $observer + */ + function detach(\SplObserver $observer) { + $this->observers->detach($observer); + } + + /** + * @implements \SplSubject + */ + function notify() { + foreach ($this->observers as $observer){ + $observer->update($this); + } } } diff --git a/lib/pq/Query/Executor/Async.php b/lib/pq/Query/Executor/Async.php deleted file mode 100644 index d41f53e..0000000 --- a/lib/pq/Query/Executor/Async.php +++ /dev/null @@ -1,55 +0,0 @@ -conn = $conn; - } - - /** - * Get the connection - * @return \pq\Connection - */ - function getConnection() { - return $this->conn; - } - - /** - * Set the connection - * @param \pq\Connection $conn - * @return \pq\Query\Executor\Async - */ - function setConnection(\pq\Connection $conn) { - $this->conn = $conn; - return $this; - } - - /** - * Execute the query asynchronously through \pq\Connection::execParamsAsync() - * @param \pq\Query\WriterInterface $query - * @param callable $callback - * @return \React\Promise\DeferredPromise - */ - function execute(WriterInterface $query, callable $callback) { - $deferred = new Deferred; // FIXME - $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(), - array($deferred->resolver(), "resolve")); - return $deferred->then($callback); - } -} diff --git a/lib/pq/Query/ExecutorInterface.php b/lib/pq/Query/ExecutorInterface.php index 141ac23..906404f 100644 --- a/lib/pq/Query/ExecutorInterface.php +++ b/lib/pq/Query/ExecutorInterface.php @@ -4,8 +4,9 @@ namespace pq\Query; /** * An executor of \pq\Query\Writer queries + * @codeCoverageIgnore */ -interface ExecutorInterface +interface ExecutorInterface extends \SplSubject { /** * Get the connection @@ -27,4 +28,15 @@ interface ExecutorInterface * @return mixed the result of the callback */ function execute(WriterInterface $query, callable $callback); + + /** + * @return WriterInterface + */ + function getQuery(); + + /** + * @return \pq\Result + */ + function getResult(); + } diff --git a/lib/pq/Query/ExpressibleInterface.php b/lib/pq/Query/ExpressibleInterface.php index 1009583..a1c2ffe 100644 --- a/lib/pq/Query/ExpressibleInterface.php +++ b/lib/pq/Query/ExpressibleInterface.php @@ -2,6 +2,9 @@ namespace pq\Query; +/** + * @codeCoverageIgnore + */ interface ExpressibleInterface { /** diff --git a/lib/pq/Query/Writer.php b/lib/pq/Query/Writer.php index 6e06945..b5f3ab2 100644 --- a/lib/pq/Query/Writer.php +++ b/lib/pq/Query/Writer.php @@ -48,7 +48,13 @@ class Writer implements WriterInterface * @return string */ protected function reduce($q, $v) { - return $q . " " . (is_array($v) ? implode(", ", $v) : $v); + if (is_array($v)) { + $v = implode(", ", $v); + } + if (strlen($q)) { + $q .= " "; + } + return $q . $v; } /** @@ -83,6 +89,9 @@ class Writer implements WriterInterface * @return \pq\Query\Writer */ function write() { + if (strlen($this->query)) { + $this->query .= " "; + } $this->query .= array_reduce(func_get_args(), array($this, "reduce")); return $this; } diff --git a/lib/pq/Query/WriterInterface.php b/lib/pq/Query/WriterInterface.php index 885d4ca..35b6c48 100644 --- a/lib/pq/Query/WriterInterface.php +++ b/lib/pq/Query/WriterInterface.php @@ -4,6 +4,7 @@ namespace pq\Query; /** * A query writer which supports easily constructing queries for \pq\Connection::execParams() + * @codeCoverageIgnore */ interface WriterInterface { diff --git a/tests/lib/pq/Gateway/CellTest.php b/tests/lib/pq/Gateway/CellTest.php index d7a8fc6..9dc9680 100644 --- a/tests/lib/pq/Gateway/CellTest.php +++ b/tests/lib/pq/Gateway/CellTest.php @@ -23,6 +23,7 @@ class CellTest extends \PHPUnit_Framework_TestCase { $this->conn->exec(PQ_TEST_DATA); Table::$defaultConnection = $this->conn; $this->table = new Table("test"); + $this->table->getQueryExecutor()->attach(new \QueryLogger()); } protected function tearDown() { diff --git a/tests/lib/pq/Gateway/RowTest.php b/tests/lib/pq/Gateway/RowTest.php index fc08094..00dbbbb 100644 --- a/tests/lib/pq/Gateway/RowTest.php +++ b/tests/lib/pq/Gateway/RowTest.php @@ -23,6 +23,7 @@ class RowTest extends \PHPUnit_Framework_TestCase { $this->conn->exec(PQ_TEST_DATA); Table::$defaultConnection = $this->conn; $this->table = new Table("test"); + $this->table->getQueryExecutor()->attach(new \QueryLogger()); } protected function tearDown() { @@ -51,4 +52,59 @@ class RowTest extends \PHPUnit_Framework_TestCase { $row = new Row($this->table); $this->assertSame($this->table, $row->getTable()); } + + function testPessimisticLock() { + $this->table->setLock(new Table\PessimisticLock); + $txn = $this->table->getConnection()->startTransaction(); + $row = $this->table->find(null, null, 1)->current(); + $row->data = "foo"; + $row->update(); + $txn->commit(); + $this->assertSame("foo", $row->data->get()); + } + + function testPessimisticLockFail() { + $this->table->setLock(new Table\PessimisticLock); + $txn = $this->table->getConnection()->startTransaction(); + $row = $this->table->find(null, null, 1)->current(); + $row->data = "foo"; + executeInConcurrentTransaction( + $this->table->getQueryExecutor(), + "UPDATE {$this->table->getName()} SET data='bar' WHERE id=\$1", + array($row->id->get())); + $this->setExpectedException("\\UnexpectedValueException", "Row has already been modified"); + $row->update(); + $txn->commit(); + } + + function testOptimisticLock() { + $this->table->setLock(new Table\OptimisticLock("counter")); + $row = $this->table->find(null, null, 1)->current(); + $cnt = $row->counter->get(); + $row->data = "foo"; + $row->update(); + $this->assertEquals("foo", $row->data->get()); + $this->assertEquals($cnt +1, $row->counter->get()); + } + + function testOptimisticLockFail() { + $this->table->setLock(new Table\OptimisticLock("counter")); + $row = $this->table->find(null, null, 1)->current(); + $cnt = $row->counter->get(); + $row->data = "foo"; + executeInConcurrentTransaction( + $this->table->getQueryExecutor(), + "UPDATE {$this->table->getName()} SET counter = 10 WHERE id=\$1", + array($row->id->get())); + $this->setExpectedException("\\UnexpectedValueException", "No row updated"); + $row->update(); + } + + function testRef() { + foreach ($this->table->find() as $row) { + foreach ($row->reftest() as $ref) { + $this->assertEquals($row->id->get(), $ref->test->id->get()); + } + } + } } diff --git a/tests/lib/pq/Gateway/RowsetTest.php b/tests/lib/pq/Gateway/RowsetTest.php index 48ac624..037192a 100644 --- a/tests/lib/pq/Gateway/RowsetTest.php +++ b/tests/lib/pq/Gateway/RowsetTest.php @@ -23,6 +23,7 @@ class RowsetTest extends \PHPUnit_Framework_TestCase { $this->conn->exec(PQ_TEST_DATA); Table::$defaultConnection = $this->conn; $this->table = new Table("test"); + $this->table->getQueryExecutor()->attach(new \QueryLogger()); } protected function tearDown() { @@ -107,7 +108,7 @@ class RowsetTest extends \PHPUnit_Framework_TestCase { } public function testDeleteFail() { - $this->setExpectedException("pq\\Exception"); + $this->setExpectedException("Exception"); $rowset = new Rowset($this->table); $rowset->append(new Row($this->table, array("xx" => 0)))->delete(); } diff --git a/tests/lib/pq/Gateway/TableTest.php b/tests/lib/pq/Gateway/TableTest.php index a5fb56f..e1b6687 100644 --- a/tests/lib/pq/Gateway/TableTest.php +++ b/tests/lib/pq/Gateway/TableTest.php @@ -23,6 +23,7 @@ class TableTest extends \PHPUnit_Framework_TestCase { $this->conn->exec(PQ_TEST_DATA); Table::$defaultConnection = $this->conn; $this->table = new Table("test"); + $this->table->getQueryExecutor()->attach(new \QueryLogger()); } protected function tearDown() { diff --git a/tests/setup.inc b/tests/setup.inc index ee9733c..bec6c71 100644 --- a/tests/setup.inc +++ b/tests/setup.inc @@ -42,3 +42,49 @@ SQL; spl_autoload_register(function($c) { if (substr($c,0,3) == "pq\\") return require_once sprintf("%s/../lib/%s.php", __DIR__, strtr($c, "\\", "/")); }); + +function executeInConcurrentTransaction(\pq\Query\ExecutorInterface $exec, $sql, array $params = null) { + $conn = $exec->getConnection(); + $exec->setConnection(new pq\Connection(PQ_TEST_DSN)); + $exec->execute(new \pq\Query\Writer($sql, $params), function(){}); + $exec->setConnection($conn); +} + +class QueryLogger implements \SplObserver +{ + protected $fp; + + function __construct($logfile = null) { + if (!isset($logfile)) { + $logfile = __DIR__."/query.log"; + } + if (!$this->fp = @fopen($logfile, "a")) { + throw new \RuntimeException(error_get_last()["message"]); + } + } + + function __destruct() { + if (is_resource($this->fp)) { + fclose($this->fp); + } + } + + function update(\SplSubject $executor) { + $result = $executor->getResult(); + if (isset($result)) { + fprintf($this->fp, "[%s] R %s\n", + date_create()->format("Y-m-d H:i:s"), + json_encode([ + "S" => $result->statusMessage, + "N" => $result->numRows, + "C" => $result->numCols, + "A" => $result->affectedRows + ])); + } elseif (($query = $executor->getQuery())) { + fprintf($this->fp, "[%s] Q %s %% %s\n", + date_create()->format("Y-m-d H:i:s"), + preg_replace("/\s+/", " ", $query), + json_encode($query->getParams())); + } + } +}