Skip to content

Commit e84ba2f

Browse files
committed
add unit test
1 parent 7f64424 commit e84ba2f

File tree

3 files changed

+154
-93
lines changed

3 files changed

+154
-93
lines changed

core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
239239

240240
def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
241241
configsToRemove: List[String] = List(),
242-
inputTopic: String,
242+
inputTopics: Set[String],
243+
changelogTopics: Set[String] = Set(),
243244
streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
244245
val props = new Properties()
245246
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
255256
Optional.empty(),
256257
util.Map.of(
257258
"subtopology-0", new StreamsRebalanceData.Subtopology(
258-
util.Set.of(inputTopic),
259+
inputTopics.asJava,
259260
util.Set.of(),
260261
util.Map.of(),
261-
util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
262+
changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,
262263
util.Set.of()
263264
)),
264265
Map.empty[String, String].asJava
@@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
270271
configOverrides = props,
271272
streamsRebalanceData = streamsRebalanceData
272273
)
273-
consumer.subscribe(util.Set.of(inputTopic),
274+
consumer.subscribe(inputTopics.asJava,
274275
new StreamsRebalanceListener {
275276
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
276277
Optional.empty()

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 57 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
2424
import java.time.{Duration => JDuration}
2525
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
2626
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
27-
import java.util.{Collections, Locale, Optional, Properties, UUID}
27+
import java.util.{Collections, Locale, Optional, Properties}
2828
import java.{time, util}
2929
import kafka.integration.KafkaServerTestHarness
3030
import kafka.server.KafkaConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.clients.HostResolver
3434
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
3535
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
3636
import org.apache.kafka.clients.admin._
37-
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener}
37+
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
3838
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
3939
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
4040
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
@@ -2318,90 +2318,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
23182318
}
23192319
}
23202320

2321-
@Test
2322-
def testDescribeStreamsGroups(): Unit = {
2323-
val streamsGroupId = "stream_group_id"
2324-
val testTopicName = "test_topic"
2325-
val testNumPartitions = 1
2326-
2327-
val config = createConfig
2328-
client = Admin.create(config)
2329-
2330-
prepareTopics(List(testTopicName), testNumPartitions)
2331-
prepareRecords(testTopicName)
2332-
2333-
val streams = createStreamsGroupToDescribe(
2334-
inputTopic = testTopicName,
2335-
streamsGroupId = streamsGroupId
2336-
)
2337-
2338-
try {
2339-
TestUtils.waitUntilTrue(() => {
2340-
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
2341-
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
2342-
}, "Stream group not stable yet")
2343-
2344-
// Verify the describe call works correctly
2345-
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
2346-
val group = describedGroups.get(streamsGroupId)
2347-
assertNotNull(group)
2348-
assertEquals(streamsGroupId, group.groupId())
2349-
assertFalse(group.members().isEmpty)
2350-
assertNotNull(group.subtopologies())
2351-
assertFalse(group.subtopologies().isEmpty)
2352-
2353-
// Verify the topology contains the expected source and sink topics
2354-
val subtopologies = group.subtopologies().asScala
2355-
assertTrue(subtopologies.exists(subtopology =>
2356-
subtopology.sourceTopics().contains(testTopicName)))
2357-
2358-
// Test describing a non-existing group
2359-
val nonExistingGroup = "non_existing_stream_group"
2360-
val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
2361-
assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
2362-
2363-
} finally {
2364-
Utils.closeQuietly(streams, "streams")
2365-
Utils.closeQuietly(client, "adminClient")
2366-
}
2367-
}
2368-
2369-
private def createStreamsGroupToDescribe(
2370-
inputTopic: String,
2371-
streamsGroupId: String
2372-
): Consumer[Array[Byte], Array[Byte]] = {
2373-
streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
2374-
streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
2375-
val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData(
2376-
UUID.randomUUID(),
2377-
Optional.empty(),
2378-
util.Map.of(
2379-
"subtopology-0", new StreamsRebalanceData.Subtopology(
2380-
util.Set.of(inputTopic),
2381-
util.Set.of(),
2382-
util.Map.of(),
2383-
util.Map.of(),
2384-
util.Set.of()
2385-
)),
2386-
Map.empty[String, String].asJava
2387-
))
2388-
consumer.subscribe(
2389-
util.Set.of(inputTopic),
2390-
new StreamsRebalanceListener {
2391-
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
2392-
Optional.empty()
2393-
2394-
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
2395-
Optional.empty()
2396-
2397-
override def onAllTasksLost(): Optional[Exception] =
2398-
Optional.empty()
2399-
}
2400-
)
2401-
consumer.poll(JDuration.ofMillis(500L))
2402-
consumer
2403-
}
2404-
24052321
/**
24062322
* Test the consumer group APIs for member removal.
24072323
*/
@@ -2678,7 +2594,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
26782594
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
26792595

26802596
val streamsGroup = createStreamsGroup(
2681-
inputTopic = testTopicName,
2597+
inputTopics = Set(testTopicName),
2598+
changelogTopics = Set(testTopicName + "-changelog"),
26822599
streamsGroupId = streamsGroupId
26832600
)
26842601

@@ -4487,7 +4404,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
44874404
prepareRecords(testTopicName)
44884405

44894406
val streams = createStreamsGroup(
4490-
inputTopic = testTopicName,
4407+
inputTopics = Set(testTopicName),
4408+
changelogTopics = Set(testTopicName + "-changelog"),
44914409
streamsGroupId = streamsGroupId
44924410
)
44934411
streams.poll(JDuration.ofMillis(500L))
@@ -4524,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
45244442
}
45254443
}
45264444

