Spark实践
动态加载hadoop环境
/**
 * Copies every property from a chosen set of Hadoop/Hive client config files into a
 * SparkSession builder, so the target cluster can be selected at runtime.
 *
 * Reads core-site.xml, hdfs-site.xml and yarn-site.xml from `hadoopConfDir`, and hive-site.xml
 * from `hiveConfDir`. The directories could instead be looked up from a database.
 *
 * NOTE(review): `new Configuration()` also loads Hadoop's default resources from the classpath.
 * Properties defined there but absent from the chosen files are copied as well — confirm this
 * is intended.
 *
 * @param sparkBuilder  builder to configure; it is mutated in place and returned
 * @param hiveConfDir   directory containing hive-site.xml
 * @param hadoopConfDir directory containing core-site.xml, hdfs-site.xml and yarn-site.xml
 * @return the same `sparkBuilder`, for chaining
 */
def chooseHadoopEnv(sparkBuilder: SparkSession.Builder, hiveConfDir: String, hadoopConfDir: String): SparkSession.Builder = {
  // Configuration is a java.lang.Iterable. Convert it explicitly with JavaConverters instead of
  // relying on the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  val configuration: Configuration = new Configuration()
  // These file locations could be replaced with a lookup from a database.
  val resources = Seq(
    new Path(s"${hadoopConfDir}/core-site.xml"),
    new Path(s"${hadoopConfDir}/hdfs-site.xml"),
    new Path(s"${hadoopConfDir}/yarn-site.xml"),
    new Path(s"${hiveConfDir}/hive-site.xml"))
  // Added in this order. Per Hadoop's Configuration docs, later resources override earlier
  // non-final ones.
  resources.foreach(path => configuration.addResource(path))
  configuration.asScala.foreach(entry => sparkBuilder.config(entry.getKey, entry.getValue))
  sparkBuilder
}
/**
 * Points the session being built at a specific Hive metastore by setting `hive.metastore.uris`.
 *
 * @param sparkBuilder     builder to configure
 * @param hiveMetaStoreUri value stored under `hive.metastore.uris`. It is passed through
 *                         unvalidated (typically thrift://host:port — confirm with the cluster).
 * @return the builder returned by `config`, for chaining
 */
def chooseHive(sparkBuilder: SparkSession.Builder, hiveMetaStoreUri: String): SparkSession.Builder =
  sparkBuilder.config("hive.metastore.uris", hiveMetaStoreUri)
/**
 * Configures an HDFS high-availability nameservice with two NameNodes on the session's
 * SparkContext Hadoop configuration, and makes that nameservice the default filesystem.
 *
 * @param spark     session whose `sparkContext.hadoopConfiguration` is modified in place
 * @param nameSpace HDFS nameservice id; `fs.defaultFS` becomes `hdfs://<nameSpace>`
 * @param nn1       id of the first NameNode within the nameservice
 * @param nn1Addr   RPC address of the first NameNode (presumably host:port — confirm)
 * @param nn2       id of the second NameNode within the nameservice
 * @param nn2Addr   RPC address of the second NameNode (presumably host:port — confirm)
 */
def chooseHadoop(spark: SparkSession, nameSpace: String, nn1: String, nn1Addr: String, nn2: String, nn2Addr: String): Unit = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  val settings = Seq(
    "fs.defaultFS" -> s"hdfs://$nameSpace",
    "dfs.nameservices" -> nameSpace,
    s"dfs.ha.namenodes.$nameSpace" -> s"$nn1,$nn2",
    s"dfs.namenode.rpc-address.$nameSpace.$nn1" -> nn1Addr,
    s"dfs.namenode.rpc-address.$nameSpace.$nn2" -> nn2Addr,
    s"dfs.client.failover.proxy.provider.$nameSpace" ->
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
  )
  // Applied strictly in the order listed, so a repeated key ends up with its last value.
  settings.foreach { case (key, value) => hadoopConf.set(key, value) }
}
Tez类找不到问题:移除hive-site.xml中Tez查询引擎相关的配置(如hive.execution.engine=tez)即可
Spark Structured Streaming低版本Kafka并发访问问题
对同一个DF进行多次action操作时,偶发会出现下面的异常。本质原因是底层spark-sql-kafka维护KafkaConsumer缓存池时,缓存Key(groupId + TopicPartition)没有携带ThreadId(或任务标识),导致同一个Executor上的多个任务线程拿到并同时使用同一个KafkaConsumer,而KafkaConsumer不允许多线程并发访问。
版本信息
Spark:2.1.0
Kafka:0.10
Scala:2.11
异常堆栈
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
源码定位
KafkaConsumer
根本原因:KafkaConsumer通过acquire()检查调用线程,若该consumer已被其他线程持有,则直接抛出ConcurrentModificationException
// Quoted from KafkaConsumer: this is where the ConcurrentModificationException in the stack
// trace above is thrown (same message).
// This is an ownership check, not a lock: a second thread is never made to wait, it gets an
// exception instead.
// NOTE(review): presumably invoked at the start of each public KafkaConsumer call — confirm in
// the Kafka client source.
private void acquire() {
this.ensureNotClosed();
long threadId = Thread.currentThread().getId();
// currentThread holds the id of the owning thread; -1L appears to be the "no owner" sentinel.
// The call proceeds if the caller already owns the consumer (re-entrant use) or can claim it
// via CAS. Any other thread is rejected.
if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
// Count acquisitions by the owner (presumably decremented by a matching release()).
this.refcount.incrementAndGet();
}
}
CachedKafkaConsumer
每创建一个CachedKafkaConsumer对象,都会同时创建一个新的KafkaConsumer(构造时即调用createConsumer)
// Excerpt from the CachedKafkaConsumer class (Spark 2.1.0). `topicPartition` and `kafkaParams`
// are the constructor arguments (see `new CachedKafkaConsumer(topicPartition, kafkaParams)`).
import CachedKafkaConsumer._
// Consumer group id, read from kafkaParams.
private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
// Initialized during construction, so every CachedKafkaConsumer instance creates its own
// KafkaConsumer.
private var consumer = createConsumer
/** Indicates whether this consumer is in use or not; a newly constructed instance starts as in use. */
private var inuse = true
/** Iterator over the records already fetched. */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
// Presumably the offset expected next from fetchedData. It starts as UNKNOWN_OFFSET, which is
// presumably defined in the companion object (brought in by the import above).
private var nextOffsetInFetchedData = UNKNOWN_OFFSET
/**
 * Create a KafkaConsumer to fetch records for `topicPartition`.
 *
 * The partition is attached with `assign` (not `subscribe`), so the new consumer is bound to
 * exactly this one partition. Each call returns a brand-new KafkaConsumer.
 */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
tps.add(topicPartition)
c.assign(tps)
c
}
/**
 * Problem code (quoted from Spark 2.1.0 for analysis).
 *
 * Author's note (translated): a concurrency issue. When checking the key, thread 1 and thread 2
 * may both find it missing and both create a KafkaConsumer, so multiple KafkaConsumers get
 * created under the same threadId.
 *
 * NOTE(review): the method is `synchronized`, so the containsKey/put/get sequence below is not
 * racy by itself. The hazard visible here is different. Every caller with the same
 * CacheKey(groupId, topicPartition) receives the same cached instance, and `inuse` is set but
 * never checked. Two task threads can therefore end up using one KafkaConsumer at the same time,
 * which is what KafkaConsumer.acquire() rejects.
 *
 * @param topic       Kafka topic
 * @param partition   partition number within `topic`
 * @param kafkaParams consumer configuration; its group id is part of the cache key
 * @return the cached (or newly created) consumer, marked as in use
 */
def getOrCreate(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
// NOTE(review): if attemptNumber is 0-based (as Spark's TaskContext documents), `> 1` does not
// fire on the first retry — confirm.
if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
removeKafkaConsumer(topic, partition, kafkaParams)
val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
consumer.inuse = true
cache.put(key, consumer)
consumer
} else {
// FIXME(author): concurrency issue. NOTE(review): this check-then-put is already atomic under
// `synchronized`; the returned instance may still be in use by another caller.
if (!cache.containsKey(key)) {
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
}
val consumer = cache.get(key)
consumer.inuse = true
consumer
}
}
修复版本
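采用原子性操作。注意:`java.util.Map#putIfAbsent`返回的是该key之前关联的值,key不存在时返回null,不能直接当作缓存中的consumer使用。它的参数也会被提前求值,key已存在时同样会多创建一个未关闭的KafkaConsumer。另外,`getOrCreate`本身已被`synchronized`修饰,containsKey/put/get并不存在竞态;真正的隐患是多个任务线程拿到了同一个缓存的KafkaConsumer。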
/**
 * Returns the consumer cached for (group id, topic partition), creating and caching one if
 * absent, and marks it as in use.
 *
 * A retried task attempt always starts with a fresh consumer, in case the failure was caused by
 * the cached one.
 *
 * NOTE(review): this does not by itself stop two task threads from sharing one consumer. Callers
 * with the same key still receive the same instance while `inuse` is true. Avoiding the
 * ConcurrentModificationException needs more, e.g. handing out a non-cached consumer when the
 * cached one is in use. That also requires matching changes to the release path, which is not
 * shown here.
 *
 * @param topic       Kafka topic
 * @param partition   partition number within `topic`
 * @param kafkaParams consumer configuration; its group id is part of the cache key
 * @return the cached (or newly created) consumer for this topic partition, marked as in use
 */
def getOrCreate(
    topic: String,
    partition: Int,
    kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
  val topicPartition = new TopicPartition(topic, partition)
  val key = CacheKey(groupId, topicPartition)
  // attemptNumber is 0 for the first attempt, so any value >= 1 means this is a retry. In that
  // case, invalidate the cache and start with a new consumer.
  if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    removeKafkaConsumer(topic, partition, kafkaParams)
    val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    consumer.inuse = true
    cache.put(key, consumer)
    consumer
  } else {
    // Deliberately not `cache.putIfAbsent(key, new CachedKafkaConsumer(...))`, for two reasons.
    // First, it returns the PREVIOUS value, i.e. null on a cache miss, which led to an NPE below.
    // Second, its argument is built eagerly, creating and leaking an unclosed KafkaConsumer on
    // every cache hit.
    // The whole method is synchronized, so this lookup-then-insert is already atomic.
    val consumer = Option(cache.get(key)).getOrElse {
      val created = new CachedKafkaConsumer(topicPartition, kafkaParams)
      cache.put(key, created)
      created
    }
    consumer.inuse = true
    consumer
  }
}
最后更新于