Skip to content

Commit 53f17c4

Browse files
authored
test(pubsub): cleanup ordered keys json integration test (#8575)
* test(pubsub): remove checking of publish result to avoid creating too many goroutines * add check for receive to exit before deleting subscription
1 parent 6ef1945 commit 53f17c4

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

pubsub/integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,23 +1263,18 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
12631263
key := parts[0]
12641264
msg := parts[1]
12651265
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
1266-
r := topic.Publish(ctx, &Message{
1266+
topic.Publish(ctx, &Message{
12671267
Data: []byte(msg),
12681268
OrderingKey: key,
12691269
})
1270-
go func() {
1271-
_, err := r.Get(ctx)
1272-
if err != nil {
1273-
// Can't fail inside goroutine, so just log the error.
1274-
t.Logf("publish error for message(%s): %v", msg, err)
1275-
}
1276-
}()
12771270
wg.Add(1)
12781271
}
12791272
if err := scanner.Err(); err != nil {
12801273
t.Fatal(err)
12811274
}
12821275

1276+
receiveDone := make(chan struct{})
1277+
ctx, cancel := context.WithCancel(ctx)
12831278
go func() {
12841279
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
12851280
mu.Lock()
@@ -1298,6 +1293,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
12981293
t.Error(err)
12991294
}
13001295
}
1296+
close(receiveDone)
13011297
}()
13021298

13031299
done := make(chan struct{})
@@ -1308,6 +1304,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
13081304

13091305
select {
13101306
case <-done:
1307+
cancel()
13111308
case <-time.After(5 * time.Minute):
13121309
t.Fatal("timed out after 5m waiting for all messages to be received")
13131310
}
@@ -1317,6 +1314,13 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
13171314
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
13181315
t.Fatalf("VerifyKeyOrdering error: %v", err)
13191316
}
1317+
1318+
select {
1319+
case <-receiveDone:
1320+
case <-time.After(5 * time.Minute):
1321+
t.Fatal("timed out after 5m waiting for receive to exit")
1322+
}
1323+
13201324
}
13211325

13221326
func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {

0 commit comments

Comments
 (0)