Skip to content

Commit 998de35

Browse files
authored
feat: add support for exactly once subscriptions (#1572)
Adds support for exactly-once delivery subscriptions. Please see the samples for information on how to interact with exactly-once subscriptions properly (specifically, using the `*WithResponse()` methods). Other client library folks - Mahesh needs to review this, so please don't merge until that happens. Fixes #1571 🦕
1 parent d72db50 commit 998de35

22 files changed

+2241
-130
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
129129
| Create Push Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createPushSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createPushSubscription.js,samples/README.md) |
130130
| Create Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscription.js,samples/README.md) |
131131
| Create Subscription With Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithDeadLetterPolicy.js,samples/README.md) |
132+
| Create an exactly-once delivery subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) |
132133
| Create Subscription With Filtering | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithFiltering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithFiltering.js,samples/README.md) |
133134
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
134135
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
@@ -148,6 +149,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
148149
| Listen For Avro Records | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForAvroRecords.js,samples/README.md) |
149150
| Listen For Errors | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForErrors.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForErrors.js,samples/README.md) |
150151
| Listen For Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessages.js,samples/README.md) |
152+
| Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) |
151153
| Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) |
152154
| Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) |
153155
| Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) |

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
"extend": "^3.0.2",
6060
"google-auth-library": "^8.0.2",
6161
"google-gax": "^3.3.0",
62+
"heap-js": "^2.2.0",
6263
"is-stream-ended": "^0.1.4",
6364
"lodash.snakecase": "^4.1.1",
6465
"p-defer": "^3.0.0"

