@@ -1263,23 +1263,18 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
1263
1263
key := parts [0 ]
1264
1264
msg := parts [1 ]
1265
1265
publishData = append (publishData , testutil2.OrderedKeyMsg {Key : key , Data : msg })
1266
- r := topic .Publish (ctx , & Message {
1266
+ topic .Publish (ctx , & Message {
1267
1267
Data : []byte (msg ),
1268
1268
OrderingKey : key ,
1269
1269
})
1270
- go func () {
1271
- _ , err := r .Get (ctx )
1272
- if err != nil {
1273
- // Can't fail inside goroutine, so just log the error.
1274
- t .Logf ("publish error for message(%s): %v" , msg , err )
1275
- }
1276
- }()
1277
1270
wg .Add (1 )
1278
1271
}
1279
1272
if err := scanner .Err (); err != nil {
1280
1273
t .Fatal (err )
1281
1274
}
1282
1275
1276
+ receiveDone := make (chan struct {})
1277
+ ctx , cancel := context .WithCancel (ctx )
1283
1278
go func () {
1284
1279
if err := sub .Receive (ctx , func (ctx context.Context , msg * Message ) {
1285
1280
mu .Lock ()
@@ -1298,6 +1293,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
1298
1293
t .Error (err )
1299
1294
}
1300
1295
}
1296
+ close (receiveDone )
1301
1297
}()
1302
1298
1303
1299
done := make (chan struct {})
@@ -1308,6 +1304,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
1308
1304
1309
1305
select {
1310
1306
case <- done :
1307
+ cancel ()
1311
1308
case <- time .After (5 * time .Minute ):
1312
1309
t .Fatal ("timed out after 5m waiting for all messages to be received" )
1313
1310
}
@@ -1317,6 +1314,13 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
1317
1314
if err := testutil2 .VerifyKeyOrdering (publishData , receiveData ); err != nil {
1318
1315
t .Fatalf ("VerifyKeyOrdering error: %v" , err )
1319
1316
}
1317
+
1318
+ select {
1319
+ case <- receiveDone :
1320
+ case <- time .After (5 * time .Minute ):
1321
+ t .Fatal ("timed out after 5m waiting for receive to exit" )
1322
+ }
1323
+
1320
1324
}
1321
1325
1322
1326
func TestIntegration_OrderedKeys_ResumePublish (t * testing.T ) {
0 commit comments