Skip to content

Commit c797f85

Browse files
jim0987795064FrankYang0529
authored andcommitted
KAFKA-19642 Replace dynamicPerBrokerConfigs with dynamicDefaultConfigs (#20405)
- **Changes**: Replace misused dynamicPerBrokerConfigs with dynamicDefaultConfigs - **Reasons**: KRaft servers don't handle the cluser-level configs in starting from: https://github.com/apache/kafka/pull/18949/files#r2296809389 Reviewers: Jun Rao <junrao@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com> --------- Co-authored-by: PoAn Yang <payang@apache.org>
1 parent 7ffd693 commit c797f85

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ object DynamicBrokerConfig {
239239
}
240240
}
241241
val configHandler = new BrokerConfigHandler(config, quotaManagers)
242-
configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
242+
configHandler.processConfigChanges("", dynamicDefaultConfigs)
243243
configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
244244
}
245245
}

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,16 +1100,29 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
11001100
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
11011101
@MethodSource(Array("getTestGroupProtocolParametersAll"))
11021102
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol: String): Unit = {
1103+
TestNumReplicaFetcherMetricsReporter.testReporters.clear()
1104+
11031105
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
11041106
val props = defaultStaticConfig(numServers)
11051107
props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
1108+
props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[TestNumReplicaFetcherMetricsReporter].getName)
1109+
props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
11061110

11071111
val kafkaConfig = KafkaConfig.fromProps(props)
11081112
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
11091113
servers += newBroker
11101114

11111115
alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal))
11121116

1117+
// Add num.replica.fetchers to the cluster-level config.
1118+
val clusterLevelProps = new Properties
1119+
clusterLevelProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2")
1120+
reconfigureServers(clusterLevelProps, perBrokerConfig = false, (ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2"))
1121+
1122+
// Wait for the metrics reporter to be configured
1123+
val initialReporter = TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
1124+
initialReporter.verifyState(reconfigureCount = 1, numFetcher = 2)
1125+
11131126
TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
11141127

11151128
TestUtils.waitUntilTrue(
@@ -1122,11 +1135,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
11221135
newBroker.shutdown()
11231136
newBroker.awaitShutdown()
11241137

1138+
// Clean up the test reporter
1139+
TestNumReplicaFetcherMetricsReporter.testReporters.clear()
1140+
11251141
val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
11261142
invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS, listenerPrefix(SecureExternal)))
11271143
newBroker.config.updateCurrentConfig(KafkaConfig.fromProps(invalidStaticConfigs))
11281144

11291145
newBroker.startup()
1146+
1147+
// Verify that the custom MetricsReporter is not reconfigured after restart.
1148+
// If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter should maintain its state.
1149+
val reporterAfterRestart = TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
1150+
reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2)
11301151
}
11311152

11321153
private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
@@ -1635,6 +1656,64 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
16351656
}
16361657
}
16371658

1659+
object TestNumReplicaFetcherMetricsReporter {
1660+
val testReporters = new ConcurrentLinkedQueue[TestNumReplicaFetcherMetricsReporter]()
1661+
1662+
def waitForReporters(count: Int): List[TestNumReplicaFetcherMetricsReporter] = {
1663+
TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters size not matched. Expected: " + count + ", actual: " + testReporters.size())
1664+
1665+
val reporters = testReporters.asScala.toList
1666+
TestUtils.waitUntilTrue(() => reporters.forall(_.configureCount == 1), msg = "Metrics reporters not configured")
1667+
reporters
1668+
}
1669+
}
1670+
1671+
1672+
class TestNumReplicaFetcherMetricsReporter extends MetricsReporter {
1673+
import TestNumReplicaFetcherMetricsReporter._
1674+
@volatile var configureCount = 0
1675+
@volatile var reconfigureCount = 0
1676+
@volatile var numFetchers: Int = 1
1677+
testReporters.add(this)
1678+
1679+
override def init(metrics: util.List[KafkaMetric]): Unit = {
1680+
}
1681+
1682+
override def configure(configs: util.Map[String, _]): Unit = {
1683+
configureCount += 1
1684+
numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
1685+
}
1686+
1687+
override def metricChange(metric: KafkaMetric): Unit = {
1688+
}
1689+
1690+
override def metricRemoval(metric: KafkaMetric): Unit = {
1691+
}
1692+
1693+
override def reconfigurableConfigs(): util.Set[String] = {
1694+
util.Set.of(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
1695+
}
1696+
1697+
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
1698+
val numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
1699+
if (numFetchers <= 0)
1700+
throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
1701+
}
1702+
1703+
override def reconfigure(configs: util.Map[String, _]): Unit = {
1704+
reconfigureCount += 1
1705+
numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
1706+
}
1707+
1708+
override def close(): Unit = {
1709+
}
1710+
1711+
def verifyState(reconfigureCount: Int, numFetcher: Int = 1): Unit = {
1712+
assertEquals(reconfigureCount, this.reconfigureCount)
1713+
assertEquals(numFetcher, this.numFetchers)
1714+
}
1715+
}
1716+
16381717

16391718
class MockFileConfigProvider extends FileConfigProvider {
16401719
@throws(classOf[IOException])

0 commit comments

Comments
 (0)