samples/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ guides.
2626
* [Create Push Subscription](#create-push-subscription)
2727
* [Create Subscription](#create-subscription)
2828
* [Create Subscription With Dead Letter Policy](#create-subscription-with-dead-letter-policy)
29+
* [Create an exactly-once delivery subscription](#create-an-exactly-once-delivery-subscription)
2930
* [Create Subscription With Filtering](#create-subscription-with-filtering)
3031
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
3132
* [Create Topic](#create-topic)
@@ -45,6 +46,7 @@ guides.
4546
* [Listen For Avro Records](#listen-for-avro-records)
4647
* [Listen For Errors](#listen-for-errors)
4748
* [Listen For Messages](#listen-for-messages)
49+
* [Listen with exactly-once delivery](#listen-with-exactly-once-delivery)
4850
* [Listen For Protobuf Messages](#listen-for-protobuf-messages)
4951
* [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes)
5052
* [Modify Push Configuration](#modify-push-configuration)
@@ -199,6 +201,25 @@ __Usage:__
199201

200202

201203

204+
### Create an exactly-once delivery subscription
205+
206+
Demonstrates how to create a subscription for exactly-once delivery.
207+
208+
View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js).
209+
210+
[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md)
211+
212+
__Usage:__
213+
214+
215+
`node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>`
216+
217+
218+
-----
219+
220+
221+
222+
202223
### Create Subscription With Filtering
203224

204225
Creates a new subscription with filtering.
@@ -560,6 +581,25 @@ __Usage:__
560581

561582

562583

584+
### Listen with exactly-once delivery
585+
586+
Listen for messages on an exactly-once delivery subscription.
587+
588+
View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js).
589+
590+
[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md)
591+
592+
__Usage:__
593+
594+
595+
`node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>`
596+
597+
598+
-----
599+
600+
601+
602+
563603
### Listen For Protobuf Messages
564604

565605
Listens for messages in protobuf encoding from a subscription.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This application demonstrates how to perform basic operations on
17+
* schemas with the Google Cloud Pub/Sub API.
18+
*
19+
* For more information, see the README.md under /pubsub and the documentation
20+
* at https://cloud.google.com/pubsub/docs.
21+
*/
22+
23+
// This is a generated sample. Please see typescript/README.md for more info.
24+
25+
'use strict';
26+
27+
// sample-metadata:
28+
// title: Create an exactly-once delivery subscription
29+
// description: Demonstrates how to create a subscription for exactly-once delivery.
30+
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>
31+
32+
// [START pubsub_create_subscription_with_exactly_once_delivery]
33+
/**
34+
* TODO(developer): Uncomment these variables before running the sample.
35+
*/
36+
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
37+
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
38+
39+
// Imports the Google Cloud client library
40+
const {PubSub} = require('@google-cloud/pubsub');
41+
42+
// Creates a client; cache this for further use
43+
const pubSubClient = new PubSub();
44+
45+
async function createSubscriptionWithExactlyOnceDelivery(
46+
topicNameOrId,
47+
subscriptionNameOrId
48+
) {
49+
// Creates a new subscription
50+
await pubSubClient
51+
.topic(topicNameOrId)
52+
.createSubscription(subscriptionNameOrId, {
53+
enableExactlyOnceDelivery: true,
54+
});
55+
console.log(
56+
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
57+
);
58+
console.log(
59+
'To process messages, remember to check the return value of ackWithResponse().'
60+
);
61+
}
62+
// [END pubsub_create_subscription_with_exactly_once_delivery]
63+
64+
function main(
65+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
66+
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
67+
) {
68+
createSubscriptionWithExactlyOnceDelivery(
69+
topicNameOrId,
70+
subscriptionNameOrId
71+
).catch(err => {
72+
console.error(err.message);
73+
process.exitCode = 1;
74+
});
75+
}
76+
77+
main(...process.argv.slice(2));
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This application demonstrates how to perform basic operations on
17+
* schemas with the Google Cloud Pub/Sub API.
18+
*
19+
* For more information, see the README.md under /pubsub and the documentation
20+
* at https://cloud.google.com/pubsub/docs.
21+
*/
22+
23+
// This is a generated sample. Please see typescript/README.md for more info.
24+
25+
'use strict';
26+
27+
// sample-metadata:
28+
// title: Listen with exactly-once delivery
29+
// description: Listen for messages on an exactly-once delivery subscription.
30+
// usage: node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>
31+
32+
// [START pubsub_subscriber_exactly_once]
33+
/**
34+
* TODO(developer): Uncomment this variable before running the sample.
35+
*/
36+
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
37+
38+
// Imports the Google Cloud client library
39+
const {PubSub} = require('@google-cloud/pubsub');
40+
41+
// Creates a client; cache this for further use
42+
const pubSubClient = new PubSub();
43+
44+
async function listenForMessagesWithExactlyOnceDelivery(
45+
subscriptionNameOrId,
46+
timeout
47+
) {
48+
// References an existing subscription
49+
const subscription = pubSubClient.subscription(subscriptionNameOrId);
50+
51+
// Create an event handler to handle messages
52+
let messageCount = 0;
53+
const messageHandler = async message => {
54+
console.log(`Received message ${message.id}:`);
55+
console.log(`\tData: ${message.data}`);
56+
console.log(`\tAttributes: ${message.attributes}`);
57+
messageCount++;
58+
59+
// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
60+
// the result of the acknowledge call. When exactly-once delivery is enabled
61+
// on the subscription, the message is guaranteed not to be delivered again
62+
// if the ack Promise resolves.
63+
try {
64+
// When the Promise resolves, the value is always AckResponses.Success,
65+
// signaling that the ack was accepted. Note that you may call this
66+
// method on a subscription without exactly-once delivery, but it will
67+
// always return AckResponses.Success.
68+
await message.ackWithResponse();
69+
console.log(`Ack for message ${message.id} successful.`);
70+
} catch (e) {
71+
// In all other cases, the error passed on reject will explain why. This
72+
// is only for permanent failures; transient errors are retried automatically.
73+
const ackError = e;
74+
console.log(
75+
`Ack for message ${message.id} failed with error: ${ackError.errorCode}`
76+
);
77+
}
78+
};
79+
80+
// Listen for new messages until timeout is hit
81+
subscription.on('message', messageHandler);
82+
83+
setTimeout(() => {
84+
subscription.removeListener('message', messageHandler);
85+
console.log(`${messageCount} message(s) received.`);
86+
}, timeout * 1000);
87+
}
88+
// [END pubsub_subscriber_exactly_once]
89+
90+
function main(
91+
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
92+
timeout = 60
93+
) {
94+
listenForMessagesWithExactlyOnceDelivery(
95+
subscriptionNameOrId,
96+
Number(timeout)
97+
).catch(err => {
98+
console.error(err.message);
99+
process.exitCode = 1;
100+
});
101+
}
102+
103+
main(...process.argv.slice(2));

samples/system-test/subscriptions.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,43 @@ describe('subscriptions', () => {
505505
.get();
506506
assert.strictEqual(subscription.metadata?.enableMessageOrdering, true);
507507
});
508+
509+
it('should create an exactly-once delivery sub and listen on it.', async () => {
510+
const testId = 'eos';
511+
const topic = await createTopic(testId);
512+
const subName = reserveSub(testId);
513+
const output = execSync(
514+
`${commandFor('createSubscriptionWithExactlyOnceDelivery')} ${
515+
topic.name
516+
} ${subName}`
517+
);
518+
assert.include(
519+
output,
520+
`Created subscription ${subName} with exactly-once delivery.`
521+
);
522+
523+
const [subscription] = await pubsub
524+
.topic(topic.name)
525+
.subscription(subName)
526+
.get();
527+
assert.strictEqual(subscription.metadata?.enableExactlyOnceDelivery, true);
528+
529+
const message = Buffer.from('test message');
530+
const messageIds = [
531+
await topic.publishMessage({
532+
data: message,
533+
}),
534+
await topic.publishMessage({
535+
data: message,
536+
}),
537+
];
538+
539+
const output2 = execSync(
540+
`${commandFor('listenForMessagesWithExactlyOnceDelivery')} ${subName} 15`
541+
);
542+
543+
for (const id of messageIds) {
544+
assert.include(output2, `Ack for message ${id} successful`);
545+
}
546+
});
508547
});
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This application demonstrates how to perform basic operations on
17+
* schemas with the Google Cloud Pub/Sub API.
18+
*
19+
* For more information, see the README.md under /pubsub and the documentation
20+
* at https://cloud.google.com/pubsub/docs.
21+
*/
22+
23+
// sample-metadata:
24+
// title: Create an exactly-once delivery subscription
25+
// description: Demonstrates how to create a subscription for exactly-once delivery.
26+
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>
27+
28+
// [START pubsub_create_subscription_with_exactly_once_delivery]
29+
/**
30+
* TODO(developer): Uncomment these variables before running the sample.
31+
*/
32+
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
33+
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
34+
35+
// Imports the Google Cloud client library
36+
import {PubSub} from '@google-cloud/pubsub';
37+
38+
// Creates a client; cache this for further use
39+
const pubSubClient = new PubSub();
40+
41+
async function createSubscriptionWithExactlyOnceDelivery(
42+
topicNameOrId: string,
43+
subscriptionNameOrId: string
44+
) {
45+
// Creates a new subscription
46+
await pubSubClient
47+
.topic(topicNameOrId)
48+
.createSubscription(subscriptionNameOrId, {
49+
enableExactlyOnceDelivery: true,
50+
});
51+
console.log(
52+
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
53+
);
54+
console.log(
55+
'To process messages, remember to check the return value of ackWithResponse().'
56+
);
57+
}
58+
// [END pubsub_create_subscription_with_exactly_once_delivery]
59+
60+
function main(
61+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
62+
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
63+
) {
64+
createSubscriptionWithExactlyOnceDelivery(
65+
topicNameOrId,
66+
subscriptionNameOrId
67+
).catch(err => {
68+
console.error(err.message);
69+
process.exitCode = 1;
70+
});
71+
}
72+
73+
main(...process.argv.slice(2));

0 commit comments

Comments
 (0)