Skip to content

Commit 6de8eda

Browse files
mikeklaashongalex
authored andcommitted
fix(pubsub/pstest): Clear Subscription when calling ClearMessages.
Currently, `ClearMessages()` only clears `Server` messages, not touching any messages that have been published to a Subscription but not yet delivered. This can cause odd behaviour if those messages are delivered as `pstest` assumes that any such messages exist in the Server's `msgsByID` map. Attempting to ModAck such a message results in a NPE.
1 parent 036656b commit 6de8eda

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

pubsub/pstest/fake.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ func (s *Server) ClearMessages() {
288288
s.GServer.mu.Lock()
289289
s.GServer.msgs = nil
290290
s.GServer.msgsByID = make(map[string]*Message)
291+
for _, sub := range s.GServer.subs {
292+
sub.msgs = map[string]*message{}
293+
}
291294
s.GServer.mu.Unlock()
292295
}
293296

pubsub/pstest/fake_test.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,13 +472,19 @@ func TestPublishOrdered(t *testing.T) {
472472
}
473473

474474
func TestClearMessages(t *testing.T) {
475-
s := NewServer()
476-
defer s.Close()
475+
pclient, sclient, s, cleanup := newFake(context.TODO(), t)
476+
defer cleanup()
477+
478+
top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
479+
sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
480+
Name: "projects/P/subscriptions/S",
481+
Topic: top.Name,
482+
AckDeadlineSeconds: 10,
483+
})
477484

478485
for i := 0; i < 3; i++ {
479-
s.Publish("projects/p/topics/t", []byte("hello"), nil)
486+
s.Publish(top.Name, []byte("hello"), nil)
480487
}
481-
s.Wait()
482488
msgs := s.Messages()
483489
if got, want := len(msgs), 3; got != want {
484490
t.Errorf("got %d messages, want %d", got, want)
@@ -488,6 +494,14 @@ func TestClearMessages(t *testing.T) {
488494
if got, want := len(msgs), 0; got != want {
489495
t.Errorf("got %d messages, want %d", got, want)
490496
}
497+
498+
res, err := sclient.Pull(context.Background(), &pb.PullRequest{Subscription: sub.Name})
499+
if err != nil {
500+
t.Fatal(err)
501+
}
502+
if len(res.ReceivedMessages) != 0 {
503+
t.Errorf("got %d messages, want zero", len(res.ReceivedMessages))
504+
}
491505
}
492506

493507
// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".

0 commit comments

Comments
 (0)