Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/BigQueryTable-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ name is not null', all output rows will have an 'age' over 50 and a value for th
This is the same as the WHERE clause in BigQuery. More information can be found at
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#where_clause

**Order By**: The column or list of columns to order the data by. For
example, `name asc, age desc`. More information can be found
at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause.

**Limit**: The maximum number of rows to read from the source table. More information can be
found at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause.

**Enable Querying Views**: Whether to allow querying views. Since BigQuery views are not materialized
by default, querying them may have a performance overhead.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ private void configureBigQuerySource() {
if (config.getViewMaterializationDataset() != null) {
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
}
if (config.getOrderBy() != null) {
configuration.set(BigQueryConstants.CONFIG_ORDER_BY, config.getOrderBy());
}
if (config.getLimit() != null) {
configuration.set(BigQueryConstants.CONFIG_LIMIT, String.valueOf(config.getLimit()));
}
}

public Schema getSchema(FailureCollector collector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
private static final String VALID_DATE_FORMAT = "yyyy-MM-dd";
private static final String SCHEME = "gs://";
private static final String WHERE = "WHERE";
private static final String ORDER_BY = "ORDER BY";
public static final Set<Schema.Type> SUPPORTED_TYPES =
ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES,
Schema.Type.ARRAY, Schema.Type.RECORD);
Expand All @@ -70,6 +71,8 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
public static final String NAME_ENABLE_QUERYING_VIEWS = "enableQueryingViews";
public static final String NAME_VIEW_MATERIALIZATION_PROJECT = "viewMaterializationProject";
public static final String NAME_VIEW_MATERIALIZATION_DATASET = "viewMaterializationDataset";
public static final String NAME_LIMIT = "limit";
public static final String NAME_ORDER_BY = "orderBy";

@Name(Constants.Reference.REFERENCE_NAME)
@Nullable
Expand Down Expand Up @@ -110,6 +113,19 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
"and discards all rows that do not return TRUE (that is, rows that return FALSE or NULL).")
private String filter;

@Name(NAME_ORDER_BY)
@Macro
@Nullable
@Description("The ORDER BY clause sorts the results of a query based on one or more columns. " +
"For example, 'name asc, age desc'.")
private String orderBy;

@Name(NAME_LIMIT)
@Macro
@Nullable
@Description("The LIMIT clause restricts the number of rows returned by the query.")
private Long limit;

@Name(NAME_ENABLE_QUERYING_VIEWS)
@Macro
@Nullable
Expand Down Expand Up @@ -176,6 +192,13 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
if (!containsMacro(NAME_CMEK_KEY)) {
validateCmekKey(collector, arguments);
}

if (!containsMacro(NAME_LIMIT) && limit != null) {
if (limit < 0) {
collector.addFailure("Invalid limit value.", "Limit must be a non-negative number.")
.withConfigProperty(NAME_LIMIT);
}
}
}

void validateCmekKey(FailureCollector collector, Map<String, String> arguments) {
Expand Down Expand Up @@ -271,17 +294,44 @@ public String getPartitionTo() {

@Nullable
public String getFilter() {
if (filter != null) {
filter = filter.trim();
if (filter.isEmpty()) {
return null;
}
// remove the WHERE keyword from the filter if the user adds it at the begging of the expression
if (filter.toUpperCase().startsWith(WHERE)) {
filter = filter.substring(WHERE.length());
}
return cleanupSqlFragment(filter, WHERE);
}

@Nullable
public String getOrderBy() {
return cleanupSqlFragment(orderBy, ORDER_BY);
}

@Nullable
public Long getLimit() {
return limit;
}

/**
* Cleans up a SQL fragment by trimming whitespace and stripping a given keyword from the
* beginning of the string in a case-insensitive way.
*
* @param fragment The input SQL string fragment.
* @param keyword The SQL keyword to remove (e.g., "WHERE ", "ORDER BY ").
* @return The cleaned fragment, or null if the input was null or empty.
*/
@Nullable
private String cleanupSqlFragment(@Nullable String fragment, String keyword) {
if (Strings.isNullOrEmpty(fragment)) {
return null;
}

fragment = fragment.trim();

if (fragment.isEmpty()) {
return null;
}
return filter;

if (fragment.toUpperCase().startsWith(keyword)) {
fragment = fragment.substring(keyword.length()).trim();
}

return fragment.isEmpty() ? null : fragment;
}

public boolean isEnableQueryingViews() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition.Type;
import com.google.cloud.bigquery.TimePartitioning;
Expand Down Expand Up @@ -49,6 +50,8 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.GeneralSecurityException;
Expand All @@ -65,6 +68,7 @@
*/
public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {

private static final Logger LOG = LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class);
private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
new AvroBigQueryInputFormatWithScopes();

Expand Down Expand Up @@ -128,17 +132,24 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null);
String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null);

com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null);
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter();
StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();

