@@ -1100,16 +1100,29 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
1100
1100
@ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedGroupProtocolNames )
1101
1101
@ MethodSource (Array (" getTestGroupProtocolParametersAll" ))
1102
1102
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs (groupProtocol : String ): Unit = {
1103
+ TestNumReplicaFetcherMetricsReporter .testReporters.clear()
1104
+
1103
1105
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
1104
1106
val props = defaultStaticConfig(numServers)
1105
1107
props.put(MetadataLogConfig .METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG , " 10000" )
1108
+ props.put(MetricConfigs .METRIC_REPORTER_CLASSES_CONFIG , classOf [TestNumReplicaFetcherMetricsReporter ].getName)
1109
+ props.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 1" )
1106
1110
1107
1111
val kafkaConfig = KafkaConfig .fromProps(props)
1108
1112
val newBroker = createBroker(kafkaConfig).asInstanceOf [BrokerServer ]
1109
1113
servers += newBroker
1110
1114
1111
1115
alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal ))
1112
1116
1117
+ // Add num.replica.fetchers to the cluster-level config.
1118
+ val clusterLevelProps = new Properties
1119
+ clusterLevelProps.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" )
1120
+ reconfigureServers(clusterLevelProps, perBrokerConfig = false , (ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" ))
1121
+
1122
+ // Wait for the metrics reporter to be configured
1123
+ val initialReporter = TestNumReplicaFetcherMetricsReporter .waitForReporters(1 ).head
1124
+ initialReporter.verifyState(reconfigureCount = 1 , numFetcher = 2 )
1125
+
1113
1126
TestUtils .ensureConsistentKRaftMetadata(servers, controllerServer)
1114
1127
1115
1128
TestUtils .waitUntilTrue(
@@ -1122,11 +1135,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
1122
1135
newBroker.shutdown()
1123
1136
newBroker.awaitShutdown()
1124
1137
1138
+ // Clean up the test reporter
1139
+ TestNumReplicaFetcherMetricsReporter .testReporters.clear()
1140
+
1125
1141
val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
1126
1142
invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS , listenerPrefix(SecureExternal )))
1127
1143
newBroker.config.updateCurrentConfig(KafkaConfig .fromProps(invalidStaticConfigs))
1128
1144
1129
1145
newBroker.startup()
1146
+
1147
+ // Verify that the custom MetricsReporter is not reconfigured after restart.
1148
+ // If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter should maintain its state.
1149
+ val reporterAfterRestart = TestNumReplicaFetcherMetricsReporter .waitForReporters(1 ).head
1150
+ reporterAfterRestart.verifyState(reconfigureCount = 0 , numFetcher = 2 )
1130
1151
}
1131
1152
1132
1153
private def awaitInitialPositions (consumer : Consumer [_, _]): Unit = {
@@ -1635,6 +1656,64 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
1635
1656
}
1636
1657
}
1637
1658
1659
+ object TestNumReplicaFetcherMetricsReporter {
1660
+ val testReporters = new ConcurrentLinkedQueue [TestNumReplicaFetcherMetricsReporter ]()
1661
+
1662
+ def waitForReporters (count : Int ): List [TestNumReplicaFetcherMetricsReporter ] = {
1663
+ TestUtils .waitUntilTrue(() => testReporters.size == count, msg = " Metrics reporters size not matched. Expected: " + count + " , actual: " + testReporters.size())
1664
+
1665
+ val reporters = testReporters.asScala.toList
1666
+ TestUtils .waitUntilTrue(() => reporters.forall(_.configureCount == 1 ), msg = " Metrics reporters not configured" )
1667
+ reporters
1668
+ }
1669
+ }
1670
+
1671
+
1672
+ class TestNumReplicaFetcherMetricsReporter extends MetricsReporter {
1673
+ import TestNumReplicaFetcherMetricsReporter ._
1674
+ @ volatile var configureCount = 0
1675
+ @ volatile var reconfigureCount = 0
1676
+ @ volatile var numFetchers : Int = 1
1677
+ testReporters.add(this )
1678
+
1679
+ override def init (metrics : util.List [KafkaMetric ]): Unit = {
1680
+ }
1681
+
1682
+ override def configure (configs : util.Map [String , _]): Unit = {
1683
+ configureCount += 1
1684
+ numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1685
+ }
1686
+
1687
+ override def metricChange (metric : KafkaMetric ): Unit = {
1688
+ }
1689
+
1690
+ override def metricRemoval (metric : KafkaMetric ): Unit = {
1691
+ }
1692
+
1693
+ override def reconfigurableConfigs (): util.Set [String ] = {
1694
+ util.Set .of(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG )
1695
+ }
1696
+
1697
+ override def validateReconfiguration (configs : util.Map [String , _]): Unit = {
1698
+ val numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1699
+ if (numFetchers <= 0 )
1700
+ throw new ConfigException (s " Invalid num.replica.fetchers $numFetchers" )
1701
+ }
1702
+
1703
+ override def reconfigure (configs : util.Map [String , _]): Unit = {
1704
+ reconfigureCount += 1
1705
+ numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1706
+ }
1707
+
1708
+ override def close (): Unit = {
1709
+ }
1710
+
1711
+ def verifyState (reconfigureCount : Int , numFetcher : Int = 1 ): Unit = {
1712
+ assertEquals(reconfigureCount, this .reconfigureCount)
1713
+ assertEquals(numFetcher, this .numFetchers)
1714
+ }
1715
+ }
1716
+
1638
1717
1639
1718
class MockFileConfigProvider extends FileConfigProvider {
1640
1719
@ throws(classOf [IOException ])
0 commit comments