Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.List;
import java.util.Objects;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
Expand Down Expand Up @@ -74,8 +73,13 @@ public IgniteColocatedSortAggregate(RelInput input) {
/** {@inheritDoc} */
@Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet,
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
RelCollation collation = TraitUtils.collation(input.getTraitSet());

assert collation.satisfies(TraitUtils.collation(traitSet))
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);

return new IgniteColocatedSortAggregate(
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ public IgniteMapSortAggregate(RelInput input) {
RelNode input,
ImmutableBitSet groupSet,
List<ImmutableBitSet> groupSets,
List<AggregateCall> aggCalls) {
List<AggregateCall> aggCalls
) {
RelCollation collation = TraitUtils.collation(input.getTraitSet());

assert collation.satisfies(TraitUtils.collation(traitSet))
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);

return new IgniteMapSortAggregate(
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.List;
import java.util.Objects;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
Expand Down Expand Up @@ -75,6 +74,11 @@ public IgniteReduceSortAggregate(RelInput input) {

/** {@inheritDoc} */
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
RelCollation collation = TraitUtils.collation(sole(inputs).getTraitSet());

assert collation.satisfies(TraitUtils.collation(traitSet))
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);

return new IgniteReduceSortAggregate(
getCluster(),
traitSet,
Expand All @@ -83,7 +87,7 @@ public IgniteReduceSortAggregate(RelInput input) {
groupSets,
aggCalls,
rowType,
TraitUtils.collation(traitSet)
collation
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.query.calcite.trait;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -29,6 +31,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.AbstractRelOptPlanner;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
Expand Down Expand Up @@ -61,6 +64,7 @@
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -167,6 +171,7 @@ else if (converter == RewindabilityTraitDef.INSTANCE)
RelOptRule.convert(
rel,
rel.getTraitSet()
.replace(RewindabilityTrait.ONE_WAY)
.replace(CorrelationTrait.UNCORRELATED)
),
toTrait);
Expand Down Expand Up @@ -419,15 +424,58 @@ public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(TraitsAwareIgnite

assert traits.size() <= 1;

if (!traits.isEmpty() && traits.get(0).left.satisfies(requiredTraits)) {
// Return most relaxed parent traits.
return Pair.of(requiredTraits, traits.get(0).right);
}

return F.first(traits);
}

/** */
public static List<RelTraitSet> removeDuplicates(List<RelTraitSet> traits) {
BitSet duplicates = null;

for (int i = 0; i < traits.size() - 1; i++) {
if (duplicates != null && duplicates.get(i))
continue;

for (int j = i + 1; j < traits.size(); j++) {
if (duplicates != null && duplicates.get(j))
continue;

// Return most strict child traits.
if (traits.get(i).satisfies(traits.get(j)))
(duplicates == null ? duplicates = new BitSet() : duplicates).set(j);
else if (traits.get(j).satisfies(traits.get(i))) {
(duplicates == null ? duplicates = new BitSet() : duplicates).set(i);
break;
}
}
}

if (duplicates == null)
return traits;

List<RelTraitSet> newTraits = new ArrayList<>(traits.size() - duplicates.cardinality());

for (int i = 0; i < traits.size(); i++) {
if (!duplicates.get(i))
newTraits.add(traits.get(i));
}

return newTraits;
}

/** */
public static List<RelNode> derive(TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
assert !F.isEmpty(inTraits);

RelTraitSet outTraits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(outTraits, inTraits);

inTraits = Commons.transform(inTraits, TraitUtils::removeDuplicates);

Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(rel, outTraits, inTraits);

if (combinations.isEmpty())
return ImmutableList.of();
Expand All @@ -448,21 +496,33 @@ private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
}

/** */
private static Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations(RelTraitSet outTraits, List<List<RelTraitSet>> inTraits) {
private static Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations(
TraitsAwareIgniteRel rel,
RelTraitSet outTraits,
List<List<RelTraitSet>> inTraits
) {
Set<Pair<RelTraitSet, List<RelTraitSet>>> out = new HashSet<>();
fillRecursive(outTraits, inTraits, out, new RelTraitSet[inTraits.size()], 0);
fillRecursive(rel, outTraits, inTraits, out, new RelTraitSet[inTraits.size()], 0);
return out;
}

/** */
private static boolean fillRecursive(
TraitsAwareIgniteRel rel,
RelTraitSet outTraits,
List<List<RelTraitSet>> inTraits,
Set<Pair<RelTraitSet, List<RelTraitSet>>> result,
RelTraitSet[] combination,
int idx
) throws ControlFlowException {
boolean processed = false, last = idx == inTraits.size() - 1;

if (last) {
assert rel.getCluster().getPlanner() instanceof AbstractRelOptPlanner;

((AbstractRelOptPlanner)rel.getCluster().getPlanner()).checkCancel();
}

for (RelTraitSet t : inTraits.get(idx)) {
assert t.getConvention() == IgniteConvention.INSTANCE;

Expand All @@ -471,7 +531,7 @@ private static boolean fillRecursive(

if (last)
result.add(Pair.of(outTraits, ImmutableList.copyOf(combination)));
else if (!fillRecursive(outTraits, inTraits, result, combination, idx + 1))
else if (!fillRecursive(rel, outTraits, inTraits, result, combination, idx + 1))
return false;
}
return processed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ public void correctCollationsOnMapReduceSortAgg() throws InterruptedException {
assertEquals(ROWS, cursors.size());
}

/** */
@Test
public void testNullsReordering() {
sql("CREATE TABLE t(a INTEGER, b INTEGER) WITH " + atomicity());
sql("INSERT INTO t VALUES (1, 1), (2, 2), (1, 3), (3, 4), (NULL, 1), (1, NULL)");

assertQuery("SELECT a, SUM(b), COUNT(b), COUNT(*) FROM t GROUP BY a ORDER BY a NULLS LAST")
.ordered()
.returns(1, 4L, 2L, 3L)
.returns(2, 2L, 1L, 1L)
.returns(3, 4L, 1L, 1L)
.returns(null, 1L, 1L, 1L)
.check();
}

/**
* @param c Cache.
* @param rows Rows count.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Duration;
import java.time.Period;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -657,6 +658,20 @@ public void testInsertIncorrectDate() {
assertThrows("INSERT INTO timestamp_t VALUES ('1900-1-1 00-00-00')", errType, errDate);
}

/** */
@Test
public void testInsertMultiRowValues() {
sql("CREATE TABLE test (id int, val int) WITH " + atomicity());

int rowsCnt = 50;

String sql = "INSERT INTO test VALUES " + String.join(", ", Collections.nCopies(rowsCnt, "(?, ?)"));

sql(sql, new Object[rowsCnt * 2]);

assertQuery("SELECT * FROM test").resultSize(rowsCnt).check();
}

/** */
private void checkDefaultValue(String sqlType, String sqlVal, Object expectedVal) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.junit.Test;

import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -209,4 +210,31 @@ public void joinBroadcastAggregateRehash() throws Exception {
)
);
}

/**
* Re-hashing right hand for merge join.
*/
@Test
public void joinMergeJoinAffinityRehash() throws Exception {
IgniteSchema schema = createSchema(
createTable("ORDERS", IgniteDistributions.affinity(0, "orders", "hash"),
"ID", INTEGER, "REGION", INTEGER)
.addIndex("ORDER_ID_IDX", 0),
createTable("ORDER_ITEMS", IgniteDistributions.affinity(0, "order_items", "hash"),
"ID", INTEGER, "ORDER_ID", INTEGER, "AMOUNT", INTEGER)
.addIndex("ORDER_ITEMS_ORDER_ID_IDX", 1)
);

String sql = "SELECT sum(amount)" +
" FROM order_items i JOIN orders o ON o.id=i.order_id" +
" WHERE o.region = ?";

assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class))
.and(hasChildThat(isIndexScan("ORDERS", "ORDER_ID_IDX")))
.and(hasChildThat(isInstanceOf(IgniteExchange.class)
.and(hasDistribution(IgniteDistributions.affinity(0, "orders", "hash")))
.and(hasChildThat(isIndexScan("ORDER_ITEMS", "ORDER_ITEMS_ORDER_ID_IDX")))))
);
}
}
Loading
Loading