* @param \pq\Result $result
* @return \pq\Gateway\Rowset
*/
- function __invoke(\pq\Result $result) {
+ function __invoke(\pq\Result $result = null) {
$that = clone $this;
$that->hydrate($result);
return $that;
$this->rows = array();
if ($result) {
- $row = $this->row;
+ $row = $this->getRowPrototype();
if (is_callable($row)) {
while (($data = $result->fetchRow(\pq\Result::FETCH_ASSOC))) {
return $this;
}
+ /**
+ * Get the row prototype
+ * @return mixed
+ */
+ function getRowPrototype() {
+ return $this->row;
+ }
+
/**
* @return \pq\Gateway\Table
*/
return $this->table;
}
- function create() {
- array_map(function ($row) {
- $row->create();
- }, $this->rows);
+ /**
+ * Create all rows of this rowset
+ * @param bool $txn
+ * @return \pq\Gateway\Rowset
+ * @throws Exception
+ */
+ function create($txn = true) {
+ $txn = $txn ? $this->table->getConnection()->startTransaction() : false;
+ try {
+ foreach ($this->rows as $row) {
+ $row->create();
+ }
+ } catch (\Exception $e) {
+ if ($txn) {
+ $txn->rollback();
+ }
+ throw $e;
+ }
+ if ($txn) {
+ $txn->commit();
+ }
return $this;
}
- function update() {
- array_map(function ($row) {
- $row->update();
- }, $this->rows);
+ /**
+ * Update all rows of this rowset
+ * @param bool $txn
+ * @return \pq\Gateway\Rowset
+ * @throws \Exception
+ */
+ function update($txn = true) {
+ $txn = $txn ? $this->table->getConnection()->startTransaction() : false;
+ try {
+ foreach ($this->rows as $row) {
+ $row->update();
+ }
+ } catch (\Exception $e) {
+ if ($txn) {
+ $txn->rollback();
+ }
+ throw $e;
+ }
+ if ($txn) {
+ $txn->commit();
+ }
return $this;
}
- function delete() {
- array_map(function ($row) {
- $row->delete();
- }, $this->rows);
+ /**
+ * Delete all rows of this rowset
+ * @param type $txn
+ * @return \pq\Gateway\Rowset
+ * @throws \Exception
+ */
+ function delete($txn = true) {
+ $txn = $txn ? $this->table->getConnection()->startTransaction() : false;
+ try {
+ foreach ($this->rows as $row) {
+ $row->delete();
+ }
+ } catch (\Exception $e) {
+ if ($txn) {
+ $txn->rollback();
+ }
+ throw $e;
+ }
+ if ($txn) {
+ $txn->commit();
+ }
return $this;
}
function rewind() {
$this->index = 0;
}
+
/**
* @implements \Iterator
*/
function next() {
++$this->index;
}
+
/**
* @implements \Iterator
* @return bool
function valid() {
return $this->index < count($this->rows);
}
+
/**
* @implements \Iterator
* @return \pq\Gateway\Row
*/
function current() {
+ if (!$this->valid()) {
+ throw new \OutOfBoundsException("Invalid row index {$this->index}");
+ }
return $this->rows[$this->index];
}
+
/**
* @implements \Iterator
* @return int
if (!$this->valid()) {
throw new \OutOfBoundsException("Invalid seek position ($pos)");
}
+
+ return $this;
}
/**
return $this->rows;
}
+ /**
+ * Apply a callback on each row of this rowset
+ * @param callable $cb
+ * @return \pq\Gateway\Rowset
+ */
+ function apply(callable $cb) {
+ array_walk($this->rows, $cb, $this);
+ return $this;
+ }
+
/**
* Filter by callback
* @param callable $cb
*/
function filter(callable $cb) {
$rowset = clone $this;
+ $rowset->index = 0;
$rowset->rows = array_filter($this->rows, $cb);
return $rowset;
}
+
+ /**
+ * Append a row to the rowset
+ * @param \pq\Gateway\Row $row
+ */
+ function append(Row $row) {
+ $this->rows[] = $row;
+ return $this;
+ }
}