String query;
if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) {
query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter);
query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter,
limit, orderBy);
} else {
query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId,
tableName, serviceAccount, isServiceAccountFilePath);
query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId,
datasetId,
tableName, limit, orderBy,
isPartitionFilterRequired, tableDefinition);
}

if (query != null) {
Expand All @@ -160,30 +171,41 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
}

@VisibleForTesting
String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project,
String datasetProject, String dataset, String table, @Nullable String serviceAccount,
@Nullable Boolean isServiceAccountFilePath) {
if (partitionFromDate == null && partitionToDate == null && filter == null) {
String generateQuery(String partitionFromDate, String partitionToDate, String filter,
String datasetProject, String dataset, String table, String limit, String orderBy,
Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) {

if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty(
limit)
&& Strings.isNullOrEmpty(partitionFromDate) && Strings.isNullOrEmpty(partitionToDate)) {
return null;
}
String queryTemplate = "select * from `%s` where %s";
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, table,
serviceAccount,
isServiceAccountFilePath);
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();

RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
if (timePartitioning == null && filter == null) {
return null;
}
StringBuilder condition = new StringBuilder();
String partitionCondition = null;

if (timePartitioning != null) {
String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
partitionToDate);
condition.append(timePartitionCondition);
if (partitionFromDate == null && partitionToDate == null
&& Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) {
partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition);
} else if (partitionFromDate != null || partitionToDate != null) {
partitionCondition =
BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
partitionToDate);
}
} else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired,
Boolean.TRUE)) {
partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition(
tableDefinition);
}

if (filter != null) {
if (!Strings.isNullOrEmpty(partitionCondition)) {
condition.append("(").append(partitionCondition).append(")");
}

if (!Strings.isNullOrEmpty(filter)) {
if (condition.length() == 0) {
condition.append(filter);
} else {
Expand All @@ -192,20 +214,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
}

String tableName = datasetProject + "." + dataset + "." + table;
return String.format(queryTemplate, tableName, condition.toString());
StringBuilder query = new StringBuilder("select * from ").append(tableName);

if (condition.length() > 0) {
query.append(" where ").append(condition);
}

if (!Strings.isNullOrEmpty(orderBy)) {
query.append(" order by ").append(orderBy);
}

if (!Strings.isNullOrEmpty(limit)) {
query.append(" limit ").append(limit);
}

LOG.debug("Generated BigQuery query for job: {}", query);
return query.toString();
}

@VisibleForTesting
String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) {
String queryTemplate = "select * from `%s`%s";
StringBuilder condition = new StringBuilder();

String generateQueryForMaterializingView(String datasetProject, String dataset, String table,
String filter, String limit, String orderBy) {
String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table);
StringBuilder query = new StringBuilder("select * from ").append(tableName);
if (!Strings.isNullOrEmpty(filter)) {
condition.append(String.format(" where %s", filter));
query.append(" where ").append(filter);
}

String tableName = datasetProject + "." + dataset + "." + table;
return String.format(queryTemplate, tableName, condition.toString());
if (!Strings.isNullOrEmpty(orderBy)) {
query.append(" order by ").append(orderBy);
}

if (!Strings.isNullOrEmpty(limit)) {
query.append(" limit ").append(limit);
}

return query.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface BigQueryConstants {
String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields";
String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields";
String CONFIG_FILTER = "cdap.bq.source.filter";
String CONFIG_LIMIT = "cdap.bq.source.limit";
String CONFIG_ORDER_BY = "cdap.bq.source.order.by";
String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter";
String CONFIG_JOB_ID = "cdap.bq.sink.job.id";
String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project";
Expand Down
40 changes: 39 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
Expand Down Expand Up @@ -83,7 +84,7 @@ public final class BigQueryUtil {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);

private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
public static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";

public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
Expand Down Expand Up @@ -824,6 +825,43 @@ public static String generateTimePartitionCondition(StandardTableDefinition tabl
return timePartitionCondition.toString();
}

/**
* Generates a default "IS NOT NULL OR IS NULL" partition condition for a time-partitioned table.
*
* @param tableDefinition The definition of the table.
* @return The SQL condition string or an empty string if no condition is needed.
*/
public static String generateDefaultTimePartitionCondition(
StandardTableDefinition tableDefinition) {
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
if (timePartitioning == null) {
return StringUtils.EMPTY;
}

String columnName = timePartitioning.getField() != null ?
timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME;

return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
}

/**
* Generates a default "IS NOT NULL OR IS NULL" partition condition for a range-partitioned
* table.
*
* @param tableDefinition The definition of the table.
* @return The SQL condition string or an empty string if no condition is needed.
*/
public static String generateDefaultRangePartitionCondition(
StandardTableDefinition tableDefinition) {
RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
if (rangePartitioning == null || Strings.isNullOrEmpty(rangePartitioning.getField())) {
return StringUtils.EMPTY;
}

String columnName = rangePartitioning.getField();
return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
}

/**
* Get fully-qualified name (FQN) for a BQ table (FQN format:
* bigquery:{projectId}.{datasetId}.{tableId}).
Expand Down
Loading
Loading