From bc7e915d5afb6b0b95250324d219067928bd07f6 Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Fri, 26 May 2023 15:12:12 -0400 Subject: [PATCH 1/4] tests: make sure to catch all messages on flaky ordered message test (#1728) Maybe fixes: https://togithub.com/googleapis/nodejs-pubsub/issues/1727 --- system-test/pubsub.ts | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index 8d138c8c0..3c328362d 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -307,20 +307,6 @@ describe('pubsub', () => { // eslint-disable-next-line @typescript-eslint/no-var-requires } = require('../../system-test/fixtures/ordered-messages.json'); - const publishes = input.map(({key, message}: Input) => { - const options: MessageOptions = { - data: Buffer.from(message), - }; - - if (key) { - options.orderingKey = key; - } - - return topic.publishMessage(options); - }); - - await Promise.all(publishes); - const pending: Pending = {}; expected.forEach(({key, messages}: Expected) => { @@ -329,6 +315,7 @@ describe('pubsub', () => { const deferred = defer(); + // Make sure we're listening when the lease manager throws the messages at us. subscription .on('error', deferred.reject) .on('message', (message: Message) => { @@ -371,6 +358,19 @@ describe('pubsub', () => { } }); + const publishes = input.map(({key, message}: Input) => { + const options: MessageOptions = { + data: Buffer.from(message), + }; + + if (key) { + options.orderingKey = key; + } + + return topic.publishMessage(options); + }); + await Promise.all(publishes); + await deferred.promise; await Promise.all([topic.delete(), subscription.delete()]); }); From 2ec6dc9037ce39c262b135fe38030776b49e863a Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Tue, 6 Jun 2023 22:27:09 +0200 Subject: [PATCH 2/4] chore(deps): update dependency linkinator to v5 (#1733) --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 1c5216902..af9a57aa3 100644 --- a/package.json +++ b/package.json @@ -85,7 +85,7 @@ "jsdoc": "^4.0.0", "jsdoc-fresh": "^2.0.0", "jsdoc-region-tag": "^2.0.0", - "linkinator": "^4.0.0", + "linkinator": "^5.0.0", "mocha": "^9.2.2", "mv": "^2.1.1", "ncp": "^2.0.0", From 52ea441fee302aa6c400b58d4ecebbc96a2ea5ea Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Thu, 8 Jun 2023 17:17:46 -0400 Subject: [PATCH 3/4] fix: don't crash if an already-drained/removed queue gets flushed again (#1747) --- src/publisher/message-queues.ts | 8 ++++++++ test/publisher/message-queues.ts | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index a9fe4224d..3ff88be25 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -354,6 +354,14 @@ export class OrderedQueue extends MessageQueue { * @fires OrderedQueue#drain */ async publish(): Promise { + // If there's nothing to flush, don't try, just short-circuit to the drain event. + // This can happen if we get a publish() call after already being drained, in + // the case that topic.flush() pulls a reference to us before we get deleted. + if (!this.batches.length) { + this.emit('drain'); + return; + } + this.inFlight = true; if (this.pending) { diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index f28b4e33b..4c62917e7 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -729,6 +729,17 @@ describe('Message Queues', () => { assert.strictEqual(spy.callCount, 1); }); + + it('should emit "drain" if already empty on publish', async () => { + const spy = sandbox.spy(); + sandbox.stub(queue, '_publish').resolves(); + + queue.on('drain', spy); + await queue.publish(); + await queue.publish(); + + assert.strictEqual(spy.callCount, 2); + }); }); describe('resumePublishing', () => { From a690c70c83d097fa454255a092bcc90b86e27161 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Thu, 8 Jun 2023 21:46:12 +0000 Subject: [PATCH 4/4] chore(main): release 3.7.1 (#1748) :robot: I have created a release *beep* *boop* --- ## [3.7.1](https://togithub.com/googleapis/nodejs-pubsub/compare/v3.7.0...v3.7.1) (2023-06-08) ### Bug Fixes * Don't crash if an already-drained/removed queue gets flushed again ([#1747](https://togithub.com/googleapis/nodejs-pubsub/issues/1747)) ([52ea441](https://togithub.com/googleapis/nodejs-pubsub/commit/52ea441fee302aa6c400b58d4ecebbc96a2ea5ea)) --- This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please). --- CHANGELOG.md | 7 +++++++ package.json | 2 +- samples/package.json | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 09d0ec97d..53be64d87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ [1]: https://www.npmjs.com/package/@google-cloud/pubsub?activeTab=versions +## [3.7.1](https://github.com/googleapis/nodejs-pubsub/compare/v3.7.0...v3.7.1) (2023-06-08) + + +### Bug Fixes + +* Don't crash if an already-drained/removed queue gets flushed again ([#1747](https://github.com/googleapis/nodejs-pubsub/issues/1747)) ([52ea441](https://github.com/googleapis/nodejs-pubsub/commit/52ea441fee302aa6c400b58d4ecebbc96a2ea5ea)) + ## [3.7.0](https://github.com/googleapis/nodejs-pubsub/compare/v3.6.0...v3.7.0) (2023-05-26) diff --git a/package.json b/package.json index af9a57aa3..1dce0ab06 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@google-cloud/pubsub", "description": "Cloud Pub/Sub Client Library for Node.js", - "version": "3.7.0", + "version": "3.7.1", "license": "Apache-2.0", "author": "Google Inc.", "engines": { diff --git a/samples/package.json b/samples/package.json index 4a76ea5ba..cf83857e3 100644 --- a/samples/package.json +++ b/samples/package.json @@ -21,7 +21,7 @@ "precompile": "npm run clean" }, "dependencies": { - "@google-cloud/pubsub": "^3.7.0", + "@google-cloud/pubsub": "^3.7.1", "@opentelemetry/api": "^1.0.0", "@opentelemetry/tracing": "^0.24.0", "avro-js": "^1.10.1",