Skip to content

Commit 820f986

Browse files
authored
docs: Add samples and test for ingestion from Kafka sources (#1354)
1 parent 316883a commit 820f986

File tree

4 files changed

+328
-2
lines changed

4 files changed

+328
-2
lines changed

samples/snippets/noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def get_pytest_env_vars() -> Dict[str, str]:
124124
"--builtin=gettext",
125125
"--max-complexity=20",
126126
"--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
127-
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
127+
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202,C901",
128128
"--max-line-length=88",
129129
]
130130

samples/snippets/publisher.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,147 @@ def create_topic_with_cloud_storage_ingestion(
185185
# [END pubsub_create_topic_with_cloud_storage_ingestion]
186186

187187

188+
def create_topic_with_aws_msk_ingestion(
189+
project_id: str,
190+
topic_id: str,
191+
cluster_arn: str,
192+
msk_topic: str,
193+
aws_role_arn: str,
194+
gcp_service_account: str,
195+
) -> None:
196+
"""Create a new Pub/Sub topic with AWS MSK Ingestion Settings."""
197+
# [START pubsub_create_topic_with_aws_msk_ingestion]
198+
from google.cloud import pubsub_v1
199+
from google.pubsub_v1.types import Topic
200+
from google.pubsub_v1.types import IngestionDataSourceSettings
201+
202+
# TODO(developer)
203+
# project_id = "your-project-id"
204+
# topic_id = "your-topic-id"
205+
# cluster_arn = "your-cluster-arn"
206+
# msk_topic = "your-msk-topic"
207+
# aws_role_arn = "your-aws-role-arn"
208+
# gcp_service_account = "your-gcp-service-account"
209+
210+
publisher = pubsub_v1.PublisherClient()
211+
topic_path = publisher.topic_path(project_id, topic_id)
212+
213+
request = Topic(
214+
name=topic_path,
215+
ingestion_data_source_settings=IngestionDataSourceSettings(
216+
aws_msk=IngestionDataSourceSettings.AwsMsk(
217+
cluster_arn=cluster_arn,
218+
topic=msk_topic,
219+
aws_role_arn=aws_role_arn,
220+
gcp_service_account=gcp_service_account,
221+
)
222+
),
223+
)
224+
225+
topic = publisher.create_topic(request=request)
226+
227+
print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")
228+
# [END pubsub_create_topic_with_aws_msk_ingestion]
229+
230+
231+
def create_topic_with_azure_event_hubs_ingestion(
232+
project_id: str,
233+
topic_id: str,
234+
resource_group: str,
235+
namespace: str,
236+
event_hub: str,
237+
client_id: str,
238+
tenant_id: str,
239+
subscription_id: str,
240+
gcp_service_account: str,
241+
) -> None:
242+
"""Create a new Pub/Sub topic with Azure Event Hubs Ingestion Settings."""
243+
# [START pubsub_create_topic_with_azure_event_hubs_ingestion]
244+
from google.cloud import pubsub_v1
245+
from google.pubsub_v1.types import Topic
246+
from google.pubsub_v1.types import IngestionDataSourceSettings
247+
248+
# TODO(developer)
249+
# project_id = "your-project-id"
250+
# topic_id = "your-topic-id"
251+
# resource_group = "your-resource-group"
252+
# namespace = "your-namespace"
253+
# event_hub = "your-event-hub"
254+
# client_id = "your-client-id"
255+
# tenant_id = "your-tenant-id"
256+
# subscription_id = "your-subscription-id"
257+
# gcp_service_account = "your-gcp-service-account"
258+
259+
publisher = pubsub_v1.PublisherClient()
260+
topic_path = publisher.topic_path(project_id, topic_id)
261+
262+
request = Topic(
263+
name=topic_path,
264+
ingestion_data_source_settings=IngestionDataSourceSettings(
265+
azure_event_hubs=IngestionDataSourceSettings.AzureEventHubs(
266+
resource_group=resource_group,
267+
namespace=namespace,
268+
event_hub=event_hub,
269+
client_id=client_id,
270+
tenant_id=tenant_id,
271+
subscription_id=subscription_id,
272+
gcp_service_account=gcp_service_account,
273+
)
274+
),
275+
)
276+
277+
topic = publisher.create_topic(request=request)
278+
279+
print(f"Created topic: {topic.name} with Azure Event Hubs Ingestion Settings")
280+
# [END pubsub_create_topic_with_azure_event_hubs_ingestion]
281+
282+
283+
def create_topic_with_confluent_cloud_ingestion(
284+
project_id: str,
285+
topic_id: str,
286+
bootstrap_server: str,
287+
cluster_id: str,
288+
confluent_topic: str,
289+
identity_pool_id: str,
290+
gcp_service_account: str,
291+
) -> None:
292+
"""Create a new Pub/Sub topic with Confluent Cloud Ingestion Settings."""
293+
# [START pubsub_create_topic_with_confluent_cloud_ingestion]
294+
from google.cloud import pubsub_v1
295+
from google.pubsub_v1.types import Topic
296+
from google.pubsub_v1.types import IngestionDataSourceSettings
297+
298+
# TODO(developer)
299+
# project_id = "your-project-id"
300+
# topic_id = "your-topic-id"
301+
# bootstrap_server = "your-bootstrap-server"
302+
# cluster_id = "your-cluster-id"
303+
# confluent_topic = "your-confluent-topic"
304+
# identity_pool_id = "your-identity-pool-id"
305+
# gcp_service_account = "your-gcp-service-account"
306+
307+
publisher = pubsub_v1.PublisherClient()
308+
topic_path = publisher.topic_path(project_id, topic_id)
309+
310+
request = Topic(
311+
name=topic_path,
312+
ingestion_data_source_settings=IngestionDataSourceSettings(
313+
confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
314+
bootstrap_server=bootstrap_server,
315+
cluster_id=cluster_id,
316+
topic=confluent_topic,
317+
identity_pool_id=identity_pool_id,
318+
gcp_service_account=gcp_service_account,
319+
)
320+
),
321+
)
322+
323+
topic = publisher.create_topic(request=request)
324+
325+
print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")
326+
# [END pubsub_create_topic_with_confluent_cloud_ingestion]
327+
328+
188329
def update_topic_type(
189330
project_id: str,
190331
topic_id: str,
@@ -710,6 +851,43 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
710851
"minimum_object_create_time"
711852
)
712853