4445+
@Test
4446+
def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
4447+
val streamsGroupId = "stream_group_id"
4448+
val testTopicName = "test_topic"
4449+
val testNumPartitions = 1
4450+
4451+
val config = createConfig
4452+
client = Admin.create(config)
4453+
4454+
prepareTopics(List(testTopicName), testNumPartitions)
4455+
prepareRecords(testTopicName)
4456+
4457+
val streams = createStreamsGroup(
4458+
inputTopics = Set(testTopicName),
4459+
streamsGroupId = streamsGroupId
4460+
)
4461+
streams.poll(JDuration.ofMillis(500L))
4462+
4463+
try {
4464+
TestUtils.waitUntilTrue(() => {
4465+
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
4466+
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
4467+
}, "Stream group not stable yet")
4468+
4469+
// Verify the describe call works correctly
4470+
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
4471+
val group = describedGroups.get(streamsGroupId)
4472+
assertNotNull(group)
4473+
assertEquals(streamsGroupId, group.groupId())
4474+
assertFalse(group.members().isEmpty)
4475+
assertNotNull(group.subtopologies())
4476+
assertFalse(group.subtopologies().isEmpty)
4477+
4478+
// Verify the topology contains the expected source and sink topics
4479+
val subtopologies = group.subtopologies().asScala
4480+
assertTrue(subtopologies.exists(subtopology =>
4481+
subtopology.sourceTopics().contains(testTopicName)))
4482+
4483+
// Test describing a non-existing group
4484+
val nonExistingGroup = "non_existing_stream_group"
4485+
val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
4486+
assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
4487+
4488+
} finally {
4489+
Utils.closeQuietly(streams, "streams")
4490+
Utils.closeQuietly(client, "adminClient")
4491+
}
4492+
}
4493+
45274494
@Test
45284495
def testDeleteStreamsGroups(): Unit = {
45294496
val testTopicName = "test_topic"
@@ -4546,7 +4513,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
45464513
val streamsGroupId = s"stream_group_id_$i"
45474514

45484515
val streams = createStreamsGroup(
4549-
inputTopic = testTopicName,
4516+
inputTopics = Set(testTopicName),
4517+
changelogTopics = Set(testTopicName + "-changelog"),
45504518
streamsGroupId = streamsGroupId,
45514519
)
45524520
streams.poll(JDuration.ofMillis(500L))

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
1621616216
assertRecordsEquals(expectedRecords, result.records());
1621716217
}
1621816218

16219+
@Test
16220+
public void testJoinEmptyStreamsGroupAndDescribe() {
16221+
String groupId = "fooup";
16222+
String memberId = Uuid.randomUuid().toString();
16223+
16224+
String subtopology1 = "subtopology1";
16225+
String fooTopicName = "foo";
16226+
Uuid fooTopicId = Uuid.randomUuid();
16227+
Topology topology = new Topology().setSubtopologies(List.of(
16228+
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
16229+
));
16230+
16231+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
16232+
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
16233+
.addTopic(fooTopicId, fooTopicName, 6)
16234+
.buildCoordinatorMetadataImage();
16235+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
16236+
.withStreamsGroupTaskAssignors(List.of(assignor))
16237+
.withMetadataImage(metadataImage)
16238+
.build();
16239+
16240+
assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16241+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
16242+
)));
16243+
16244+
assertThrows(GroupIdNotFoundException.class, () ->
16245+
context.groupMetadataManager.streamsGroup(groupId));
16246+
16247+
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
16248+
new StreamsGroupHeartbeatRequestData()
16249+
.setGroupId(groupId)
16250+
.setMemberId(memberId)
16251+
.setMemberEpoch(0)
16252+
.setProcessId("process-id")
16253+
.setRebalanceTimeoutMs(1500)
16254+
.setTopology(topology)
16255+
.setActiveTasks(List.of())
16256+
.setStandbyTasks(List.of())
16257+
.setWarmupTasks(List.of()));
16258+
16259+
assertResponseEquals(
16260+
new StreamsGroupHeartbeatResponseData()
16261+
.setMemberId(memberId)
16262+
.setMemberEpoch(1)
16263+
.setHeartbeatIntervalMs(5000)
16264+
.setActiveTasks(List.of(
16265+
new StreamsGroupHeartbeatResponseData.TaskIds()
16266+
.setSubtopologyId(subtopology1)
16267+
.setPartitions(List.of(0, 1, 2, 3, 4, 5))
16268+
))
16269+
.setStandbyTasks(List.of())
16270+
.setWarmupTasks(List.of()),
16271+
result.response().data()
16272+
);
16273+
16274+
StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
16275+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
16276+
.setMemberEpoch(1)
16277+
.setPreviousMemberEpoch(0)
16278+
.setClientId(DEFAULT_CLIENT_ID)
16279+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
16280+
.setRebalanceTimeoutMs(1500)
16281+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16282+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
16283+
.build();
16284+
16285+
// Commit the offset and test again
16286+
context.commit();
16287+
16288+
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
16289+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
16290+
.setGroupId(groupId)
16291+
.setAssignmentEpoch(1)
16292+
.setTopology(
16293+
new StreamsGroupDescribeResponseData.Topology()
16294+
.setEpoch(0)
16295+
.setSubtopologies(List.of(
16296+
new StreamsGroupDescribeResponseData.Subtopology()
16297+
.setSubtopologyId(subtopology1)
16298+
.setSourceTopics(List.of(fooTopicName))
16299+
))
16300+
)
16301+
.setMembers(Collections.singletonList(
16302+
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16303+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
16304+
))
16305+
.setGroupState(StreamsGroupState.STABLE.toString())
16306+
.setGroupEpoch(1);
16307+
assertEquals(1, actual.size());
16308+
assertEquals(describedGroup, actual.get(0));
16309+
}
16310+
1621916311
@Test
1622016312
public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
1622116313
String groupId = "fooup";

0 commit comments

Comments
 (0)