diff --git a/docs/BigQueryTable-batchsource.md b/docs/BigQueryTable-batchsource.md index 7eec12e74d..b36fd3786b 100644 --- a/docs/BigQueryTable-batchsource.md +++ b/docs/BigQueryTable-batchsource.md @@ -57,6 +57,14 @@ name is not null', all output rows will have an 'age' over 50 and a value for th This is the same as the WHERE clause in BigQuery. More information can be found at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#where_clause +**Order By**: The column or list of columns to order the data by. For +example, `name asc, age desc`. More information can be found +at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause. + +**Limit**: The maximum number of rows to read from the source table. More information can be +found +at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause. + **Enable Querying Views**: Whether to allow querying views. Since BigQuery views are not materialized by default, querying them may have a performance overhead. 
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java index 2de14131ac..40a2bb78a7 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java @@ -241,6 +241,12 @@ private void configureBigQuerySource() { if (config.getViewMaterializationDataset() != null) { configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset()); } + if (config.getOrderBy() != null) { + configuration.set(BigQueryConstants.CONFIG_ORDER_BY, config.getOrderBy()); + } + if (config.getLimit() != null) { + configuration.set(BigQueryConstants.CONFIG_LIMIT, String.valueOf(config.getLimit())); + } } public Schema getSchema(FailureCollector collector) { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java index 4382765cb0..fbe6192977 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java @@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { private static final String VALID_DATE_FORMAT = "yyyy-MM-dd"; private static final String SCHEME = "gs://"; private static final String WHERE = "WHERE"; + private static final String ORDER_BY = "ORDER BY"; public static final Set SUPPORTED_TYPES = ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD); @@ -70,6 +71,8 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { public static final String NAME_ENABLE_QUERYING_VIEWS = "enableQueryingViews"; public static final String NAME_VIEW_MATERIALIZATION_PROJECT = "viewMaterializationProject"; public static 
final String NAME_VIEW_MATERIALIZATION_DATASET = "viewMaterializationDataset"; + public static final String NAME_LIMIT = "limit"; + public static final String NAME_ORDER_BY = "orderBy"; @Name(Constants.Reference.REFERENCE_NAME) @Nullable @@ -110,6 +113,19 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { "and discards all rows that do not return TRUE (that is, rows that return FALSE or NULL).") private String filter; + @Name(NAME_ORDER_BY) + @Macro + @Nullable + @Description("The ORDER BY clause sorts the results of a query based on one or more columns. " + + "For example, 'name asc, age desc'.") + private String orderBy; + + @Name(NAME_LIMIT) + @Macro + @Nullable + @Description("The LIMIT clause restricts the number of rows returned by the query.") + private Long limit; + @Name(NAME_ENABLE_QUERYING_VIEWS) @Macro @Nullable @@ -176,6 +192,13 @@ public void validate(FailureCollector collector, Map arguments) if (!containsMacro(NAME_CMEK_KEY)) { validateCmekKey(collector, arguments); } + + if (!containsMacro(NAME_LIMIT) && limit != null) { + if (limit < 0) { + collector.addFailure("Invalid limit value.", "Limit must be a non-negative number.") + .withConfigProperty(NAME_LIMIT); + } + } } void validateCmekKey(FailureCollector collector, Map arguments) { @@ -271,17 +294,44 @@ public String getPartitionTo() { @Nullable public String getFilter() { - if (filter != null) { - filter = filter.trim(); - if (filter.isEmpty()) { - return null; - } - // remove the WHERE keyword from the filter if the user adds it at the begging of the expression - if (filter.toUpperCase().startsWith(WHERE)) { - filter = filter.substring(WHERE.length()); - } + return cleanupSqlFragment(filter, WHERE); + } + + @Nullable + public String getOrderBy() { + return cleanupSqlFragment(orderBy, ORDER_BY); + } + + @Nullable + public Long getLimit() { + return limit; + } + + /** + * Cleans up a SQL fragment by trimming whitespace and stripping a given keyword from the + * beginning of 
the string in a case-insensitive way. + * + * @param fragment The input SQL string fragment. + * @param keyword The SQL keyword to remove (e.g., "WHERE", "ORDER BY"). + * @return The cleaned fragment, or null if the input was null or empty. + */ + @Nullable + private String cleanupSqlFragment(@Nullable String fragment, String keyword) { + if (Strings.isNullOrEmpty(fragment)) { + return null; + } + + fragment = fragment.trim(); + + if (fragment.isEmpty()) { + return null; } - return filter; + + if (fragment.toUpperCase().startsWith(keyword)) { + fragment = fragment.substring(keyword.length()).trim(); + } + + return fragment.isEmpty() ? null : fragment; } public boolean isEnableQueryingViews() { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java index 88e8267053..fb3cd84eb5 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java @@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableDefinition.Type; import com.google.cloud.bigquery.TimePartitioning; @@ -49,6 +50,8 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.GeneralSecurityException; @@ -65,6 +68,7 @@ */ public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat { + private static final Logger LOG = 
LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class); private InputFormat delegateInputFormat = new AvroBigQueryInputFormatWithScopes(); @@ -128,17 +132,24 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null); String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null); String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null); + String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null); + String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null); com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable( - datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath); + datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null); Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType(); + Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter(); + StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition(); String query; if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) { - query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter); + query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter, + limit, orderBy); } else { - query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId, - tableName, serviceAccount, isServiceAccountFilePath); + query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId, + datasetId, + tableName, limit, orderBy, + isPartitionFilterRequired, tableDefinition); } if (query != null) { @@ -160,30 +171,41 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc } @VisibleForTesting - String 
generateQuery(String partitionFromDate, String partitionToDate, String filter, String project, - String datasetProject, String dataset, String table, @Nullable String serviceAccount, - @Nullable Boolean isServiceAccountFilePath) { - if (partitionFromDate == null && partitionToDate == null && filter == null) { + String generateQuery(String partitionFromDate, String partitionToDate, String filter, + String datasetProject, String dataset, String table, String limit, String orderBy, + Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) { + + if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty( + limit) + && Strings.isNullOrEmpty(partitionFromDate) && Strings.isNullOrEmpty(partitionToDate)) { return null; } - String queryTemplate = "select * from `%s` where %s"; - com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, table, - serviceAccount, - isServiceAccountFilePath); - StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition(); + + RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); - if (timePartitioning == null && filter == null) { - return null; - } StringBuilder condition = new StringBuilder(); + String partitionCondition = null; if (timePartitioning != null) { - String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate, - partitionToDate); - condition.append(timePartitionCondition); + if (partitionFromDate == null && partitionToDate == null + && Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) { + partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition); + } else if (partitionFromDate != null || partitionToDate != null) { + partitionCondition = + BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate, + 
partitionToDate); + } + } else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired, + Boolean.TRUE)) { + partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition( + tableDefinition); } - if (filter != null) { + if (!Strings.isNullOrEmpty(partitionCondition)) { + condition.append("(").append(partitionCondition).append(")"); + } + + if (!Strings.isNullOrEmpty(filter)) { if (condition.length() == 0) { condition.append(filter); } else { @@ -192,20 +214,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi } String tableName = datasetProject + "." + dataset + "." + table; - return String.format(queryTemplate, tableName, condition.toString()); + StringBuilder query = new StringBuilder("select * from ").append(tableName); + + if (condition.length() > 0) { + query.append(" where ").append(condition); + } + + if (!Strings.isNullOrEmpty(orderBy)) { + query.append(" order by ").append(orderBy); + } + + if (!Strings.isNullOrEmpty(limit)) { + query.append(" limit ").append(limit); + } + + LOG.debug("Generated BigQuery query for job: {}", query); + return query.toString(); } @VisibleForTesting - String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) { - String queryTemplate = "select * from `%s`%s"; - StringBuilder condition = new StringBuilder(); - + String generateQueryForMaterializingView(String datasetProject, String dataset, String table, + String filter, String limit, String orderBy) { + String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table); + StringBuilder query = new StringBuilder("select * from ").append(tableName); if (!Strings.isNullOrEmpty(filter)) { - condition.append(String.format(" where %s", filter)); + query.append(" where ").append(filter); } - String tableName = datasetProject + "." + dataset + "." 
+ table; - return String.format(queryTemplate, tableName, condition.toString()); + if (!Strings.isNullOrEmpty(orderBy)) { + query.append(" order by ").append(orderBy); + } + + if (!Strings.isNullOrEmpty(limit)) { + query.append(" limit ").append(limit); + } + + return query.toString(); } /** diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java index 51228ee80b..4bc6fd0769 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java @@ -36,6 +36,8 @@ public interface BigQueryConstants { String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields"; String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields"; String CONFIG_FILTER = "cdap.bq.source.filter"; + String CONFIG_LIMIT = "cdap.bq.source.limit"; + String CONFIG_ORDER_BY = "cdap.bq.source.order.by"; String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter"; String CONFIG_JOB_ID = "cdap.bq.sink.job.id"; String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project"; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index a979cdcdc1..8d64ec3404 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; @@ -83,7 +84,7 @@ public final class BigQueryUtil { private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class); - private 
static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME"; + public static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME"; private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix"; public static final String BUCKET_PATTERN = "[a-z0-9._-]+"; @@ -824,6 +825,43 @@ public static String generateTimePartitionCondition(StandardTableDefinition tabl return timePartitionCondition.toString(); } + /** + * Generates a default "IS NOT NULL OR IS NULL" partition condition for a time-partitioned table. + * + * @param tableDefinition The definition of the table. + * @return The SQL condition string or an empty string if no condition is needed. + */ + public static String generateDefaultTimePartitionCondition( + StandardTableDefinition tableDefinition) { + TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); + if (timePartitioning == null) { + return StringUtils.EMPTY; + } + + String columnName = timePartitioning.getField() != null ? + timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME; + + return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName); + } + + /** + * Generates a default "IS NOT NULL OR IS NULL" partition condition for a range-partitioned + * table. + * + * @param tableDefinition The definition of the table. + * @return The SQL condition string or an empty string if no condition is needed. + */ + public static String generateDefaultRangePartitionCondition( + StandardTableDefinition tableDefinition) { + RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); + if (rangePartitioning == null || Strings.isNullOrEmpty(rangePartitioning.getField())) { + return StringUtils.EMPTY; + } + + String columnName = rangePartitioning.getField(); + return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName); + } + /** * Get fully-qualified name (FQN) for a BQ table (FQN format: * bigquery:{projectId}.{datasetId}.{tableId}). 
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java index 229b7443d1..caf8ff8e3d 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java @@ -16,65 +16,312 @@ package io.cdap.plugin.gcp.bigquery.source; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.RangePartitioning; +import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; -import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; +import com.google.cloud.bigquery.TimePartitioning; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import static org.mockito.Mockito.when; -import java.util.Objects; - -/** - * Unit Tests for generateQuery methods - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(BigQueryUtil.class) +@RunWith(MockitoJUnitRunner.class) public class PartitionedBigQueryInputFormatTest { + private static final String TEST_PROJECT = "test-project"; + private static final String TEST_DATASET = "test-dataset"; + private static final String TEST_TABLE = "test-table"; + private static final String TEST_FILTER = "age > 10"; + private static final String TEST_LIMIT = "100"; + private static final String TEST_ORDER_BY = "name asc"; + private static final String TEST_TABLE_SPEC = 
String.format("%s.%s.%s", TEST_PROJECT, + TEST_DATASET, TEST_TABLE); + private static final String TEST_FROM_DATE = "2025-01-01"; + private static final String TEST_TO_DATE = "2025-01-02"; + private static final String TEST_PARTITION_CONDITION = + "TIMESTAMP(`_PARTITIONTIME`) >= TIMESTAMP(\"2025-01-01\") and " + + "TIMESTAMP(`_PARTITIONTIME`) < TIMESTAMP(\"2025-01-02\")"; + private static final String TEST_DEFAULT_TIME_CONDITION = + "(`_PARTITIONTIME` IS NOT NULL OR `_PARTITIONTIME` IS NULL)"; + private static final String TEST_DEFAULT_RANGE_CONDITION = "(`range_col` IS NOT NULL " + + "OR `range_col` IS NULL)"; + private static final String TEST_TIME_UNIT_COL = "my_date_col"; + private static final String TEST_TIME_UNIT_PARTITION_CONDITION = + "TIMESTAMP(`my_date_col`) >= TIMESTAMP(\"2025-01-01\") and " + + "TIMESTAMP(`my_date_col`) < TIMESTAMP(\"2025-01-02\")"; + private static final String TEST_DEFAULT_TIME_UNIT_CONDITION = + "(`my_date_col` IS NOT NULL OR `my_date_col` IS NULL)"; + + + @Mock + private StandardTableDefinition mockTableDefinition; + @Mock + private TimePartitioning mockTimePartitioning; + @Mock + private RangePartitioning mockRangePartitioning; + @Mock + private Schema mockSchema; + @Mock + private FieldList mockFieldList; + @Mock + private Field mockField; + + private PartitionedBigQueryInputFormat format; + + @Before + public void setUp() { + format = new PartitionedBigQueryInputFormat(); + when(mockTableDefinition.getTimePartitioning()).thenReturn(null); + when(mockTableDefinition.getRangePartitioning()).thenReturn(null); + } + + @Test + public void testGenerateQueryForMaterializingView_WithFilterOnly() { + String expectedQuery = String.format("select * from `%s.%s.%s` where %s", + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER); + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER, null, null); + + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + 
public void testGenerateQueryForMaterializingView_NoFilterOrOptions() { + String expectedQuery = String.format("select * from `%s.%s.%s`", + TEST_PROJECT, TEST_DATASET, TEST_TABLE); + + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, null, null, null); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQueryForMaterializingView_AllOptions() { + String expectedQuery = String.format("select * from `%s.%s.%s` where %s order by %s limit %s", + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_FILTER, TEST_ORDER_BY, TEST_LIMIT); + + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER, TEST_LIMIT, TEST_ORDER_BY); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_WithFilterOnly() { + String expectedQuery = String.format("select * from %s where %s", + TEST_TABLE_SPEC, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_AllOptions() { + String expectedQuery = String.format("select * from %s where %s order by %s limit %s", + TEST_TABLE_SPEC, TEST_FILTER, TEST_ORDER_BY, TEST_LIMIT); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_LIMIT, TEST_ORDER_BY, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionWithDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, + TEST_PARTITION_CONDITION); + + 
String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionRequiredAndFilter() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + String expectedQuery = String.format("select * from %s where %s and (%s)", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_CONDITION, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimeUnitPartitionWithDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(TEST_TIME_UNIT_COL); + when(mockTableDefinition.getSchema()).thenReturn(mockSchema); + when(mockSchema.getFields()).thenReturn(mockFieldList); + when(mockFieldList.get(TEST_TIME_UNIT_COL)).thenReturn(mockField); + when(mockField.getType()).thenReturn(LegacySQLTypeName.DATE); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, TEST_TIME_UNIT_PARTITION_CONDITION); + + String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionFilterNotRequiredWithDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, + TEST_PARTITION_CONDITION); + + 
String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_NoOptions_ShouldReturnNull() { + String generatedQuery = format.generateQuery(null, null, + null, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertNull("Query should be null if no filters or options are set.", generatedQuery); + } + + @Test + public void testGenerateQuery_WithLimitOnly_ShouldAssertQuery() { + String expectedQuery = String.format("select * from %s limit %s", TEST_TABLE_SPEC, + TEST_LIMIT); + + String generatedQuery = format.generateQuery(null, null, + null, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_LIMIT, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + @Test - public void testGenerateQueryForMaterializingView() { - String datasetProject = "test_bq_dataset_project"; - String dataset = "test_bq_dataset"; - String table = "test_bq_table"; - String filter = "tableColumn = 'abc'"; - PartitionedBigQueryInputFormat partitionedBigQueryInputFormat = new PartitionedBigQueryInputFormat(); - String generatedQuery = partitionedBigQueryInputFormat.generateQueryForMaterializingView(datasetProject, dataset, - table, filter); - String expectedQuery = String.format("select * from `%s.%s.%s` where %s", datasetProject, dataset, table, filter); - Assert.assertEquals(expectedQuery, generatedQuery); - - String expectedQueryWithoutFilter = String.format("select * from `%s.%s.%s`", datasetProject, dataset, table); - generatedQuery = partitionedBigQueryInputFormat.generateQueryForMaterializingView(datasetProject, dataset, - table, null); - Assert.assertEquals(expectedQueryWithoutFilter, generatedQuery); - } - - @Test - public void testGenerateQuery() { - String datasetProject = "test_bq_dataset_project"; - 
String dataset = "test_bq_dataset"; - String table = "test_bq_table"; - String filter = "tableColumn = 'abc'"; - PartitionedBigQueryInputFormat partitionedBigQueryInputFormat = new PartitionedBigQueryInputFormat(); - PowerMockito.mockStatic(BigQueryUtil.class); - Table t = PowerMockito.mock(Table.class); - StandardTableDefinition tableDefinition = PowerMockito.mock(StandardTableDefinition.class); - PowerMockito.when(BigQueryUtil.getBigQueryTable(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), - ArgumentMatchers.anyString(), ArgumentMatchers.any(), - ArgumentMatchers.anyBoolean())).thenReturn(t); - PowerMockito.when(t.getDefinition()).thenReturn(tableDefinition); - String generatedQuery = partitionedBigQueryInputFormat.generateQuery(null, null, filter, datasetProject, - datasetProject, dataset, table, null, true); - String expectedQuery = String.format("select * from `%s.%s.%s` where %s", datasetProject, dataset, table, filter); - Assert.assertEquals(expectedQuery, generatedQuery); - - generatedQuery = partitionedBigQueryInputFormat.generateQuery(null, null, null, datasetProject, datasetProject, - dataset, table, null, true); - Assert.assertNull(generatedQuery); + public void testGenerateQuery_WithOrderByOnly_ShouldAssertQuery() { + String expectedQuery = String.format("select * from %s order by %s", TEST_TABLE_SPEC, + TEST_ORDER_BY); + + String generatedQuery = format.generateQuery(null, null, + null, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, TEST_ORDER_BY, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionNotRequired_WithDates_ShouldAssertQuery() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, + TEST_PARTITION_CONDITION); + + String generatedQuery = 
format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionRequired_WithFilterOnly_ShouldAssertQuery() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where %s and (%s)", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_CONDITION, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_RangePartitionRequiredAndFilter() { + when(mockTableDefinition.getRangePartitioning()).thenReturn(mockRangePartitioning); + when(mockRangePartitioning.getField()).thenReturn("range_col"); + + String expectedQuery = String.format("select * from %s where %s and (%s)", + TEST_TABLE_SPEC, TEST_DEFAULT_RANGE_CONDITION, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, TEST_FILTER, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_RangePartitionRequiredWithLimit() { + when(mockTableDefinition.getRangePartitioning()).thenReturn(mockRangePartitioning); + when(mockRangePartitioning.getField()).thenReturn("range_col"); + + String expectedQuery = String.format("select * from %s where %s limit %s", + TEST_TABLE_SPEC, TEST_DEFAULT_RANGE_CONDITION, TEST_LIMIT); + + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_LIMIT, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + 
public void testGenerateQuery_TimeUnitPartitionRequiredAndFilter() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(TEST_TIME_UNIT_COL); + + String expectedQuery = String.format("select * from %s where %s and (%s)", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_UNIT_CONDITION, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, TEST_FILTER, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimeUnitPartitionRequiredWithLimit() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(TEST_TIME_UNIT_COL); + + String expectedQuery = String.format("select * from %s where %s limit %s", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_UNIT_CONDITION, TEST_LIMIT); + + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_LIMIT, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); } } diff --git a/widgets/BigQueryTable-batchsource.json b/widgets/BigQueryTable-batchsource.json index 8f0e7f9d8a..773d74f98d 100644 --- a/widgets/BigQueryTable-batchsource.json +++ b/widgets/BigQueryTable-batchsource.json @@ -151,6 +151,18 @@ "placeholder": "" } }, + { + "widget-type": "textbox", + "name": "orderBy", + "label": "Order By", + "widget-attributes": {"placeholder": "Column(s) to order by, e.g., 'name asc, age desc'"} + }, + { + "widget-type": "textbox", + "name": "limit", + "label": "Limit", + "widget-attributes": {"placeholder": "Maximum number of rows to read"} + }, { "widget-type": "textbox", "label": "Temporary Bucket Name",