854+
create_topic_with_aws_msk_ingestion_parser = subparsers.add_parser(
855+
"create_aws_msk_ingestion", help=create_topic_with_aws_msk_ingestion.__doc__
856+
)
857+
create_topic_with_aws_msk_ingestion_parser.add_argument("topic_id")
858+
create_topic_with_aws_msk_ingestion_parser.add_argument("cluster_arn")
859+
create_topic_with_aws_msk_ingestion_parser.add_argument("msk_topic")
860+
create_topic_with_aws_msk_ingestion_parser.add_argument("aws_role_arn")
861+
create_topic_with_aws_msk_ingestion_parser.add_argument("gcp_service_account")
862+
863+
create_topic_with_azure_event_hubs_ingestion_parser = subparsers.add_parser(
864+
"create_azure_event_hubs_ingestion",
865+
help=create_topic_with_azure_event_hubs_ingestion.__doc__,
866+
)
867+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("topic_id")
868+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("resource_group")
869+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("namespace")
870+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("event_hub")
871+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("client_id")
872+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("tenant_id")
873+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument("subscription_id")
874+
create_topic_with_azure_event_hubs_ingestion_parser.add_argument(
875+
"gcp_service_account"
876+
)
877+
878+
create_topic_with_confluent_cloud_ingestion_parser = subparsers.add_parser(
879+
"create_confluent_cloud_ingestion",
880+
help=create_topic_with_confluent_cloud_ingestion.__doc__,
881+
)
882+
create_topic_with_confluent_cloud_ingestion_parser.add_argument("topic_id")
883+
create_topic_with_confluent_cloud_ingestion_parser.add_argument("bootstrap_server")
884+
create_topic_with_confluent_cloud_ingestion_parser.add_argument("cluster_id")
885+
create_topic_with_confluent_cloud_ingestion_parser.add_argument("confluent_topic")
886+
create_topic_with_confluent_cloud_ingestion_parser.add_argument("identity_pool_id")
887+
create_topic_with_confluent_cloud_ingestion_parser.add_argument(
888+
"gcp_service_account"
889+
)
890+
713891
update_topic_type_parser = subparsers.add_parser(
714892
"update_kinesis_ingestion", help=update_topic_type.__doc__
715893
)
@@ -798,6 +976,37 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
798976
args.match_glob,
799977
args.minimum_object_create_time,
800978
)
979+
elif args.command == "create_aws_msk_ingestion":
980+
create_topic_with_aws_msk_ingestion(
981+
args.project_id,
982+
args.topic_id,
983+
args.cluster_arn,
984+
args.msk_topic,
985+
args.aws_role_arn,
986+
args.gcp_service_account,
987+
)
988+
elif args.command == "create_azure_event_hubs_ingestion":
989+
create_topic_with_azure_event_hubs_ingestion(
990+
args.project_id,
991+
args.topic_id,
992+
args.resource_group,
993+
args.namespace,
994+
args.event_hub,
995+
args.client_id,
996+
args.tenant_id,
997+
args.subscription_id,
998+
args.gcp_service_account,
999+
)
1000+
elif args.command == "create_confluent_cloud_ingestion":
1001+
create_topic_with_confluent_cloud_ingestion(
1002+
args.project_id,
1003+
args.topic_id,
1004+
args.bootstrap_server,
1005+
args.cluster_id,
1006+
args.confluent_topic,
1007+
args.identity_pool_id,
1008+
args.gcp_service_account,
1009+
)
8011010
elif args.command == "update_kinesis_ingestion":
8021011
update_topic_type(
8031012
args.project_id,

samples/snippets/publisher_test.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,123 @@ def test_create_topic_with_cloud_storage_ingestion(
196196
publisher_client.delete_topic(request={"topic": topic_path})
197197

198198

199+
def test_create_topic_with_aws_msk_ingestion(
200+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
201+
) -> None:
202+
# The scope of `topic_path` is limited to this function.
203+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
204+
205+
# Outside of automated CI tests, these values must be of actual AWS resources for the test to pass.
206+
cluster_arn = (
207+
"arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1"
208+
)
209+
msk_topic = "fake-msk-topic-name"
210+
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
211+
gcp_service_account = (
212+
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
213+
)
214+
215+
try:
216+
publisher_client.delete_topic(request={"topic": topic_path})
217+
except NotFound:
218+
pass
219+
220+
publisher.create_topic_with_aws_msk_ingestion(
221+
PROJECT_ID,
222+
TOPIC_ID,
223+
cluster_arn,
224+
msk_topic,
225+
aws_role_arn,
226+
gcp_service_account,
227+
)
228+
229+
out, _ = capsys.readouterr()
230+
assert f"Created topic: {topic_path} with AWS MSK Ingestion Settings" in out
231+
232+
# Clean up resource created for the test.
233+
publisher_client.delete_topic(request={"topic": topic_path})
234+
235+
236+
def test_create_topic_with_azure_event_hubs_ingestion(
237+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
238+
) -> None:
239+
# The scope of `topic_path` is limited to this function.
240+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
241+
242+
# Outside of automated CI tests, these values must be of actual Azure resources for the test to pass.
243+
resource_group = "fake-resource-group"
244+
namespace = "fake-namespace"
245+
event_hub = "fake-event-hub"
246+
client_id = "fake-client-id"
247+
tenant_id = "fake-tenant-id"
248+
subcription_id = "fake-subscription-id"
249+
gcp_service_account = (
250+
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
251+
)
252+
253+
try:
254+
publisher_client.delete_topic(request={"topic": topic_path})
255+
except NotFound:
256+
pass
257+
258+
publisher.create_topic_with_azure_event_hubs_ingestion(
259+
PROJECT_ID,
260+
TOPIC_ID,
261+
resource_group,
262+
namespace,
263+
event_hub,
264+
client_id,
265+
tenant_id,
266+
subcription_id,
267+
gcp_service_account,
268+
)
269+
270+
out, _ = capsys.readouterr()
271+
assert (
272+
f"Created topic: {topic_path} with Azure Event Hubs Ingestion Settings" in out
273+
)
274+
275+
# Clean up resource created for the test.
276+
publisher_client.delete_topic(request={"topic": topic_path})
277+
278+
279+
def test_create_topic_with_confluent_cloud_ingestion(
280+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
281+
) -> None:
282+
# The scope of `topic_path` is limited to this function.
283+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
284+
285+
# Outside of automated CI tests, these values must be of actual Confluent resources for the test to pass.
286+
bootstrap_server = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"
287+
cluster_id = "fake-cluster-id"
288+
confluent_topic = "fake-confluent-topic-name"
289+
identity_pool_id = "fake-identity-pool-id"
290+
gcp_service_account = (
291+
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
292+
)
293+
294+
try:
295+
publisher_client.delete_topic(request={"topic": topic_path})
296+
except NotFound:
297+
pass
298+
299+
publisher.create_topic_with_confluent_cloud_ingestion(
300+
PROJECT_ID,
301+
TOPIC_ID,
302+
bootstrap_server,
303+
cluster_id,
304+
confluent_topic,
305+
identity_pool_id,
306+
gcp_service_account,
307+
)
308+
309+
out, _ = capsys.readouterr()
310+
assert f"Created topic: {topic_path} with Confluent Cloud Ingestion Settings" in out
311+
312+
# Clean up resource created for the test.
313+
publisher_client.delete_topic(request={"topic": topic_path})
314+
315+
199316
def test_update_topic_type(
200317
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
201318
) -> None:

samples/snippets/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
google-cloud-pubsub==2.27.1
1+
google-cloud-pubsub==2.28.0
22
avro==1.12.0
33
protobuf===4.24.4; python_version == '3.7'
44
protobuf==5.29.2; python_version >= '3.8'

0 commit comments

Comments
 (0)