9 changes: 9 additions & 0 deletions spark-extension-shims-spark/pom.xml
@@ -36,6 +36,15 @@
<artifactId>auron-spark-ui_${scalaVersion}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scalaVersion}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scalaVersion}</artifactId>
<scope>provided</scope>
</dependency>
Comment on lines +39 to +47
Copilot AI Feb 11, 2026

spark-hive_${scalaVersion} is declared twice: once with default scope (compile) and again with <scope>provided</scope>. This can unintentionally bundle Spark Hive classes into this module’s artifact and/or cause dependency resolution conflicts. Keep a single dependency entry with the intended scope.

<dependency>
<groupId>org.apache.auron</groupId>
<artifactId>auron-common_${scalaVersion}</artifactId>
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution.auron.plan

import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.HiveTableScanExec

class HiveConvertProvider extends AuronConvertProvider with Logging {
override def isEnabled: Boolean =
getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

def enableHiveTableScanExec: Boolean =
getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
Comment on lines +27 to +31
Contributor

Should we be adding these new config keys to SparkAuronConfiguration.java?


Comment on lines +26 to +32
Copilot AI Feb 11, 2026

HiveConvertProvider is discovered via ServiceLoader (AuronConverters loads AuronConvertProvider implementations). This module currently doesn’t include a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider entry, so the provider won’t be loaded at runtime (see thirdparty/auron-paimon for the existing pattern).

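A minimal sketch of the missing registration, assuming AuronConverters discovers providers via java.util.ServiceLoader as described above; the resource path mirrors the thirdparty/auron-paimon pattern and the verification object is purely illustrative:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.auron.AuronConvertProvider

object CheckHiveProviderRegistration {
  // The registration itself is just a plain-text resource, e.g.
  //   src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
  // containing the single line:
  //   org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider
  def main(args: Array[String]): Unit = {
    // ServiceLoader only sees implementations listed in such a resource on the classpath.
    val providers = ServiceLoader.load(classOf[AuronConvertProvider]).asScala.toList
    providers.foreach(p => println(s"loaded provider: ${p.getClass.getName}"))
  }
}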
override def isSupported(exec: SparkPlan): Boolean =
exec match {
case e: HiveTableScanExec if enableHiveTableScanExec &&
e.relation.tableMeta.provider.isDefined &&
e.relation.tableMeta.provider.get.equals("hive") =>
true
Comment on lines +33 to +38
Copilot AI Feb 11, 2026

isSupported accepts all Hive tables with provider == "hive", but NativeHiveTableScanExec only builds native nodes for ORC/Parquet and otherwise will throw (e.g., MatchError on file format). Add an explicit format check here (or make the native exec gracefully fall back) to avoid runtime failures on non-ORC/Parquet Hive tables.

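One hedged way to add that check inside HiveConvertProvider, reusing the HiveTableUtil.getFileFormat helper introduced later in this PR; the exact wiring is an assumption, not code from the PR:

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.execution.HiveTableScanExec

// Only claim support for Hive tables whose input format maps to ORC or Parquet,
// so NativeHiveTableScanExec never receives a format it cannot translate.
override def isSupported(exec: SparkPlan): Boolean =
  exec match {
    case e: HiveTableScanExec if enableHiveTableScanExec &&
        e.relation.tableMeta.provider.contains("hive") =>
      val inputFormatClass = HiveClientImpl.toHiveTable(e.relation.tableMeta).getInputFormatClass
      HiveTableUtil.getFileFormat(inputFormatClass) match {
        case "orc" | "parquet" => true
        case _ => false // unsupported Hive format: leave the original plan untouched
      }
    case _ => false
  }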
case _ => false
}
Comment on lines +33 to +40
Contributor

Slightly hard to distinguish when to use isSupported vs. the enableHiveTableScanExec flag. Do you mind adding documentation here?
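One possible shape for that documentation, as a sketch (the wording is only a suggestion):

/** Master switch: whether Auron should attempt to convert Hive plans at all. */
override def isEnabled: Boolean =
  getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

/**
 * Fine-grained flag gating conversion of HiveTableScanExec specifically. `isSupported`
 * combines this flag with per-node checks (the node is a HiveTableScanExec over a
 * "hive"-provider table) to decide whether one concrete plan node can be converted.
 */
def enableHiveTableScanExec: Boolean =
  getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)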


override def convert(exec: SparkPlan): SparkPlan = {
exec match {
case hiveExec: HiveTableScanExec if enableHiveTableScanExec =>
convertHiveTableScanExec(hiveExec)
case _ => exec
}
}

def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
}
Comment on lines +50 to +52
Copilot AI Feb 11, 2026

There are existing query/operator validation test utilities in this module, but this new Hive scan conversion path doesn’t appear to have coverage. Add at least one suite that enables spark.auron.enable.hiveTableScanExec and verifies HiveTableScanExec is converted (and that unsupported formats don’t break execution).

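A rough sketch of such a suite, assuming a local Hive-enabled session with the Auron extension already on the classpath; the session bootstrap, table DDL, and plan assertion below are placeholders rather than the module's real test harness:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanExec
import org.scalatest.funsuite.AnyFunSuite

class HiveTableScanConversionSuite extends AnyFunSuite {

  private lazy val spark = SparkSession.builder()
    .master("local[2]")
    .appName("HiveTableScanConversionSuite")
    .config("spark.auron.enable.hiveTableScanExec", "true")
    .enableHiveSupport()
    .getOrCreate()

  test("HiveTableScanExec on a parquet Hive table is converted") {
    spark.sql("CREATE TABLE IF NOT EXISTS t_parquet (id INT) STORED AS PARQUET")
    spark.sql("INSERT INTO t_parquet VALUES (1), (2)")
    val df = spark.sql("SELECT id FROM t_parquet")
    // The executed plan should contain the native scan node once conversion kicks in.
    val nativeScans = df.queryExecution.executedPlan.collect { case n: NativeHiveTableScanExec => n }
    assert(nativeScans.nonEmpty)
    assert(df.collect().map(_.getInt(0)).sorted.sameElements(Array(1, 2)))
  }
}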
}
@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution.auron.plan

import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.{protobuf => pb}
Comment on lines +17 to +20
Copilot AI Feb 11, 2026

File name NativeHIveTableScanExec.scala has inconsistent casing ("HIve") compared to the class NativeHiveTableScanExec. On case-sensitive filesystems this is still valid but is easy to miss/grep incorrectly; consider renaming the file to NativeHiveTableScanExec.scala for consistency.

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.auron.{NativeRDD, Shims}
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
import org.apache.spark.{Partition, TaskContext}

import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
extends NativeHiveTableScanBase(basedHiveScan)
with Logging {

@transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
@transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
@transient private lazy val nativeTableDesc = new TableDesc(
nativeTable.getInputFormatClass,
nativeTable.getOutputFormatClass,
nativeTable.getMetadata)

@transient private lazy val nativeHadoopConf = {
val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
Copilot AI Feb 11, 2026

nativeHadoopConf uses SparkSession.getActiveSession.get, which can throw if there is no active session (e.g., execution triggered outside a SQL context). Consider using the same session derivation as NativeHiveTableScanBase.broadcastedHadoopConf (Shims.get.getSqlContext(basedHiveScan).sparkSession) to avoid runtime failures.

Suggested change
-    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+    val hiveConf = sparkSession.sessionState.newHadoopConf()

// append columns ids and names before broadcast
val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)

HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)

val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
deserializer.initialize(hiveConf, nativeTableDesc.getProperties)

// Specifies types and object inspectors of columns to be scanned.
val structOI = ObjectInspectorUtils
.getStandardObjectInspector(
deserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]

val columnTypeNames = structOI
.getAllStructFieldRefs.asScala
.map(_.getFieldObjectInspector)
.map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
.mkString(",")

hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(","))
hiveConf
}

private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
Comment on lines +91 to +95
Copilot AI Feb 11, 2026

minPartitions reads SparkSession.getActiveSession.get.sparkContext multiple times. Besides the .get risk, it’s also inconsistent with other native scan implementations in this repo that pass an explicit sparkSession around. Prefer using a single sparkSession resolved from basedHiveScan and derive sparkContext from it.

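A small sketch of resolving the session once, assuming the Shims.get.getSqlContext(basedHiveScan).sparkSession accessor quoted in the earlier suggestion is usable here:

@transient private lazy val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession

private lazy val minPartitions: Int =
  if (sparkSession.sparkContext.isLocal) {
    0 // let the input format split by block size
  } else {
    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
      sparkSession.sparkContext.defaultMinPartitions)
  }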
}

private val ignoreEmptySplits =
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
Comment on lines +91 to +99
Copilot AI Feb 11, 2026

ignoreEmptySplits also depends on SparkSession.getActiveSession.get. This should use the same non-optional session/context resolution as the rest of the execution code to avoid NoSuchElementException when there is no active session.

Suggested change
-  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
-    0 // will splitted based on block by default.
-  } else {
-    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
-      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
-  }
-  private val ignoreEmptySplits =
-    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+  private val minPartitions = if (sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      sparkContext.defaultMinPartitions)
+  }
+  private val ignoreEmptySplits =
+    sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)


override val nodeName: String =
s"NativeHiveTableScan $tableName"

override def doExecuteNative(): NativeRDD = {
val nativeMetrics = SparkMetricNode(
metrics,
Nil,
Some({
case ("bytes_scanned", v) =>
val inputMetric = TaskContext.get.taskMetrics().inputMetrics
inputMetric.incBytesRead(v)
case ("output_rows", v) =>
val inputMetric = TaskContext.get.taskMetrics().inputMetrics
inputMetric.incRecordsRead(v)
case _ =>
}))
val nativeFileSchema = this.nativeFileSchema
val nativeFileGroups = this.nativeFileGroups
val nativePartitionSchema = this.nativePartitionSchema

val projection = schema.map(field => relation.schema.fieldIndex(field.name))
val broadcastedHadoopConf = this.broadcastedHadoopConf
val numPartitions = partitions.length

new NativeRDD(
sparkContext,
nativeMetrics,
partitions.asInstanceOf[Array[Partition]],
None,
Nil,
rddShuffleReadFull = true,
(partition, _) => {
val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
putJniBridgeResource(resourceId, broadcastedHadoopConf)

val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
val nativeFileScanConf = pb.FileScanExecConf
.newBuilder()
.setNumPartitions(numPartitions)
.setPartitionIndex(partition.index)
.setStatistics(pb.Statistics.getDefaultInstance)
.setSchema(nativeFileSchema)
.setFileGroup(nativeFileGroup)
.addAllProjection(projection.map(Integer.valueOf).asJava)
.setPartitionSchema(nativePartitionSchema)
.build()
fileFormat match {
case "orc" =>
val nativeOrcScanExecBuilder = pb.OrcScanExecNode
.newBuilder()
.setBaseConf(nativeFileScanConf)
.setFsResourceId(resourceId)
.addAllPruningPredicates(new java.util.ArrayList()) // not support this filter
Contributor

So we will do a full table scan here? If that's the case, do you mind creating an issue and linking it here?

Comment on lines +147 to +153
Copilot AI Feb 11, 2026

fileFormat match only has cases for "orc" and "parquet". Since HiveTableUtil.getFileFormat can return "other", this can throw a MatchError at runtime. Add a default case that either throws a clear unsupported-format error or triggers a non-native fallback.

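A hedged sketch of such a default branch (the message text is illustrative); alternatively the provider's isSupported check could be tightened so this point is never reached for other formats:

          case other =>
            // Fail fast with an explicit error instead of a MatchError on unexpected formats.
            throw new UnsupportedOperationException(
              s"NativeHiveTableScanExec only supports ORC and Parquet Hive tables, got: $other")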
pb.PhysicalPlanNode
.newBuilder()
.setOrcScan(nativeOrcScanExecBuilder.build())
.build()
case "parquet" =>
val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
.newBuilder()
.setBaseConf(nativeFileScanConf)
.setFsResourceId(resourceId)
.addAllPruningPredicates(new java.util.ArrayList()) // not support this filter

pb.PhysicalPlanNode
.newBuilder()
.setParquetScan(nativeParquetScanExecBuilder.build())
.build()
}
},
friendlyName = "NativeRDD.HiveTableScan")
}

override def getFilePartitions(): Array[FilePartition] = {
val newJobConf = new JobConf(nativeHadoopConf)
val arrayFilePartition = ArrayBuffer[FilePartition]()
val partitionedFiles = if (relation.isPartitioned) {
val partitions = basedHiveScan.prunedPartitions
val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
partitions.foreach { partition =>
val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
val partPath = partition.getDataLocation
HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf)
val partitionValues = partition.getTPartition.getValues

val partitionInternalRow = new GenericInternalRow(partitionValues.size())
for (partitionIndex <- 0 until partitionValues.size) {
partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex))
}
Comment on lines +186 to +189
Copilot AI Feb 11, 2026

Partition values are populated into GenericInternalRow using raw strings from Hive metastore (partition.getTPartition.getValues). Spark scan planning usually casts these strings to the partition schema types (and handles DEFAULT_PARTITION_NAME -> null / time zones). Without that casting, partition values/types may be incorrect. Consider building the partition InternalRow via the same cast/toRow approach used in NativePaimonTableScanExec (or existing Spark/Hive utilities).

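A hedged sketch of that casting, assuming relation.partitionCols carries the partition schema in metastore order; the Cast/DEFAULT_PARTITION_NAME handling shown here follows Spark's usual convention and is not code from this PR:

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}

val partitionSchema = relation.partitionCols
val partitionInternalRow = new GenericInternalRow(partitionValues.size())
for (i <- 0 until partitionValues.size()) {
  val raw = partitionValues.get(i)
  val value =
    if (raw == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
      null // Hive's default partition marker maps to a null partition value
    } else {
      // Cast the metastore string to the declared partition column type.
      Cast(Literal(raw), partitionSchema(i).dataType, Some(conf.sessionLocalTimeZone)).eval()
    }
  partitionInternalRow.update(i, value)
}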

val inputFormatClass = partDesc.getInputFileFormatClass
.asInstanceOf[Class[newInputClass[Writable, Writable]]]
arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow)
}
Comment on lines +191 to +194
Copilot AI Feb 11, 2026

In getFilePartitions, arrayPartitionedFile is an ArrayBuffer[PartitionedFile], but the code uses += getArrayPartitionedFile(...) where getArrayPartitionedFile returns an ArrayBuffer[PartitionedFile]. This is a type mismatch and won’t compile; use ++= (or change the helper to return a single PartitionedFile).

arrayPartitionedFile
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
.toArray
} else {
val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]]
getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0))
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
.toArray
}
arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
partitionedFiles,
getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
Comment on lines +204 to +206
Copilot AI Feb 11, 2026

FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...) again relies on getActiveSession.get. Use the sparkSession derived from basedHiveScan (as in NativeHiveTableScanBase) so partition planning doesn’t fail when there’s no active session.

Suggested change
-    arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
-      partitionedFiles,
-      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    val sparkSession = basedHiveScan.sparkSession
+    arrayFilePartition += FilePartition.getFilePartitions(
+      sparkSession,
+      partitionedFiles,
+      getMaxSplitBytes(sparkSession)
+    ).toArray

arrayFilePartition.toArray
}

private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
Math.min(defaultMaxSplitBytes, openCostInBytes)
Comment on lines +206 to +213
Copilot AI Feb 11, 2026

getMaxSplitBytes currently returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (NativePaimonTableScanExec) you fork Spark’s FilePartition#maxSplitBytes logic using min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)). Align this implementation to that logic (or call the shared helper) to avoid performance regressions.

Suggested change
-      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
-    arrayFilePartition.toArray
-  }
-  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
-    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-    Math.min(defaultMaxSplitBytes, openCostInBytes)
+      getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray
+    arrayFilePartition.toArray
+  }
+  private def getMaxSplitBytes(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile]): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    val totalBytes = partitionedFiles.map(_.length).sum
+    val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism)
+    val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism
+    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

}

private def getArrayPartitionedFile(newJobConf: JobConf,
inputFormatClass: Class[newInputClass[Writable, Writable]],
partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = {
val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions)
Comment on lines +216 to +219
Copilot AI Feb 11, 2026

getArrayPartitionedFile declares inputFormatClass as a Hadoop mapreduce.InputFormat (newInputClass), but the implementation instantiates/casts it to the old mapred.InputFormat and uses JobConf/getSplits(JobConf, Int). This API mismatch is unsafe (compile-time and runtime). Use a consistent InputFormat API (likely org.apache.hadoop.mapred.InputFormat given JobConf/FileSplit usage).

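A sketch of how the helper could stay on the mapred API end-to-end, folding in the Configurable handling from getInputFormat; call sites and casts would change accordingly, and this is illustrative rather than the PR's code (it reuses the imports already present in this file):

private def getArrayPartitionedFile(
    jobConf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[Writable, Writable]],
    partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = {
  // Instantiate via the old mapred API, matching the JobConf/getSplits/FileSplit usage below.
  val inputFormat = ReflectionUtils.newInstance(inputFormatClass, jobConf)
  inputFormat match {
    case c: Configurable => c.setConf(jobConf)
    case _ =>
  }
  val splits = inputFormat.getSplits(jobConf, minPartitions)
    .collect { case fs: FileSplit if !ignoreEmptySplits || fs.getLength > 0 => fs }
  val files = ArrayBuffer[PartitionedFile]()
  splits.foreach { fs =>
    files += Shims.get.getPartitionedFile(
      partitionInternalRow, fs.getPath.toString, fs.getStart, fs.getLength)
  }
  files
}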
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
inputFormatClass match {
case OrcInputFormat =>
case MapredParquetInputFormat =>
case _ =>
}
Comment on lines +225 to +229
Copilot AI Feb 11, 2026

inputFormatClass match { case OrcInputFormat => ... } is matching a Class[_] value against a class name, and the cases are empty. If you need special handling by input format, compare against classOf[OrcInputFormat] / classOf[MapredParquetInputFormat] and implement the intended behavior; otherwise remove this dead code block.

Suggested change
-    inputFormatClass match {
-      case OrcInputFormat =>
-      case MapredParquetInputFormat =>
-      case _ =>
-    }

val arrayFilePartition = ArrayBuffer[PartitionedFile]()
for (i <- 0 until inputSplits.size) {
val inputSplit = inputSplits(i)
inputSplit match {
case FileSplit =>
val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
Comment on lines +234 to +235
Copilot AI Feb 11, 2026

inputSplit match { case FileSplit => ... } won’t work: FileSplit is a class, not a matchable singleton. Use a typed pattern like case fs: FileSplit => and avoid re-casting the same value immediately after matching.

Suggested change
-        case FileSplit =>
-          val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
+        case orcInputSplit: FileSplit =>

arrayFilePartition +=
Shims.get.getPartitionedFile(partitionInternalRow, orcInputSplit.getPath.toString,
orcInputSplit.getStart, orcInputSplit.getLength)
}
}
arrayFilePartition
}

private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
InputFormat[Writable, Writable] = {
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[Writable, Writable]]
newInputFormat match {
Comment on lines +244 to +248
Copilot AI Feb 11, 2026

getInputFormat takes a mapreduce.InputFormat class (newInputClass) but returns/instantiates org.apache.hadoop.mapred.InputFormat. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness.

case c: Configurable => c.setConf(conf)
case _ =>
}
newInputFormat
}

}

object HiveTableUtil {
private val orcFormat = "OrcInputFormat"
private val parquetFormat = "MapredParquetInputFormat"

def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = {
if (inputFormatClass.getSimpleName.equalsIgnoreCase(orcFormat)) {
"orc"
} else if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) {
"parquet"
} else {
"other"
}
}

}