namespace pq\Mapper;
+use InvalidArgumentException;
+use pq\Connection;
use pq\Gateway\Table;
+use pq\Transaction;
class Storage implements StorageInterface
{
*/
private $gateway;
+ /**
+ * Buffered transaction
+ * @var Transaction
+ */
+ private $xaction;
+
/**
* Create a storage for $map
* @param MapInterface $map
$this->gateway = $map->getGateway();
}
+ /**
+ * Find by PK
+ * @param mixed $pk
+ * @return object
+ */
+ function get($pk) {
+ $id = $this->gateway->getIdentity();
+ if (count($id) == 1 && is_scalar($pk)) {
+ $pk = [current($id->getColumns()) => $pk];
+ } elseif (!is_array($pk) || count($pk) !== count($id)) {
+ throw InvalidArgumentException(
+ "Insufficient identity provided; not all fields of %s are provided in %s",
+ json_encode($id->getColumns()), json_encode($pk));
+ }
+
+ $where = [];
+ foreach ($pk as $k => $v) {
+ $where["$k="] = $v;
+ }
+ $rowset = $this->gateway->find($where);
+
+ return $this->map->map($rowset->current());
+ }
+
/**
* Find
* @param array $where
function save($object) {
$this->map->unmap($object);
}
+
+ /**
+ * Buffer in a transaction
+ */
+ function buffer() {
+ switch ($this->gateway->getConnection()->transactionStatus) {
+ case Connection::TRANS_INTRANS:
+ break;
+ default:
+ $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("START TRANSACTION"));
+ }
+ }
+
+ /**
+ * Commit
+ */
+ function flush() {
+ switch ($this->gateway->getConnection()->transactionStatus) {
+ case Connection::TRANS_IDLE:
+ break;
+ default:
+ $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("COMMIT"));
+ }
+ }
+
+ /**
+ * Rollback
+ */
+ function discard() {
+ switch ($this->gateway->getConnection()->transactionStatus) {
+ case Connection::TRANS_IDLE:
+ break;
+ default:
+ $this->gateway->getQueryExecutor()->execute(new \pq\Query\Writer("ROLLBACK"));
+ }
+ $this->map->getObjects()->reset();
+ }
}
interface StorageInterface
{
+ /**
+ * Find by PK
+ * @param mixed $pk
+ * @return object
+ */
+ function get($pk);
+
/**
* Find
* @param array $where
* @param object $object
*/
function save($object);
+
+ /**
+ * Buffer in a transaction
+ */
+ function buffer();
+
+ /**
+ * Commit
+ */
+ function flush();
+
+ /**
+ * Rollback
+ */
+ function discard();
}
$context = $init();
foreach (func_get_args() as $cb) {
- $then($context, $cb);
+ if (is_callable($callback)) {
+ $then($context, $cb);
+ }
}
return array($context, function($result) use ($context, $done) {
* @param callable $callback result callback
* @return mixed context created by the init callback
*/
- function execute(WriterInterface $query, callable $callback) {
+ function execute(WriterInterface $query, callable $callback = null) {
$this->result = null;
$this->query = $query;
$this->notify();
* @param callable $callback
* @return mixed
*/
- function execute(WriterInterface $query, callable $callback) {
+ function execute(WriterInterface $query, callable $callback = null) {
$this->result = null;
$this->query = $query;
$this->notify();
$this->result = $this->getConnection()->execParams($query, $query->getParams(), $query->getTypes());
$this->notify();
- return $callback($this->result);
+ return $callback ? $callback($this->result) : $this->result;
}
/**
* @param callable $callback
* @return mixed the result of the callback
*/
- function execute(WriterInterface $query, callable $callback);
+ function execute(WriterInterface $query, callable $callback = null);
/**
* @return \pq\Query\WriterInterface
executeInConcurrentTransaction(
$this->table->getQueryExecutor(),
"UPDATE {$this->table->getName()} SET data='bar' WHERE id=\$1",
- array($row->id->get()));
+ array($row->id->get()))->commit();
$this->setExpectedException("\\UnexpectedValueException", "Row has already been modified");
$row->update();
$txn->commit();
executeInConcurrentTransaction(
$this->table->getQueryExecutor(),
"UPDATE {$this->table->getName()} SET counter = 10 WHERE id=\$1",
- array($row->id->get()));
+ array($row->id->get()))->commit();
$this->setExpectedException("\\UnexpectedValueException", "No row updated");
$row->update();
}
$this->mapper->mapOf(TestModel::class)->getObjects()->resetRow($obj);
$this->assertCount(0, $this->storage->find(["id="=>$obj->id]));
}
+
+ function testBuffer() {
+ $this->storage->buffer();
+ $this->assertEquals("yesterday", $this->storage->get(1)->data);
+ $this->mapper->mapOf(TestModel::class)->getObjects()->reset();
+ $exec = $this->mapper->mapOf(TestModel::class)->getGateway()->getQueryExecutor();
+ $xact = executeInConcurrentTransaction($exec, "UPDATE test SET data=\$2 WHERE id=\$1", [1, "the day before"]);
+ $this->assertEquals("yesterday", $this->storage->get(1)->data);
+ $xact->commit();
+ $this->storage->discard();
+ $this->assertEquals("the day before", $this->storage->get(1)->data);
+ }
}
require_once __DIR__ . "/../vendor/autoload.php";
-function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = null) {
+function executeInConcurrentTransaction(ExecutorInterface $exec, $sql, array $params = array()) {
$conn = $exec->getConnection();
- $exec->setConnection(new Connection(PQ_TEST_DSN));
- $exec->execute(new Writer($sql, $params), function(){});
+ $xact = (new Connection(PQ_TEST_DSN))->startTransaction();
+ $exec->setConnection($xact->connection);
+ $exec->execute(new Writer($sql, $params));
$exec->setConnection($conn);
+ return $xact;
}
class QueryLogger implements SplObserver
date_create()->format("Y-m-d H:i:s"),
json_encode($result));
} elseif (($query = $executor->getQuery())) {
- fprintf($this->fp, "[%s] Q %s %% %s\n",
+ $executor->getConnection()->exec("SELECT pg_backend_pid()")->fetchCol($pid);
+ fprintf($this->fp, "[%s] Q %s %% %s @%d\n",
date_create()->format("Y-m-d H:i:s"),
preg_replace("/\s+/", " ", $query),
- json_encode($query->getParams()));
+ json_encode($query->getParams()),
+ $pid);
}
}
}