@@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.execution.streaming.runtime._
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -100,12 +100,14 @@ case class DataSource(
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
-    catalogTable: Option[CatalogTable] = None) extends Logging {
+    catalogTable: Option[CatalogTable] = None) extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
+  private val conf: SQLConf = getSqlConf(sparkSession)
+
   lazy val providingClass: Class[_] = {
-    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    val cls = DataSource.lookupDataSource(className, conf)
     // `providingClass` is used for resolving data source relation for catalog tables.
     // As now catalog for data source V2 is under development, here we fall back all the
     // [[FileDataSourceV2]] to [[FileFormat]] to guarantee the current catalog works.
@@ -120,8 +122,7 @@ case class DataSource(
 
   private[sql] def providingInstance(): Any = providingClass.getConstructor().newInstance()
 
-  private def newHadoopConfiguration(): Configuration =
-    sparkSession.sessionState.newHadoopConfWithOptions(options)
+  private def newHadoopConfiguration(): Configuration = getHadoopConf(sparkSession, options)
 
   private def makeQualified(path: Path): Path = {
     val fs = path.getFileSystem(newHadoopConfiguration())
@@ -130,7 +131,7 @@ case class DataSource(
 
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
-  private val equality = sparkSession.sessionState.conf.resolver
+  private val equality = conf.resolver
 
   /**
    * Whether or not paths should be globbed before being used to access files.
@@ -262,7 +263,7 @@ case class DataSource(
           }
         }
 
-        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
+        val isSchemaInferenceEnabled = conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         val isSingleVariantColumn = (providingClass == classOf[json.JsonFileFormat] ||
           providingClass == classOf[csv.CSVFileFormat]) &&
@@ -281,8 +282,7 @@ case class DataSource(
             checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
           createInMemoryFileIndex(globbedPaths)
         })
-        val forceNullable = sparkSession.sessionState.conf
-          .getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+        val forceNullable = conf.getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
         val sourceDataSchema = if (forceNullable) dataSchema.asNullable else dataSchema
         SourceInfo(
           s"FileSource[$path]",
@@ -381,7 +381,7 @@ case class DataSource(
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
             newHadoopConfiguration(),
-            sparkSession.sessionState.conf) =>
+            conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
           caseInsensitiveOptions, userSpecifiedSchema)
@@ -407,11 +407,11 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val useCatalogFileIndex = sparkSession.sessionState.conf.manageFilesourcePartitions &&
+        val useCatalogFileIndex = conf.manageFilesourcePartitions &&
           catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
           catalogTable.get.partitionColumnNames.nonEmpty
         val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
-          val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
+          val defaultTableSize = conf.defaultSizeInBytes
           val index = new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
@@ -475,7 +475,7 @@ case class DataSource(
       throw QueryExecutionErrors.dataPathNotSpecifiedError()
     }
 
-    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    val caseSensitive = conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
@@ -531,7 +531,7 @@ case class DataSource(
         disallowWritingIntervals(
           outputColumns.toStructType.asNullable, format.toString, forbidAnsiIntervals = false)
         val cmd = planForWritingFileFormat(format, mode, data)
-        val qe = sparkSession.sessionState.executePlan(cmd)
+        val qe = sessionState(sparkSession).executePlan(cmd)
         qe.assertCommandExecuted()
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
@@ -555,7 +555,7 @@ case class DataSource(
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
         disallowWritingIntervals(data.schema, format.toString, forbidAnsiIntervals = false)
-        DataSource.validateSchema(format.toString, data.schema, sparkSession.sessionState.conf)
+        DataSource.validateSchema(format.toString, data.schema, conf)
         planForWritingFileFormat(format, mode, data)
       case _ => throw SparkException.internalError(
         s"${providingClass.getCanonicalName} does not allow create table as select.")
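For context on the new call sites: getSqlConf, getHadoopConf, and sessionState come from the SessionStateHelper mixin that DataSource now extends, and that trait itself is not shown in these hunks. The sketch below is only an inference from the changed lines, not the actual implementation. It assumes the helper sits under org.apache.spark.sql.internal (where the private[sql] member SparkSession.sessionState is visible) and simply centralizes the sparkSession.sessionState lookups that the removed lines performed inline; the trait name and comments are illustrative.

// Illustrative sketch only; signatures inferred from the call sites in this diff.
package org.apache.spark.sql.internal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession

trait SessionStateHelperSketch {
  // One access point for the session state, so call sites stop repeating
  // sparkSession.sessionState chains (see the executePlan change above).
  protected def sessionState(sparkSession: SparkSession): SessionState =
    sparkSession.sessionState

  // Backs the cached `private val conf` field added to DataSource above.
  protected def getSqlConf(sparkSession: SparkSession): SQLConf =
    sessionState(sparkSession).conf

  // Hadoop Configuration with the relation's options folded in, matching what
  // newHadoopConfWithOptions did in the removed newHadoopConfiguration() body.
  protected def getHadoopConf(
      sparkSession: SparkSession,
      options: Map[String, String]): Configuration =
    sessionState(sparkSession).newHadoopConfWithOptions(options)
}

Under that reading, behavior at the call sites is unchanged: the same SQLConf and Hadoop settings are resolved as before, but the lookups go through one helper and the SQLConf is fetched once per DataSource instance via the cached conf field.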