Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\ETL\Adapter\PostgreSql;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\PostgreSql\DSL\{sql_to_count_query, sql_to_paginated_query};
use function Flow\PostgreSql\DSL\{sql_parse, sql_query_order_by, sql_to_count_query, sql_to_paginated_query};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\{Extractor, FlowContext, Schema};
Expand All @@ -30,6 +30,12 @@ public function extract(FlowContext $context) : \Generator
{
$sql = $this->query instanceof SqlQuery ? $this->query->toSql() : $this->query;

if (!sql_query_order_by(sql_parse($sql))->hasOrderBy()) {
throw new InvalidArgumentException(
'LIMIT/OFFSET pagination requires ORDER BY clause for deterministic results'
);
}

$total = $this->maximum ?? $this->countTotal($sql);

if ($total === 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\PostgreSql\Tests\Unit;

use function Flow\ETL\DSL\flow_context;
use Flow\ETL\Adapter\PostgreSql\PostgreSqlLimitOffsetExtractor;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Schema;
Expand All @@ -13,6 +14,24 @@

final class PostgreSqlLimitOffsetExtractorTest extends FlowTestCase
{
protected function setUp() : void
{
if (!\extension_loaded('pg_query')) {
self::markTestSkipped('pg_query extension is not loaded. For local development use `nix-shell --arg with-pg-query-ext true` to enable it in the shell.');
}
}

public function test_throws_exception_when_query_has_no_order_by() : void
{
$client = $this->createClientMock();
$extractor = new PostgreSqlLimitOffsetExtractor($client, 'SELECT * FROM users');

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('LIMIT/OFFSET pagination requires ORDER BY clause for deterministic results');

\iterator_to_array($extractor->extract(flow_context()));
}

public function test_with_maximum_validates_positive_value() : void
{
$client = $this->createClientMock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Flow\PostgreSql\AST\Visitors;

use Flow\PostgreSql\AST\NodeVisitor;
use Flow\PostgreSql\Protobuf\AST\SortBy;

/**
* A visitor that collects all SortBy (ORDER BY items) nodes.
*/
final class SortByCollector implements NodeVisitor
{
/**
* @var array<SortBy>
*/
private array $sortByClauses = [];

public static function nodeClass() : string
{
return SortBy::class;
}

public function enter(object $node) : ?int
{
/** @var SortBy $node */
$this->sortByClauses[] = $node;

return null;
}

/**
* @return array<SortBy>
*/
public function getSortByClauses() : array
{
return $this->sortByClauses;
}

public function hasSortBy() : bool
{
return \count($this->sortByClauses) > 0;
}

public function leave(object $node) : ?int
{
return null;
}

public function reset() : void
{
$this->sortByClauses = [];
}
}
31 changes: 20 additions & 11 deletions src/lib/postgresql/src/Flow/PostgreSql/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use Flow\PostgreSql\Explain\Analyzer\PlanAnalyzer;
use Flow\PostgreSql\Explain\ExplainParser;
use Flow\PostgreSql\Explain\Plan\Plan;
use Flow\PostgreSql\Extractors\{Columns, Functions, QueryDepth, Tables};
use Flow\PostgreSql\Extractors\{Columns, Functions, OrderBy as OrderByExtractor, QueryDepth, Tables};
use Flow\PostgreSql\Protobuf\AST\Node;
use Flow\PostgreSql\QueryBuilder\Clause\{
CTE,
Expand All @@ -31,13 +31,13 @@
LockingClause,
NullsPosition,
OnConflictClause,
OrderByItem,
OrderBy,
ReturningClause,
SortDirection,
WindowDefinition,
WindowFrame
};
use Flow\PostgreSql\QueryBuilder\Clause\{OrderBy, WithClause};
use Flow\PostgreSql\QueryBuilder\Clause\WithClause;
use Flow\PostgreSql\QueryBuilder\Condition\{
All,
AndCondition,
Expand Down Expand Up @@ -370,6 +370,15 @@ function sql_query_functions(ParsedQuery $query) : Functions
return new Functions($query);
}

/**
* Extract ORDER BY clauses from a parsed SQL query.
*/
#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)]
function sql_query_order_by(ParsedQuery $query) : OrderByExtractor
{
return new OrderByExtractor($query);
}

/**
* Get the maximum nesting depth of a SQL query.
*
Expand Down Expand Up @@ -1171,7 +1180,7 @@ function binary_expr(Expression $left, string $operator, Expression $right) : Bi
* @param string $name Function name
* @param list<Expression> $args Function arguments
* @param list<Expression> $partitionBy PARTITION BY expressions
* @param list<OrderBy|OrderByItem> $orderBy ORDER BY items
* @param list<OrderBy> $orderBy ORDER BY items
*/
#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)]
function window_func(
Expand Down Expand Up @@ -1709,26 +1718,26 @@ function order_by(
Expression $expr,
SortDirection $direction = SortDirection::ASC,
NullsPosition $nulls = NullsPosition::DEFAULT,
) : OrderByItem {
return new OrderByItem($expr, $direction, $nulls);
) : OrderBy {
return new OrderBy($expr, $direction, $nulls);
}

/**
* Create an ORDER BY item with ASC direction.
*/
#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)]
function asc(Expression $expr, NullsPosition $nulls = NullsPosition::DEFAULT) : OrderByItem
function asc(Expression $expr, NullsPosition $nulls = NullsPosition::DEFAULT) : OrderBy
{
return new OrderByItem($expr, SortDirection::ASC, $nulls);
return new OrderBy($expr, SortDirection::ASC, $nulls);
}

/**
* Create an ORDER BY item with DESC direction.
*/
#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)]
function desc(Expression $expr, NullsPosition $nulls = NullsPosition::DEFAULT) : OrderByItem
function desc(Expression $expr, NullsPosition $nulls = NullsPosition::DEFAULT) : OrderBy
{
return new OrderByItem($expr, SortDirection::DESC, $nulls);
return new OrderBy($expr, SortDirection::DESC, $nulls);
}

/**
Expand Down Expand Up @@ -1758,7 +1767,7 @@ function cte(
*
* @param string $name Window name
* @param list<Expression> $partitionBy PARTITION BY expressions
* @param list<OrderBy|OrderByItem> $orderBy ORDER BY items
* @param list<OrderBy> $orderBy ORDER BY items
* @param null|WindowFrame $frame Window frame specification
*/
#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)]
Expand Down
38 changes: 38 additions & 0 deletions src/lib/postgresql/src/Flow/PostgreSql/Extractors/OrderBy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Flow\PostgreSql\Extractors;

use Flow\PostgreSql\AST\Visitors\SortByCollector;
use Flow\PostgreSql\ParsedQuery;
use Flow\PostgreSql\QueryBuilder\Clause\OrderBy as OrderByClause;

final readonly class OrderBy
{
public function __construct(private ParsedQuery $query)
{
}

/**
* @return array<OrderByClause>
*/
public function all() : array
{
$collector = new SortByCollector();
$this->query->traverse($collector);

return \array_map(
static fn ($sortBy) => OrderByClause::fromAst($sortBy),
$collector->getSortByClauses()
);
}

public function hasOrderBy() : bool
{
$collector = new SortByCollector();
$this->query->traverse($collector);

return $collector->hasSortBy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,77 +5,91 @@
namespace Flow\PostgreSql\QueryBuilder\Clause;

use Flow\PostgreSql\Protobuf\AST\{Node, SortBy};
use Flow\PostgreSql\QueryBuilder\Bridge\AstConvertible;
use Flow\PostgreSql\QueryBuilder\Exception\InvalidAstException;
use Flow\PostgreSql\QueryBuilder\Expression\{Expression, ExpressionFactory};
use Flow\PostgreSql\QueryBuilder\Expression\{Column, Expression, ExpressionFactory};

/**
* Represents an ORDER BY clause element.
* Represents an ORDER BY item.
*/
final readonly class OrderBy implements AstConvertible
final readonly class OrderBy
{
public function __construct(
private Expression $expression,
private SortDirection $direction = SortDirection::ASC,
private NullsPosition $nullsPosition = NullsPosition::DEFAULT,
private NullsPosition $nulls = NullsPosition::DEFAULT,
) {
}

public static function fromAst(Node $node) : static
public static function fromAst(SortBy $node) : self
{
$sortBy = $node->getSortBy();

if ($sortBy === null) {
throw InvalidAstException::unexpectedNodeType('SortBy', 'unknown');
}

$nodeExpr = $sortBy->getNode();
$nodeExpr = $node->getNode();

if ($nodeExpr === null) {
throw InvalidAstException::missingRequiredField('node', 'SortBy');
}

$expression = ExpressionFactory::fromAst($nodeExpr);
$direction = SortDirection::fromProtobuf($node->getSortbyDir());
$nulls = NullsPosition::fromProtobuf($node->getSortbyNulls());

$direction = SortDirection::fromProtobuf($sortBy->getSortbyDir());
$nullsPosition = NullsPosition::fromProtobuf($sortBy->getSortbyNulls());
return new self($expression, $direction, $nulls);
}

return new self($expression, $direction, $nullsPosition);
public function asc() : self
{
return new self($this->expression, SortDirection::ASC, $this->nulls);
}

public function getDirection() : SortDirection
public function column() : ?string
{
if ($this->expression instanceof Column) {
return $this->expression->columnName();
}

return null;
}

public function desc() : self
{
return new self($this->expression, SortDirection::DESC, $this->nulls);
}

public function direction() : SortDirection
{
return $this->direction;
}

public function getExpression() : Expression
public function expression() : Expression
{
return $this->expression;
}

public function getNullsPosition() : NullsPosition
public function nulls() : NullsPosition
{
return $this->nullsPosition;
return $this->nulls;
}

public function toAst() : Node
public function nullsFirst() : self
{
$sortBy = new SortBy([
'node' => $this->expression->toAst(),
'sortby_dir' => $this->direction->toProtobuf(),
'sortby_nulls' => $this->nullsPosition->toProtobuf(),
]);
return new self($this->expression, $this->direction, NullsPosition::FIRST);
}

return new Node(['sort_by' => $sortBy]);
public function nullsLast() : self
{
return new self($this->expression, $this->direction, NullsPosition::LAST);
}

public function withDirection(SortDirection $direction) : self
public function toAst() : SortBy
{
return new self($this->expression, $direction, $this->nullsPosition);
return new SortBy([
'node' => $this->expression->toAst(),
'sortby_dir' => $this->direction->toProtobuf(),
'sortby_nulls' => $this->nulls->toProtobuf(),
]);
}

public function withNullsPosition(NullsPosition $nullsPosition) : self
public function toNode() : Node
{
return new self($this->expression, $this->direction, $nullsPosition);
return new Node(['sort_by' => $this->toAst()]);
}
}
Loading
Loading