@@ -185,6 +185,147 @@ def create_topic_with_cloud_storage_ingestion(
185
185
# [END pubsub_create_topic_with_cloud_storage_ingestion]
186
186
187
187
188
+ def create_topic_with_aws_msk_ingestion (
189
+ project_id : str ,
190
+ topic_id : str ,
191
+ cluster_arn : str ,
192
+ msk_topic : str ,
193
+ aws_role_arn : str ,
194
+ gcp_service_account : str ,
195
+ ) -> None :
196
+ """Create a new Pub/Sub topic with AWS MSK Ingestion Settings."""
197
+ # [START pubsub_create_topic_with_aws_msk_ingestion]
198
+ from google .cloud import pubsub_v1
199
+ from google .pubsub_v1 .types import Topic
200
+ from google .pubsub_v1 .types import IngestionDataSourceSettings
201
+
202
+ # TODO(developer)
203
+ # project_id = "your-project-id"
204
+ # topic_id = "your-topic-id"
205
+ # cluster_arn = "your-cluster-arn"
206
+ # msk_topic = "your-msk-topic"
207
+ # aws_role_arn = "your-aws-role-arn"
208
+ # gcp_service_account = "your-gcp-service-account"
209
+
210
+ publisher = pubsub_v1 .PublisherClient ()
211
+ topic_path = publisher .topic_path (project_id , topic_id )
212
+
213
+ request = Topic (
214
+ name = topic_path ,
215
+ ingestion_data_source_settings = IngestionDataSourceSettings (
216
+ aws_msk = IngestionDataSourceSettings .AwsMsk (
217
+ cluster_arn = cluster_arn ,
218
+ topic = msk_topic ,
219
+ aws_role_arn = aws_role_arn ,
220
+ gcp_service_account = gcp_service_account ,
221
+ )
222
+ ),
223
+ )
224
+
225
+ topic = publisher .create_topic (request = request )
226
+
227
+ print (f"Created topic: { topic .name } with AWS MSK Ingestion Settings" )
228
+ # [END pubsub_create_topic_with_aws_msk_ingestion]
229
+
230
+
231
+ def create_topic_with_azure_event_hubs_ingestion (
232
+ project_id : str ,
233
+ topic_id : str ,
234
+ resource_group : str ,
235
+ namespace : str ,
236
+ event_hub : str ,
237
+ client_id : str ,
238
+ tenant_id : str ,
239
+ subscription_id : str ,
240
+ gcp_service_account : str ,
241
+ ) -> None :
242
+ """Create a new Pub/Sub topic with Azure Event Hubs Ingestion Settings."""
243
+ # [START pubsub_create_topic_with_azure_event_hubs_ingestion]
244
+ from google .cloud import pubsub_v1
245
+ from google .pubsub_v1 .types import Topic
246
+ from google .pubsub_v1 .types import IngestionDataSourceSettings
247
+
248
+ # TODO(developer)
249
+ # project_id = "your-project-id"
250
+ # topic_id = "your-topic-id"
251
+ # resource_group = "your-resource-group"
252
+ # namespace = "your-namespace"
253
+ # event_hub = "your-event-hub"
254
+ # client_id = "your-client-id"
255
+ # tenant_id = "your-tenant-id"
256
+ # subscription_id = "your-subscription-id"
257
+ # gcp_service_account = "your-gcp-service-account"
258
+
259
+ publisher = pubsub_v1 .PublisherClient ()
260
+ topic_path = publisher .topic_path (project_id , topic_id )
261
+
262
+ request = Topic (
263
+ name = topic_path ,
264
+ ingestion_data_source_settings = IngestionDataSourceSettings (
265
+ azure_event_hubs = IngestionDataSourceSettings .AzureEventHubs (
266
+ resource_group = resource_group ,
267
+ namespace = namespace ,
268
+ event_hub = event_hub ,
269
+ client_id = client_id ,
270
+ tenant_id = tenant_id ,
271
+ subscription_id = subscription_id ,
272
+ gcp_service_account = gcp_service_account ,
273
+ )
274
+ ),
275
+ )
276
+
277
+ topic = publisher .create_topic (request = request )
278
+
279
+ print (f"Created topic: { topic .name } with Azure Event Hubs Ingestion Settings" )
280
+ # [END pubsub_create_topic_with_azure_event_hubs_ingestion]
281
+
282
+
283
+ def create_topic_with_confluent_cloud_ingestion (
284
+ project_id : str ,
285
+ topic_id : str ,
286
+ bootstrap_server : str ,
287
+ cluster_id : str ,
288
+ confluent_topic : str ,
289
+ identity_pool_id : str ,
290
+ gcp_service_account : str ,
291
+ ) -> None :
292
+ """Create a new Pub/Sub topic with Confluent Cloud Ingestion Settings."""
293
+ # [START pubsub_create_topic_with_confluent_cloud_ingestion]
294
+ from google .cloud import pubsub_v1
295
+ from google .pubsub_v1 .types import Topic
296
+ from google .pubsub_v1 .types import IngestionDataSourceSettings
297
+
298
+ # TODO(developer)
299
+ # project_id = "your-project-id"
300
+ # topic_id = "your-topic-id"
301
+ # bootstrap_server = "your-bootstrap-server"
302
+ # cluster_id = "your-cluster-id"
303
+ # confluent_topic = "your-confluent-topic"
304
+ # identity_pool_id = "your-identity-pool-id"
305
+ # gcp_service_account = "your-gcp-service-account"
306
+
307
+ publisher = pubsub_v1 .PublisherClient ()
308
+ topic_path = publisher .topic_path (project_id , topic_id )
309
+
310
+ request = Topic (
311
+ name = topic_path ,
312
+ ingestion_data_source_settings = IngestionDataSourceSettings (
313
+ confluent_cloud = IngestionDataSourceSettings .ConfluentCloud (
314
+ bootstrap_server = bootstrap_server ,
315
+ cluster_id = cluster_id ,
316
+ topic = confluent_topic ,
317
+ identity_pool_id = identity_pool_id ,
318
+ gcp_service_account = gcp_service_account ,
319
+ )
320
+ ),
321
+ )
322
+
323
+ topic = publisher .create_topic (request = request )
324
+
325
+ print (f"Created topic: { topic .name } with Confluent Cloud Ingestion Settings" )
326
+ # [END pubsub_create_topic_with_confluent_cloud_ingestion]
327
+
328
+
188
329
def update_topic_type (
189
330
project_id : str ,
190
331
topic_id : str ,
@@ -710,6 +851,43 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
710
851
"minimum_object_create_time"
711
852
)
712
853
854
+ create_topic_with_aws_msk_ingestion_parser = subparsers .add_parser (
855
+ "create_aws_msk_ingestion" , help = create_topic_with_aws_msk_ingestion .__doc__
856
+ )
857
+ create_topic_with_aws_msk_ingestion_parser .add_argument ("topic_id" )
858
+ create_topic_with_aws_msk_ingestion_parser .add_argument ("cluster_arn" )
859
+ create_topic_with_aws_msk_ingestion_parser .add_argument ("msk_topic" )
860
+ create_topic_with_aws_msk_ingestion_parser .add_argument ("aws_role_arn" )
861
+ create_topic_with_aws_msk_ingestion_parser .add_argument ("gcp_service_account" )
862
+
863
+ create_topic_with_azure_event_hubs_ingestion_parser = subparsers .add_parser (
864
+ "create_azure_event_hubs_ingestion" ,
865
+ help = create_topic_with_azure_event_hubs_ingestion .__doc__ ,
866
+ )
867
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("topic_id" )
868
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("resource_group" )
869
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("namespace" )
870
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("event_hub" )
871
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("client_id" )
872
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("tenant_id" )
873
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument ("subscription_id" )
874
+ create_topic_with_azure_event_hubs_ingestion_parser .add_argument (
875
+ "gcp_service_account"
876
+ )
877
+
878
+ create_topic_with_confluent_cloud_ingestion_parser = subparsers .add_parser (
879
+ "create_confluent_cloud_ingestion" ,
880
+ help = create_topic_with_confluent_cloud_ingestion .__doc__ ,
881
+ )
882
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument ("topic_id" )
883
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument ("bootstrap_server" )
884
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument ("cluster_id" )
885
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument ("confluent_topic" )
886
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument ("identity_pool_id" )
887
+ create_topic_with_confluent_cloud_ingestion_parser .add_argument (
888
+ "gcp_service_account"
889
+ )
890
+
713
891
update_topic_type_parser = subparsers .add_parser (
714
892
"update_kinesis_ingestion" , help = update_topic_type .__doc__
715
893
)
@@ -798,6 +976,37 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
798
976
args .match_glob ,
799
977
args .minimum_object_create_time ,
800
978
)
979
+ elif args .command == "create_aws_msk_ingestion" :
980
+ create_topic_with_aws_msk_ingestion (
981
+ args .project_id ,
982
+ args .topic_id ,
983
+ args .cluster_arn ,
984
+ args .msk_topic ,
985
+ args .aws_role_arn ,
986
+ args .gcp_service_account ,
987
+ )
988
+ elif args .command == "create_azure_event_hubs_ingestion" :
989
+ create_topic_with_azure_event_hubs_ingestion (
990
+ args .project_id ,
991
+ args .topic_id ,
992
+ args .resource_group ,
993
+ args .namespace ,
994
+ args .event_hub ,
995
+ args .client_id ,
996
+ args .tenant_id ,
997
+ args .subscription_id ,
998
+ args .gcp_service_account ,
999
+ )
1000
+ elif args .command == "create_confluent_cloud_ingestion" :
1001
+ create_topic_with_confluent_cloud_ingestion (
1002
+ args .project_id ,
1003
+ args .topic_id ,
1004
+ args .bootstrap_server ,
1005
+ args .cluster_id ,
1006
+ args .confluent_topic ,
1007
+ args .identity_pool_id ,
1008
+ args .gcp_service_account ,
1009
+ )
801
1010
elif args .command == "update_kinesis_ingestion" :
802
1011
update_topic_type (
803
1012
args .project_id ,
0 commit comments