"autoload": {
"psr-0": {
"pq\\Gateway": "lib",
- "pq\\Query": "lib"
+ "pq\\Query": "lib"
}
- }
+ },
+ "suggest": {
+ "reactphp/promise": "1.0.*"
+ }
}
namespace pq\Gateway;
+use \pq\Query\Expr as QueryExpr;
+
class Row implements \JsonSerializable
{
/**
return $this->data;
}
+ /**
+ * Get all column/value pairs to possibly uniquely identify this row
+ * @return array
+ * @throws \OutOfBoundsException if any primary key column is not present in the row
+ */
+ function getIdentity() {
+ $cols = array();
+ if (count($identity = $this->getTable()->getIdentity())) {
+ foreach ($identity as $col) {
+ if (!array_key_exists($col, $this->data)) {
+ throw new \OutOfBoundsException(
+ sprintf("Column '%s' does not exist in row of table '%s'",
+ $col, $this->getTable()->getName()
+ )
+ );
+ }
+ $cols[$col] = $this->data[$col];
+ }
+ } else {
+ $cols = $this->data;
+ }
+ return $cols;
+ }
+
/**
* Check whether the row contains modifications
* @return boolean
return false;
}
+ /**
+ * Refresh the rows data
+ * @return \pq\Gateway\Row
+ */
function refresh() {
$this->data = $this->table->find($this->criteria(), null, 1, 0)->current()->data;
$this->cell = array();
}
/**
- * Transform data array to where criteria
+ * Transform the row's identity to where criteria
* @return array
*/
protected function criteria() {
$where = array();
- foreach($this->data as $k => $v) {
- $where["$k="] = $v;
+ foreach ($this->getIdentity() as $col => $val) {
+ if (isset($val)) {
+ $where["$col="] = $val;
+ } else {
+ $where["$col IS"] = new QueryExpr("NULL");
+ }
+ }
+
+ if (($lock = $this->getTable()->getLock())) {
+ $lock->criteria($this, $where);
}
return $where;
}
* @return \pq\Gateway\Row
*/
function create() {
- $this->data = $this->table->create($this->changes())->current()->data;
+ $rowset = $this->table->create($this->changes());
+ if (!count($rowset)) {
+ throw new \UnexpectedValueException("No row created");
+ }
+ $this->data = $rowset->current()->data;
$this->cell = array();
return $this;
}
* @return \pq\Gateway\Row
*/
function update() {
- $this->data = $this->table->update($this->criteria(), $this->changes())->current()->data;
+ $rowset = $this->table->update($this->criteria(), $this->changes());
+ if (!count($rowset)) {
+ throw new \UnexpectedValueException("No row updated");
+ }
+ $this->data = $rowset->current()->data;
$this->cell = array();
return $this;
}
* @return \pq\Gateway\Row
*/
function delete() {
- $this->data = $this->table->delete($this->criteria(), "*")->current()->data;
+ $rowset = $this->table->delete($this->criteria(), "*");
+ if (!count($rowset)) {
+ throw new \UnexpectedValueException("No row deleted");
+ }
+ $this->data = $rowset->current()->data;
return $this->prime();
}
}
*/
protected $exec;
+ /**
+ * @var \pq\Gateway\Table\Identity
+ */
+ protected $identity;
+
/**
* @var \pq\Gateway\Table\Relations
*/
*/
protected $metadataCache;
+ /**
+ * @var \pq\Gateway\Table\LockInterface
+ */
+ protected $lock;
+
/**
* @param string $table
* @return \pq\Gateway\Table
return $this;
}
+ /**
+ * Get the primary key
+ * @return \pq\Gateway\Table\Identity
+ */
+ function getIdentity() {
+ if (!isset($this->identity)) {
+ $this->identity = new Table\Identity($this);
+ }
+ return $this->identity;
+ }
+
/**
* Get foreign key relations
* @param string $to fkey
return $this->name;
}
+ /**
+ * Set a lock provider
+ * @param \pq\Gateway\Table\LockInterface $lock
+ * @return \pq\Gateway\Table
+ */
+ function setLock(Table\LockInterface $lock) {
+ $this->lock = $lock;
+ return $this;
+ }
+
+ /**
+ * Get any set lock provider
+ * @return \pq\Gateway\Table\LockIntferace
+ */
+ function getLock() {
+ return $this->lock;
+ }
+
/**
* Execute the query
* @param \pq\Query\WriterInterface $query
* @param array|string $order
* @param int $limit
* @param int $offset
+ * @param string $lock
* @return mixed
*/
- function find(array $where = null, $order = null, $limit = 0, $offset = 0) {
+ function find(array $where = null, $order = null, $limit = 0, $offset = 0, $lock = null) {
$query = $this->getQueryWriter()->reset();
$query->write("SELECT * FROM", $this->conn->quoteName($this->name));
if ($where) {
if ($limit) {
$query->write("LIMIT", $limit);
}
- $query->write("OFFSET", $offset);
+ if ($offset) {
+ $query->write("OFFSET", $offset);
+ }
+ if ($lock) {
+ $query->write("FOR", $lock);
+ }
return $this->execute($query);
}
// select * from $this where $this->$foreignColumn = $foreign->$referencedColumn
if (!isset($name)) {
- $name = $this->getName();
+ $name = $foreign->getTable()->getName();
}
if (!$foreign->getTable()->hasRelation($name, $this->getName())) {
namespace pq\Gateway\Table;
+/**
+ * @codeCoverageIgnore
+ */
interface CacheInterface
{
/**
--- /dev/null
+<?php
+
+namespace pq\Gateway\Table;
+
+use \pq\Gateway\Table;
+
+const IDENTITY_SQL = <<<SQL
+select
+ a.attname as column
+from pg_class c
+ join pg_index i on c.oid = i.indrelid
+ join pg_attribute a on c.oid = a.attrelid
+where
+ c.relname = \$1
+ and a.attnum = any(i.indkey)
+ and i.indisprimary
+order by
+ a.attnum
+SQL;
+
+/**
+ * A primary key implementation
+ */
+class Identity implements \Countable, \IteratorAggregate
+{
+ /**
+ * @var array
+ */
+ protected $columns = array();
+
+ /**
+ * @param \pq\Gateway\Table $table
+ */
+ function __construct(Table $table) {
+ $cache = $table->getMetadataCache();
+ if (!($this->columns = $cache->get("$table#identity"))) {
+ $table->getQueryExecutor()->execute(
+ new \pq\Query\Writer(IDENTITY_SQL, array($table->getName())),
+ function($result) use($table, $cache) {
+ $this->columns = array_map("current", $result->fetchAll(\pq\Result::FETCH_ARRAY));
+ $cache->set("$table#identity", $this->columns);
+ }
+ );
+ }
+ }
+
+ /**
+ * @implements \Countable
+ * @return int
+ */
+ function count() {
+ return count($this->columns);
+ }
+
+ /**
+ * @implements \IteratorAggregate
+ * @return \ArrayIterator
+ */
+ function getIterator() {
+ return new \ArrayIterator($this->columns);
+ }
+
+ /**
+ * Get the column names which the primary key contains
+ * @return array
+ */
+ function getColumns() {
+ return $this->columns;
+ }
+}
--- /dev/null
+<?php
+
+namespace pq\Gateway\Table;
+
+use \pq\Gateway\Row;
+
+interface LockInterface
+{
+ function criteria(Row $row, array &$where);
+}
--- /dev/null
+<?php
+
+namespace pq\Gateway\Table;
+
+use \pq\Gateway\Row;
+
+/**
+ * An optimistic row lock implementation using a versioning column
+ */
+class OptimisticLock implements LockInterface
+{
+ /**
+ * The name of the versioning column
+ * @var string
+ */
+ protected $column;
+
+ /**
+ * @param string $column
+ */
+ function __construct($column = "version") {
+ $this->column = $column;
+ }
+
+ /**
+ * @implements LockInterface
+ * @param \pq\Gateway\Row $row
+ * @param array $where reference to the criteria
+ */
+ function criteria(Row $row, array &$where) {
+ $where["{$this->column}="] = $row->getData()[$this->column];
+ $row->{$this->column}->mod(+1);
+ }
+}
--- /dev/null
+<?php
+
+namespace pq\Gateway\Table;
+
+use \pq\Gateway\Row;
+
+/**
+ * A pessimistic row lock implementation using an additional SELECT FOR UPDATE
+ */
+class PessimisticLock implements LockInterface
+{
+ /**
+ * @inheritdoc
+ * @param \pq\Gateway\Row $row
+ * @param array $ignore
+ * @throws \UnexpectedValueException if the row has already been modified
+ */
+ function criteria(Row $row, array &$ignore) {
+ $where = array();
+ foreach ($row->getIdentity() as $col => $val) {
+ if (isset($val)) {
+ $where["$col="] = $val;
+ } else {
+ $where["$col IS"] = new QueryExpr("NULL");
+ }
+ }
+
+ if (1 != count($rowset = $row->getTable()->find($where, null, 0, 0, "update nowait"))) {
+ throw new \UnexpectedValueException("Failed to select a single row");
+ }
+ if ($rowset->current()->getData() != $row->getData()) {
+ throw new \UnexpectedValueException("Row has already been modified");
+ }
+ }
+}
\ No newline at end of file
,att1.attnum
SQL;
+/**
+ * A foreighn key implementation
+ */
class Relations
{
- public $references;
+ /**
+ * @var array
+ */
+ protected $references;
function __construct(Table $table) {
$cache = $table->getMetadataCache();
- if (!($this->references = $cache->get("$table:references"))) {
- $this->references = $table->getConnection()
- ->execParams(RELATION_SQL, array($table->getName()))
- ->map(array(0,1), array(2,3,4), \pq\Result::FETCH_OBJECT);
- $cache->set("$table:references", $this->references);
+ if (!($this->references = $cache->get("$table#relations"))) {
+ $table->getQueryExecutor()->execute(
+ new \pq\Query\Writer(RELATION_SQL, array($table->getName())),
+ function($result) use($table, $cache) {
+ $this->references = $result->map(array(0,1), array(2,3,4), \pq\Result::FETCH_OBJECT);
+ $cache->set("$table#relations", $this->references);
+ }
+ );
}
}
--- /dev/null
+<?php
+
+namespace pq\Query\Executor;
+
+use \pq\Query\Executor;
+use \pq\Query\WriterInterface;
+
+/**
+ * @requires \React\Promise
+ */
+use \React\Promise\Deferred;
+
+/**
+ * An asynchronous query executor
+ */
+class Async extends Executor
+{
+ /**
+ * Execute the query asynchronously through \pq\Connection::execParamsAsync()
+ * @param \pq\Query\WriterInterface $query
+ * @param callable $callback
+ * @return \React\Promise\DeferredPromise
+ */
+ function execute(WriterInterface $query, callable $callback) {
+ $this->result = null;
+ $this->query = $query;
+ $this->notify();
+
+ $deferred = new Deferred;
+ $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(),
+ array($deferred->resolver(), "resolve"));
+
+ return $deferred->then(function($result) {
+ $this->result = $result;
+ $this->notify();
+ })->then($callback);
+ }
+}
*/
protected $conn;
+ /**
+ * @var \SplObjectStorage
+ */
+ protected $observers;
+
+ /**
+ * @var WriterInterface
+ */
+ protected $query;
+
+ /**
+ * @var \pq\Result
+ */
+ protected $result;
+
/**
* Create a synchronous query executor
* @param \pq\Connection $conn
*/
function __construct(\pq\Connection $conn) {
$this->conn = $conn;
+ $this->observers = new \SplObjectStorage;
}
/**
return $this;
}
+ /**
+ * @inheritdoc
+ * @return WriterInterface
+ */
+ function getQuery() {
+ return $this->query;
+ }
+
+ /**
+ * @inheritdoc
+ * @return \pq\Result
+ */
+ function getResult() {
+ return $this->result;
+ }
+
/**
* Execute the query synchronously through \pq\Connection::execParams()
* @param \pq\Query\WriterInterface $query
* @return mixed
*/
function execute(WriterInterface $query, callable $callback) {
- return $callback($this->getConnection()->execParams($query, $query->getParams(), $query->getTypes()));
+ $this->result = null;
+ $this->query = $query;
+ $this->notify();
+ $this->result = $this->getConnection()->execParams($query, $query->getParams(), $query->getTypes());
+ $this->notify();
+ return $callback($this->result);
+ }
+
+ /**
+ * @implements \SplSubject
+ * @param \SplObserver $observer
+ */
+ function attach(\SplObserver $observer) {
+ $this->observers->attach($observer);
+ }
+
+ /**
+ * @implements \SplSubject
+ * @param \SplObserver $observer
+ */
+ function detach(\SplObserver $observer) {
+ $this->observers->detach($observer);
+ }
+
+ /**
+ * @implements \SplSubject
+ */
+ function notify() {
+ foreach ($this->observers as $observer){
+ $observer->update($this);
+ }
}
}
+++ /dev/null
-<?php
-
-namespace pq\Query\Executor;
-
-use \pq\Query\ExecutorInterface;
-use \pq\Query\WriterInterface;
-
-use \React\Promise\Deferred;
-
-/**
- * An asynchronous query executor
- */
-class Async implements ExecutorInterface
-{
- protected $conn;
-
- /**
- * Create a asynchronous query exectuor
- * @param \pq\Connection $conn
- */
- function __construct(\pq\Connection $conn) {
- $this->conn = $conn;
- }
-
- /**
- * Get the connection
- * @return \pq\Connection
- */
- function getConnection() {
- return $this->conn;
- }
-
- /**
- * Set the connection
- * @param \pq\Connection $conn
- * @return \pq\Query\Executor\Async
- */
- function setConnection(\pq\Connection $conn) {
- $this->conn = $conn;
- return $this;
- }
-
- /**
- * Execute the query asynchronously through \pq\Connection::execParamsAsync()
- * @param \pq\Query\WriterInterface $query
- * @param callable $callback
- * @return \React\Promise\DeferredPromise
- */
- function execute(WriterInterface $query, callable $callback) {
- $deferred = new Deferred; // FIXME
- $this->getConnection()->execParamsAsync($query, $query->getParams(), $query->getTypes(),
- array($deferred->resolver(), "resolve"));
- return $deferred->then($callback);
- }
-}
/**
* An executor of \pq\Query\Writer queries
+ * @codeCoverageIgnore
*/
-interface ExecutorInterface
+interface ExecutorInterface extends \SplSubject
{
/**
* Get the connection
* @return mixed the result of the callback
*/
function execute(WriterInterface $query, callable $callback);
+
+ /**
+ * @return WriterInterface
+ */
+ function getQuery();
+
+ /**
+ * @return \pq\Result
+ */
+ function getResult();
+
}
namespace pq\Query;
+/**
+ * @codeCoverageIgnore
+ */
interface ExpressibleInterface
{
/**
* @return string
*/
protected function reduce($q, $v) {
- return $q . " " . (is_array($v) ? implode(", ", $v) : $v);
+ if (is_array($v)) {
+ $v = implode(", ", $v);
+ }
+ if (strlen($q)) {
+ $q .= " ";
+ }
+ return $q . $v;
}
/**
* @return \pq\Query\Writer
*/
function write() {
+ if (strlen($this->query)) {
+ $this->query .= " ";
+ }
$this->query .= array_reduce(func_get_args(), array($this, "reduce"));
return $this;
}
/**
* A query writer which supports easily constructing queries for \pq\Connection::execParams()
+ * @codeCoverageIgnore
*/
interface WriterInterface
{
$this->conn->exec(PQ_TEST_DATA);
Table::$defaultConnection = $this->conn;
$this->table = new Table("test");
+ $this->table->getQueryExecutor()->attach(new \QueryLogger());
}
protected function tearDown() {
$this->conn->exec(PQ_TEST_DATA);
Table::$defaultConnection = $this->conn;
$this->table = new Table("test");
+ $this->table->getQueryExecutor()->attach(new \QueryLogger());
}
protected function tearDown() {
$row = new Row($this->table);
$this->assertSame($this->table, $row->getTable());
}
+
+ function testPessimisticLock() {
+ $this->table->setLock(new Table\PessimisticLock);
+ $txn = $this->table->getConnection()->startTransaction();
+ $row = $this->table->find(null, null, 1)->current();
+ $row->data = "foo";
+ $row->update();
+ $txn->commit();
+ $this->assertSame("foo", $row->data->get());
+ }
+
+ function testPessimisticLockFail() {
+ $this->table->setLock(new Table\PessimisticLock);
+ $txn = $this->table->getConnection()->startTransaction();
+ $row = $this->table->find(null, null, 1)->current();
+ $row->data = "foo";
+ executeInConcurrentTransaction(
+ $this->table->getQueryExecutor(),
+ "UPDATE {$this->table->getName()} SET data='bar' WHERE id=\$1",
+ array($row->id->get()));
+ $this->setExpectedException("\\UnexpectedValueException", "Row has already been modified");
+ $row->update();
+ $txn->commit();
+ }
+
+ function testOptimisticLock() {
+ $this->table->setLock(new Table\OptimisticLock("counter"));
+ $row = $this->table->find(null, null, 1)->current();
+ $cnt = $row->counter->get();
+ $row->data = "foo";
+ $row->update();
+ $this->assertEquals("foo", $row->data->get());
+ $this->assertEquals($cnt +1, $row->counter->get());
+ }
+
+ function testOptimisticLockFail() {
+ $this->table->setLock(new Table\OptimisticLock("counter"));
+ $row = $this->table->find(null, null, 1)->current();
+ $cnt = $row->counter->get();
+ $row->data = "foo";
+ executeInConcurrentTransaction(
+ $this->table->getQueryExecutor(),
+ "UPDATE {$this->table->getName()} SET counter = 10 WHERE id=\$1",
+ array($row->id->get()));
+ $this->setExpectedException("\\UnexpectedValueException", "No row updated");
+ $row->update();
+ }
+
+ function testRef() {
+ foreach ($this->table->find() as $row) {
+ foreach ($row->reftest() as $ref) {
+ $this->assertEquals($row->id->get(), $ref->test->id->get());
+ }
+ }
+ }
}
$this->conn->exec(PQ_TEST_DATA);
Table::$defaultConnection = $this->conn;
$this->table = new Table("test");
+ $this->table->getQueryExecutor()->attach(new \QueryLogger());
}
protected function tearDown() {
}
public function testDeleteFail() {
- $this->setExpectedException("pq\\Exception");
+ $this->setExpectedException("Exception");
$rowset = new Rowset($this->table);
$rowset->append(new Row($this->table, array("xx" => 0)))->delete();
}
$this->conn->exec(PQ_TEST_DATA);
Table::$defaultConnection = $this->conn;
$this->table = new Table("test");
+ $this->table->getQueryExecutor()->attach(new \QueryLogger());
}
protected function tearDown() {
spl_autoload_register(function($c) {
if (substr($c,0,3) == "pq\\") return require_once sprintf("%s/../lib/%s.php", __DIR__, strtr($c, "\\", "/"));
});
+
+function executeInConcurrentTransaction(\pq\Query\ExecutorInterface $exec, $sql, array $params = null) {
+ $conn = $exec->getConnection();
+ $exec->setConnection(new pq\Connection(PQ_TEST_DSN));
+ $exec->execute(new \pq\Query\Writer($sql, $params), function(){});
+ $exec->setConnection($conn);
+}
+
+class QueryLogger implements \SplObserver
+{
+ protected $fp;
+
+ function __construct($logfile = null) {
+ if (!isset($logfile)) {
+ $logfile = __DIR__."/query.log";
+ }
+ if (!$this->fp = @fopen($logfile, "a")) {
+ throw new \RuntimeException(error_get_last()["message"]);
+ }
+ }
+
+ function __destruct() {
+ if (is_resource($this->fp)) {
+ fclose($this->fp);
+ }
+ }
+
+ function update(\SplSubject $executor) {
+ $result = $executor->getResult();
+ if (isset($result)) {
+ fprintf($this->fp, "[%s] R %s\n",
+ date_create()->format("Y-m-d H:i:s"),
+ json_encode([
+ "S" => $result->statusMessage,
+ "N" => $result->numRows,
+ "C" => $result->numCols,
+ "A" => $result->affectedRows
+ ]));
+ } elseif (($query = $executor->getQuery())) {
+ fprintf($this->fp, "[%s] Q %s %% %s\n",
+ date_create()->format("Y-m-d H:i:s"),
+ preg_replace("/\s+/", " ", $query),
+ json_encode($query->getParams()));
+ }
+ }
+}