@@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
24
24
import java .time .{Duration => JDuration }
25
25
import java .util .concurrent .atomic .{AtomicBoolean , AtomicInteger }
26
26
import java .util .concurrent .{CountDownLatch , ExecutionException , TimeUnit }
27
- import java .util .{Collections , Locale , Optional , Properties , UUID }
27
+ import java .util .{Collections , Locale , Optional , Properties }
28
28
import java .{time , util }
29
29
import kafka .integration .KafkaServerTestHarness
30
30
import kafka .server .KafkaConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.clients.HostResolver
34
34
import org .apache .kafka .clients .admin .AlterConfigOp .OpType
35
35
import org .apache .kafka .clients .admin .ConfigEntry .ConfigSource
36
36
import org .apache .kafka .clients .admin ._
37
- import org .apache .kafka .clients .consumer .internals .{ AsyncKafkaConsumer , StreamsRebalanceData , StreamsRebalanceListener }
37
+ import org .apache .kafka .clients .consumer .internals .AsyncKafkaConsumer
38
38
import org .apache .kafka .clients .consumer .{CommitFailedException , Consumer , ConsumerConfig , GroupProtocol , KafkaConsumer , OffsetAndMetadata , ShareConsumer }
39
39
import org .apache .kafka .clients .producer .{KafkaProducer , ProducerConfig , ProducerRecord }
40
40
import org .apache .kafka .common .acl .{AccessControlEntry , AclBinding , AclBindingFilter , AclOperation , AclPermissionType }
@@ -2318,90 +2318,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
2318
2318
}
2319
2319
}
2320
2320
2321
- @ Test
2322
- def testDescribeStreamsGroups (): Unit = {
2323
- val streamsGroupId = " stream_group_id"
2324
- val testTopicName = " test_topic"
2325
- val testNumPartitions = 1
2326
-
2327
- val config = createConfig
2328
- client = Admin .create(config)
2329
-
2330
- prepareTopics(List (testTopicName), testNumPartitions)
2331
- prepareRecords(testTopicName)
2332
-
2333
- val streams = createStreamsGroupToDescribe(
2334
- inputTopic = testTopicName,
2335
- streamsGroupId = streamsGroupId
2336
- )
2337
-
2338
- try {
2339
- TestUtils .waitUntilTrue(() => {
2340
- val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null )
2341
- firstGroup.groupState().orElse(null ) == GroupState .STABLE && firstGroup.groupId() == streamsGroupId
2342
- }, " Stream group not stable yet" )
2343
-
2344
- // Verify the describe call works correctly
2345
- val describedGroups = client.describeStreamsGroups(util.List .of(streamsGroupId)).all().get()
2346
- val group = describedGroups.get(streamsGroupId)
2347
- assertNotNull(group)
2348
- assertEquals(streamsGroupId, group.groupId())
2349
- assertFalse(group.members().isEmpty)
2350
- assertNotNull(group.subtopologies())
2351
- assertFalse(group.subtopologies().isEmpty)
2352
-
2353
- // Verify the topology contains the expected source and sink topics
2354
- val subtopologies = group.subtopologies().asScala
2355
- assertTrue(subtopologies.exists(subtopology =>
2356
- subtopology.sourceTopics().contains(testTopicName)))
2357
-
2358
- // Test describing a non-existing group
2359
- val nonExistingGroup = " non_existing_stream_group"
2360
- val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List .of(nonExistingGroup))
2361
- assertFutureThrows(classOf [GroupIdNotFoundException ], describedNonExistingGroupResponse.all())
2362
-
2363
- } finally {
2364
- Utils .closeQuietly(streams, " streams" )
2365
- Utils .closeQuietly(client, " adminClient" )
2366
- }
2367
- }
2368
-
2369
- private def createStreamsGroupToDescribe (
2370
- inputTopic : String ,
2371
- streamsGroupId : String
2372
- ): Consumer [Array [Byte ], Array [Byte ]] = {
2373
- streamsConsumerConfig.put(ConsumerConfig .GROUP_ID_CONFIG , streamsGroupId)
2374
- streamsConsumerConfig.put(ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , " false" )
2375
- val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData (
2376
- UUID .randomUUID(),
2377
- Optional .empty(),
2378
- util.Map .of(
2379
- " subtopology-0" , new StreamsRebalanceData .Subtopology (
2380
- util.Set .of(inputTopic),
2381
- util.Set .of(),
2382
- util.Map .of(),
2383
- util.Map .of(),
2384
- util.Set .of()
2385
- )),
2386
- Map .empty[String , String ].asJava
2387
- ))
2388
- consumer.subscribe(
2389
- util.Set .of(inputTopic),
2390
- new StreamsRebalanceListener {
2391
- override def onTasksRevoked (tasks : util.Set [StreamsRebalanceData .TaskId ]): Optional [Exception ] =
2392
- Optional .empty()
2393
-
2394
- override def onTasksAssigned (assignment : StreamsRebalanceData .Assignment ): Optional [Exception ] =
2395
- Optional .empty()
2396
-
2397
- override def onAllTasksLost (): Optional [Exception ] =
2398
- Optional .empty()
2399
- }
2400
- )
2401
- consumer.poll(JDuration .ofMillis(500L ))
2402
- consumer
2403
- }
2404
-
2405
2321
/**
2406
2322
* Test the consumer group APIs for member removal.
2407
2323
*/
@@ -2678,7 +2594,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
2678
2594
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
2679
2595
2680
2596
val streamsGroup = createStreamsGroup(
2681
- inputTopic = testTopicName,
2597
+ inputTopics = Set (testTopicName),
2598
+ changelogTopics = Set (testTopicName + " -changelog" ),
2682
2599
streamsGroupId = streamsGroupId
2683
2600
)
2684
2601
@@ -4487,7 +4404,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
4487
4404
prepareRecords(testTopicName)
4488
4405
4489
4406
val streams = createStreamsGroup(
4490
- inputTopic = testTopicName,
4407
+ inputTopics = Set (testTopicName),
4408
+ changelogTopics = Set (testTopicName + " -changelog" ),
4491
4409
streamsGroupId = streamsGroupId
4492
4410
)
4493
4411
streams.poll(JDuration .ofMillis(500L ))
@@ -4524,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
4524
4442
}
4525
4443
}
4526
4444
4445
+ @ Test
4446
+ def testDescribeStreamsGroupsForStatelessTopology (): Unit = {
4447
+ val streamsGroupId = " stream_group_id"
4448
+ val testTopicName = " test_topic"
4449
+ val testNumPartitions = 1
4450
+
4451
+ val config = createConfig
4452
+ client = Admin .create(config)
4453
+
4454
+ prepareTopics(List (testTopicName), testNumPartitions)
4455
+ prepareRecords(testTopicName)
4456
+
4457
+ val streams = createStreamsGroup(
4458
+ inputTopics = Set (testTopicName),
4459
+ streamsGroupId = streamsGroupId
4460
+ )
4461
+ streams.poll(JDuration .ofMillis(500L ))
4462
+
4463
+ try {
4464
+ TestUtils .waitUntilTrue(() => {
4465
+ val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null )
4466
+ firstGroup.groupState().orElse(null ) == GroupState .STABLE && firstGroup.groupId() == streamsGroupId
4467
+ }, " Stream group not stable yet" )
4468
+
4469
+ // Verify the describe call works correctly
4470
+ val describedGroups = client.describeStreamsGroups(util.List .of(streamsGroupId)).all().get()
4471
+ val group = describedGroups.get(streamsGroupId)
4472
+ assertNotNull(group)
4473
+ assertEquals(streamsGroupId, group.groupId())
4474
+ assertFalse(group.members().isEmpty)
4475
+ assertNotNull(group.subtopologies())
4476
+ assertFalse(group.subtopologies().isEmpty)
4477
+
4478
+ // Verify the topology contains the expected source and sink topics
4479
+ val subtopologies = group.subtopologies().asScala
4480
+ assertTrue(subtopologies.exists(subtopology =>
4481
+ subtopology.sourceTopics().contains(testTopicName)))
4482
+
4483
+ // Test describing a non-existing group
4484
+ val nonExistingGroup = " non_existing_stream_group"
4485
+ val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List .of(nonExistingGroup))
4486
+ assertFutureThrows(classOf [GroupIdNotFoundException ], describedNonExistingGroupResponse.all())
4487
+
4488
+ } finally {
4489
+ Utils .closeQuietly(streams, " streams" )
4490
+ Utils .closeQuietly(client, " adminClient" )
4491
+ }
4492
+ }
4493
+
4527
4494
@ Test
4528
4495
def testDeleteStreamsGroups (): Unit = {
4529
4496
val testTopicName = " test_topic"
@@ -4546,7 +4513,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
4546
4513
val streamsGroupId = s " stream_group_id_ $i"
4547
4514
4548
4515
val streams = createStreamsGroup(
4549
- inputTopic = testTopicName,
4516
+ inputTopics = Set (testTopicName),
4517
+ changelogTopics = Set (testTopicName + " -changelog" ),
4550
4518
streamsGroupId = streamsGroupId,
4551
4519
)
4552
4520
streams.poll(JDuration .ofMillis(500L ))
0 commit comments