During broker startup, KafkaServer creates the log manager through the LogManager companion object:

logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)

LogManager.apply()
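In Scala, LogManager(...) desugars to LogManager.apply(...) on the companion object, which assembles the long constructor parameter list from the flat broker configuration. A minimal sketch of that factory pattern, using hypothetical stand-in names rather than the actual Kafka source:

import java.io.File

// Hypothetical, simplified stand-ins for the real broker types.
case class BrokerConfig(logDirs: Seq[String], recoveryThreadsPerDataDir: Int)

class SimpleLogManager(val logDirs: Seq[File], val recoveryThreadsPerDataDir: Int)

object SimpleLogManager {
  // The factory derives every constructor argument from the config,
  // so call sites stay as short as: SimpleLogManager(config)
  def apply(config: BrokerConfig): SimpleLogManager =
    new SimpleLogManager(
      logDirs = config.logDirs.map(d => new File(d).getAbsoluteFile),
      recoveryThreadsPerDataDir = config.recoveryThreadsPerDataDir)
}

The constructor it ultimately invokes: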
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 val topicConfigs: Map[String, LogConfig],
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time) extends Logging with KafkaMetricsGroup {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

  // All logs currently hosted by this broker, keyed by topic-partition.
  private val currentLogs = new Pool[TopicPartition, Log]()

  // Log directories that passed validation; directories that fail are reported
  // to the LogDirFailureChannel instead of aborting startup (unless all fail).
  private val _liveLogDirs: ConcurrentLinkedQueue[File] =
    createAndValidateLogDirs(logDirs, initialOfflineDirs)

  def liveLogDirs: Seq[File] = {
    if (_liveLogDirs.size == logDirs.size)
      logDirs
    else
      _liveLogDirs.asScala.toBuffer
  }

  @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

  // One checkpoint file per live log directory, recording for each partition
  // the offset up to which its data is known to be flushed (the recovery point).
  @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap

  @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
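Each OffsetCheckpointFile is a small plain-text file with a fixed layout: a format version, an entry count, and one "topic partition offset" line per partition. A simplified reader for that layout (readCheckpoint is a hypothetical helper, not the Kafka API; the real class also rewrites the file atomically through a temp file):

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Layout: line 1 = format version (0), line 2 = entry count,
// then one "<topic> <partition> <offset>" line per entry.
def readCheckpoint(path: String): Map[(String, Int), Long] = {
  val lines = Files.readAllLines(Paths.get(path)).asScala
  require(lines.head.trim.toInt == 0, "unsupported checkpoint version")
  val expected = lines(1).trim.toInt
  val entries = lines.drop(2).map { line =>
    val Array(topic, partition, offset) = line.trim.split("\\s+")
    (topic, partition.toInt) -> offset.toLong
  }
  require(entries.size == expected, "truncated checkpoint file")
  entries.toMap
}

If a checkpoint cannot be read, loadLogs() below logs a warning and falls back to an empty map, i.e. a recovery point of 0 and a full recovery for the affected partitions. First, though, the directories themselves are validated: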
  private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
    val liveLogDirs = new ConcurrentLinkedQueue[File]()
    val canonicalPaths = mutable.HashSet.empty[String]

    for (dir <- dirs) {
      try {
        // A directory that already failed to load is treated as an I/O failure here too.
        if (initialOfflineDirs.contains(dir))
          throw new IOException(s"Failed to load ${dir.getAbsolutePath} during broker startup")

        if (!dir.exists) {
          info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
          val created = dir.mkdirs()
          if (!created)
            throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
        }
        if (!dir.isDirectory || !dir.canRead)
          throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")

        // Canonical paths catch duplicates that plain string comparison would miss,
        // e.g. a symlink and its target pointing at the same directory.
        if (!canonicalPaths.add(dir.getCanonicalPath))
          throw new KafkaException(s"Duplicate log directory found: ${dirs.mkString(", ")}")

        liveLogDirs.add(dir)
      } catch {
        case e: IOException =>
          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
            s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
      }
    }
    if (liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because none of the specified log dirs from ${dirs.mkString(", ")} can be created or validated")
      Exit.halt(1)
    }

    liveLogDirs
  }
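The getCanonicalPath check above is what makes the duplicate detection robust against aliased paths. A small self-contained demonstration (hypothetical directory names; assumes a POSIX filesystem where symlinks can be created):

import java.io.File
import java.nio.file.Files
import scala.collection.mutable

// Two configured entries may alias the same physical directory, e.g. a
// symlink next to its target. Canonicalization resolves both to one path,
// so the HashSet rejects the second entry.
val target = Files.createTempDirectory("kafka-logs-").toFile
val link = new File(target.getParent, target.getName + "-link")
Files.createSymbolicLink(link.toPath, target.toPath)

val canonicalPaths = mutable.HashSet.empty[String]
for (dir <- Seq(target, link)) {
  if (!canonicalPaths.add(dir.getCanonicalPath))
    println(s"Duplicate log directory found: ${dir.getAbsolutePath}")
}

With the live directories established, loadLogs() loads every partition directory they contain: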
  private def loadLogs(): Unit = {
    info("Loading logs.")
    val startMs = time.milliseconds
    val threadPools = ArrayBuffer.empty[ExecutorService]
    val offlineDirs = mutable.Set.empty[(String, IOException)]
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]

    for (dir <- liveLogDirs) {
      try {
        // Each data directory gets its own fixed-size recovery pool.
        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
        threadPools.append(pool)

        val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

        if (cleanShutdownFile.exists) {
          debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}")
        } else {
          // The marker file is missing, so the previous shutdown was unclean;
          // the actual log recovery is performed by Log during initialization.
          brokerState.newState(RecoveringFromUncleanShutdown)
        }

        var recoveryPoints = Map[TopicPartition, Long]()
        try {
          recoveryPoints = this.recoveryPointCheckpoints(dir).read
        } catch {
          case e: Exception =>
            warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
            warn("Resetting the recovery checkpoint to 0")
        }

        var logStartOffsets = Map[TopicPartition, Long]()
        try {
          logStartOffsets = this.logStartOffsetCheckpoints(dir).read
        } catch {
          case e: Exception =>
            warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
        }

        // One recovery job per partition directory, submitted to this dir's pool.
        val jobsForDir = for {
          dirContent <- Option(dir.listFiles).toList
          logDir <- dirContent if logDir.isDirectory
        } yield {
          CoreUtils.runnable {
            try {
              loadLog(logDir, recoveryPoints, logStartOffsets)
            } catch {
              case e: IOException =>
                offlineDirs.add((dir.getAbsolutePath, e))
                error("Error while loading log dir " + dir.getAbsolutePath, e)
            }
          }
        }
        jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
      } catch {
        case e: IOException =>
          offlineDirs.add((dir.getAbsolutePath, e))
          error("Error while loading log dir " + dir.getAbsolutePath, e)
      }
    }

    try {
      // Wait for every directory's jobs, then remove its clean shutdown marker
      // so that a crash from here on is detected as unclean next time.
      for ((cleanShutdownFile, dirJobs) <- jobs) {
        dirJobs.foreach(_.get)
        try {
          cleanShutdownFile.delete()
        } catch {
          case e: IOException =>
            offlineDirs.add((cleanShutdownFile.getParent, e))
            error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
        }
      }

      offlineDirs.foreach { case (dir, e) =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
      }
    } catch {
      case e: ExecutionException =>
        error("There was an error in one of the threads during logs loading: " + e.getCause)
        throw e.getCause
    } finally {
      threadPools.foreach(_.shutdown())
    }

    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
  }
}
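Stripped of the Kafka-specific types, loadLogs() is a fan-out/fan-in: one fixed-size pool per data directory, one job per partition directory, then a blocking get() on every Future before the pools are shut down. A minimal sketch of that concurrency pattern (loadAll and loadOne are hypothetical names, not the Kafka API):

import java.util.concurrent.{Callable, Executors, Future}
import scala.collection.mutable.ArrayBuffer

// Fan-out: submit one job per item to its group's pool.
// Fan-in: block on every Future so worker failures surface before we proceed.
def loadAll(groups: Seq[Seq[String]], threadsPerGroup: Int)(loadOne: String => Unit): Unit = {
  val work = groups.map(items => (Executors.newFixedThreadPool(threadsPerGroup), items))
  val jobs = ArrayBuffer.empty[Future[_]]
  try {
    for ((pool, items) <- work; item <- items)
      jobs += pool.submit(new Callable[Unit] { def call(): Unit = loadOne(item) })
    jobs.foreach(_.get()) // rethrows any worker failure wrapped in ExecutionException
  } finally {
    work.foreach { case (pool, _) => pool.shutdown() }
  }
}

// Example: two data directories, two recovery threads each.
// loadAll(Seq(Seq("topic-0", "topic-1"), Seq("topic-2")), threadsPerGroup = 2)(println)

One pool per directory lets recovery proceed in parallel across disks, with the num.recovery.threads.per.data.dir broker setting controlling the parallelism within each disk.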