Skip to content

Commit 8301974

Browse files
zzzeekGerrit Code Review
authored andcommitted
Merge "Create terminate mixin" into main
2 parents 071abbb + dddfa96 commit 8301974

File tree

5 files changed

+86
-38
lines changed

5 files changed

+86
-38
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
.. change::
2+
:tags: usecase, asyncio
3+
:tickets: 12273
4+
5+
Generalize the terminate logic employed by the asyncpg dialect to reuse
6+
it in the aiomysql and asyncmy dialect implementation.

lib/sqlalchemy/connectors/asyncio.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from typing import Optional
2121
from typing import Protocol
2222
from typing import Sequence
23+
from typing import Tuple
24+
from typing import Type
2325
from typing import TYPE_CHECKING
2426

2527
from ..engine import AdaptedConnection
@@ -374,3 +376,43 @@ def commit(self) -> None:
374376

375377
def close(self) -> None:
376378
await_(self._connection.close())
379+
380+
381+
class AsyncAdapt_terminate:
382+
"""Mixin for a AsyncAdapt_dbapi_connection to add terminate support."""
383+
384+
__slots__ = ()
385+
386+
def terminate(self) -> None:
387+
if in_greenlet():
388+
# in a greenlet; this is the connection was invalidated case.
389+
try:
390+
# try to gracefully close; see #10717
391+
await_(asyncio.shield(self._terminate_graceful_close()))
392+
except self._terminate_handled_exceptions() as e:
393+
# in the case where we are recycling an old connection
394+
# that may have already been disconnected, close() will
395+
# fail. In this case, terminate
396+
# the connection without any further waiting.
397+
# see issue #8419
398+
self._terminate_force_close()
399+
if isinstance(e, asyncio.CancelledError):
400+
# re-raise CancelledError if we were cancelled
401+
raise
402+
else:
403+
# not in a greenlet; this is the gc cleanup case
404+
self._terminate_force_close()
405+
406+
def _terminate_handled_exceptions(self) -> Tuple[Type[BaseException], ...]:
407+
"""Returns the exceptions that should be handled when
408+
calling _graceful_close.
409+
"""
410+
return (asyncio.TimeoutError, asyncio.CancelledError, OSError)
411+
412+
async def _terminate_graceful_close(self) -> None:
413+
"""Try to close connection gracefully"""
414+
raise NotImplementedError
415+
416+
def _terminate_force_close(self) -> None:
417+
"""Terminate the connection"""
418+
raise NotImplementedError

lib/sqlalchemy/dialects/mysql/aiomysql.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
4242
from ...connectors.asyncio import AsyncAdapt_dbapi_module
4343
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
44+
from ...connectors.asyncio import AsyncAdapt_terminate
4445
from ...util.concurrency import await_
4546

4647
if TYPE_CHECKING:
@@ -77,7 +78,9 @@ def _make_new_cursor(
7778
)
7879

7980

80-
class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection):
81+
class AsyncAdapt_aiomysql_connection(
82+
AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
83+
):
8184
__slots__ = ()
8285

8386
_cursor_cls = AsyncAdapt_aiomysql_cursor
@@ -96,13 +99,16 @@ def autocommit(self, value: Any) -> None:
9699
def get_autocommit(self) -> bool:
97100
return self._connection.get_autocommit() # type: ignore
98101

99-
def terminate(self) -> None:
100-
# it's not awaitable.
101-
self._connection.close()
102-
103102
def close(self) -> None:
104103
await_(self._connection.ensure_closed())
105104

105+
async def _terminate_graceful_close(self) -> None:
106+
await self._connection.ensure_closed()
107+
108+
def _terminate_force_close(self) -> None:
109+
# it's not awaitable.
110+
self._connection.close()
111+
106112

107113
class AsyncAdapt_aiomysql_dbapi(AsyncAdapt_dbapi_module):
108114
def __init__(self, aiomysql: ModuleType, pymysql: ModuleType):

lib/sqlalchemy/dialects/mysql/asyncmy.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
4242
from ...connectors.asyncio import AsyncAdapt_dbapi_module
4343
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
44+
from ...connectors.asyncio import AsyncAdapt_terminate
4445
from ...util.concurrency import await_
4546

4647
if TYPE_CHECKING:
@@ -72,7 +73,9 @@ def _make_new_cursor(
7273
)
7374

7475

75-
class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection):
76+
class AsyncAdapt_asyncmy_connection(
77+
AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
78+
):
7679
__slots__ = ()
7780

7881
_cursor_cls = AsyncAdapt_asyncmy_cursor
@@ -106,13 +109,16 @@ def autocommit(self, value: Any) -> None:
106109
def get_autocommit(self) -> bool:
107110
return self._connection.get_autocommit() # type: ignore
108111

109-
def terminate(self) -> None:
110-
# it's not awaitable.
111-
self._connection.close()
112-
113112
def close(self) -> None:
114113
await_(self._connection.ensure_closed())
115114

115+
async def _terminate_graceful_close(self) -> None:
116+
await self._connection.ensure_closed()
117+
118+
def _terminate_force_close(self) -> None:
119+
# it's not awaitable.
120+
self._connection.close()
121+
116122

117123
class AsyncAdapt_asyncmy_dbapi(AsyncAdapt_dbapi_module):
118124
def __init__(self, asyncmy: ModuleType):

lib/sqlalchemy/dialects/postgresql/asyncpg.py

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@
178178

179179
from __future__ import annotations
180180

181-
import asyncio
182181
from collections import deque
183182
import decimal
184183
import json as _py_json
@@ -219,6 +218,7 @@
219218
from ...connectors.asyncio import AsyncAdapt_dbapi_connection
220219
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
221220
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
221+
from ...connectors.asyncio import AsyncAdapt_terminate
222222
from ...engine import processors
223223
from ...sql import sqltypes
224224
from ...util.concurrency import await_
@@ -750,7 +750,9 @@ def executemany(self, operation, seq_of_parameters):
750750
)
751751

752752

753-
class AsyncAdapt_asyncpg_connection(AsyncAdapt_dbapi_connection):
753+
class AsyncAdapt_asyncpg_connection(
754+
AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
755+
):
754756
_cursor_cls = AsyncAdapt_asyncpg_cursor
755757
_ss_cursor_cls = AsyncAdapt_asyncpg_ss_cursor
756758

@@ -931,32 +933,18 @@ def close(self):
931933

932934
await_(self._connection.close())
933935

934-
def terminate(self):
935-
if util.concurrency.in_greenlet():
936-
# in a greenlet; this is the connection was invalidated
937-
# case.
938-
try:
939-
# try to gracefully close; see #10717
940-
# timeout added in asyncpg 0.14.0 December 2017
941-
await_(asyncio.shield(self._connection.close(timeout=2)))
942-
except (
943-
asyncio.TimeoutError,
944-
asyncio.CancelledError,
945-
OSError,
946-
self.dbapi.asyncpg.PostgresError,
947-
) as e:
948-
# in the case where we are recycling an old connection
949-
# that may have already been disconnected, close() will
950-
# fail with the above timeout. in this case, terminate
951-
# the connection without any further waiting.
952-
# see issue #8419
953-
self._connection.terminate()
954-
if isinstance(e, asyncio.CancelledError):
955-
# re-raise CancelledError if we were cancelled
956-
raise
957-
else:
958-
# not in a greenlet; this is the gc cleanup case
959-
self._connection.terminate()
936+
def _terminate_handled_exceptions(self):
937+
return super()._terminate_handled_exceptions() + (
938+
self.dbapi.asyncpg.PostgresError,
939+
)
940+
941+
async def _terminate_graceful_close(self) -> None:
942+
# timeout added in asyncpg 0.14.0 December 2017
943+
await self._connection.close(timeout=2)
944+
self._transaction = None
945+
946+
def _terminate_force_close(self) -> None:
947+
self._connection.terminate()
960948
self._transaction = None
961949

962950
@staticmethod

0 commit comments

Comments
 (0)