-
-
Notifications
You must be signed in to change notification settings - Fork 16.2k
Make ScheduledEventExecutor task scheduler pluggable #13552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,33 +15,28 @@ | |
*/ | ||
package io.netty5.util.concurrent; | ||
|
||
import io.netty5.util.internal.DefaultPriorityQueue; | ||
import io.netty5.util.internal.PriorityQueue; | ||
import io.netty5.util.internal.PriorityQueueNode; | ||
|
||
import java.util.Comparator; | ||
import java.util.Queue; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static java.util.Objects.requireNonNull; | ||
import static java.util.concurrent.Executors.callable; | ||
|
||
/** | ||
* Abstract base class for {@link EventExecutor}s that want to support scheduling. | ||
*/ | ||
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { | ||
|
||
private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR = | ||
Comparable::compareTo; | ||
private static final RunnableScheduledFutureNode<?>[] | ||
EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0]; | ||
|
||
private PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue; | ||
private TaskScheduler taskScheduler; | ||
private TaskSchedulerFactory taskSchedulerFactory; | ||
Comment on lines
+30
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Scalability Issue: The introduction of TaskScheduler and TaskSchedulerFactory as fields increases the memory footprint of each AbstractScheduledEventExecutor instance. In a highly scalable system, where potentially millions of these executors could be instantiated, this additional memory usage could become significant. Moreover, the dynamic selection of TaskScheduler based on a factory pattern, while providing flexibility, introduces additional overhead during executor instantiation, which could impact the startup time of systems with a large number of executors.
|
||
|
||
protected AbstractScheduledEventExecutor() { | ||
} | ||
|
||
protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) { | ||
this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory"); | ||
} | ||
|
||
/** | ||
* Return the {@link Ticker} that provides the time source. | ||
*/ | ||
|
@@ -55,18 +50,18 @@ static long deadlineNanos(long nanoTime, long delay) { | |
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; | ||
} | ||
|
||
PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() { | ||
if (scheduledTaskQueue == null) { | ||
scheduledTaskQueue = new DefaultPriorityQueue<>( | ||
SCHEDULED_FUTURE_TASK_COMPARATOR, | ||
// Use same initial capacity as java.util.PriorityQueue | ||
11); | ||
TaskScheduler taskScheduler() { | ||
if (taskScheduler == null && taskSchedulerFactory != null) { | ||
taskScheduler = taskSchedulerFactory.newTaskScheduler(this); | ||
} | ||
return scheduledTaskQueue; | ||
if (taskScheduler == null) { | ||
taskScheduler = DefaultTaskSchedulerFactory.ISTANCE.newTaskScheduler(this); | ||
} | ||
return taskScheduler; | ||
} | ||
|
||
private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue) { | ||
return queue == null || queue.isEmpty(); | ||
private static boolean isNullOrEmpty(TaskScheduler taskScheduler) { | ||
return taskScheduler == null || taskScheduler.isEmpty(); | ||
} | ||
|
||
/** | ||
|
@@ -76,19 +71,11 @@ private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue | |
*/ | ||
protected final void cancelScheduledTasks() { | ||
assert inEventLoop(); | ||
PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue; | ||
if (isNullOrEmpty(scheduledTaskQueue)) { | ||
TaskScheduler taskScheduler = this.taskScheduler; | ||
if (isNullOrEmpty(taskScheduler)) { | ||
return; | ||
} | ||
|
||
final RunnableScheduledFutureNode<?>[] scheduledTasks = | ||
scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES); | ||
|
||
for (RunnableScheduledFutureNode<?> task : scheduledTasks) { | ||
task.cancel(); | ||
} | ||
|
||
scheduledTaskQueue.clearIgnoringIndexes(); | ||
taskScheduler.cancelScheduledTasks(); | ||
} | ||
|
||
/** | ||
|
@@ -106,18 +93,8 @@ protected final RunnableScheduledFuture<?> pollScheduledTask() { | |
*/ | ||
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) { | ||
assert inEventLoop(); | ||
|
||
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue; | ||
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); | ||
if (scheduledTask == null) { | ||
return null; | ||
} | ||
|
||
if (scheduledTask.deadlineNanos() <= nanoTime) { | ||
scheduledTaskQueue.remove(); | ||
return scheduledTask; | ||
} | ||
return null; | ||
TaskScheduler taskScheduler = this.taskScheduler; | ||
return taskScheduler == null? null : taskScheduler.pollScheduledTask(nanoTime); | ||
} | ||
|
||
/** | ||
|
@@ -126,20 +103,17 @@ protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) { | |
* This method MUST be called only when {@link #inEventLoop()} is {@code true}. | ||
*/ | ||
protected final long nextScheduledTaskNano() { | ||
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue; | ||
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); | ||
assert inEventLoop(); | ||
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask(); | ||
if (scheduledTask == null) { | ||
return -1; | ||
} | ||
return Math.max(0, scheduledTask.deadlineNanos() - ticker().nanoTime()); | ||
} | ||
|
||
final RunnableScheduledFuture<?> peekScheduledTask() { | ||
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue; | ||
if (scheduledTaskQueue == null) { | ||
return null; | ||
} | ||
return scheduledTaskQueue.peek(); | ||
TaskScheduler taskScheduler = this.taskScheduler; | ||
return taskScheduler == null? null : taskScheduler.peekScheduledTask(); | ||
} | ||
|
||
/** | ||
|
@@ -149,235 +123,45 @@ final RunnableScheduledFuture<?> peekScheduledTask() { | |
*/ | ||
protected final boolean hasScheduledTasks() { | ||
assert inEventLoop(); | ||
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue; | ||
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); | ||
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask(); | ||
return scheduledTask != null && scheduledTask.deadlineNanos() <= ticker().nanoTime(); | ||
} | ||
|
||
@Override | ||
public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) { | ||
requireNonNull(command, "command"); | ||
requireNonNull(unit, "unit"); | ||
if (delay < 0) { | ||
delay = 0; | ||
} | ||
RunnableScheduledFuture<Void> task = newScheduledTaskFor( | ||
callable(command, null), deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0); | ||
return schedule(task); | ||
return taskScheduler().schedule(command, delay, unit); | ||
} | ||
|
||
@Override | ||
public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { | ||
requireNonNull(callable, "callable"); | ||
requireNonNull(unit, "unit"); | ||
if (delay < 0) { | ||
delay = 0; | ||
} | ||
RunnableScheduledFuture<V> task = newScheduledTaskFor( | ||
callable, deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0); | ||
return schedule(task); | ||
return taskScheduler().schedule(callable, delay, unit); | ||
} | ||
|
||
@Override | ||
public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { | ||
requireNonNull(command, "command"); | ||
requireNonNull(unit, "unit"); | ||
if (initialDelay < 0) { | ||
throw new IllegalArgumentException( | ||
String.format("initialDelay: %d (expected: >= 0)", initialDelay)); | ||
} | ||
if (period <= 0) { | ||
throw new IllegalArgumentException( | ||
String.format("period: %d (expected: > 0)", period)); | ||
} | ||
|
||
RunnableScheduledFuture<Void> task = newScheduledTaskFor( | ||
callable(command, null), | ||
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), unit.toNanos(period)); | ||
return schedule(task); | ||
return taskScheduler().scheduleAtFixedRate(command, initialDelay, period, unit); | ||
} | ||
|
||
@Override | ||
public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { | ||
requireNonNull(command, "command"); | ||
requireNonNull(unit, "unit"); | ||
if (initialDelay < 0) { | ||
throw new IllegalArgumentException( | ||
String.format("initialDelay: %d (expected: >= 0)", initialDelay)); | ||
} | ||
if (delay <= 0) { | ||
throw new IllegalArgumentException( | ||
String.format("delay: %d (expected: > 0)", delay)); | ||
} | ||
|
||
RunnableScheduledFuture<Void> task = newScheduledTaskFor( | ||
callable(command, null), | ||
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), -unit.toNanos(delay)); | ||
return schedule(task); | ||
return taskScheduler().scheduleWithFixedDelay(command, initialDelay, delay, unit); | ||
} | ||
|
||
/** | ||
* Add the {@link RunnableScheduledFuture} for execution. | ||
* Schedule the {@link RunnableScheduledFuture} for execution. | ||
*/ | ||
protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) { | ||
if (inEventLoop()) { | ||
add0(task); | ||
} else { | ||
execute(() -> add0(task)); | ||
} | ||
return task; | ||
return taskScheduler().schedule(task); | ||
} | ||
|
||
private <V> void add0(RunnableScheduledFuture<V> task) { | ||
final RunnableScheduledFutureNode<V> node; | ||
if (task instanceof RunnableScheduledFutureNode) { | ||
node = (RunnableScheduledFutureNode<V>) task; | ||
} else { | ||
node = new DefaultRunnableScheduledFutureNode<>(task); | ||
} | ||
scheduledTaskQueue().add(node); | ||
} | ||
|
||
final void removeScheduled(final RunnableScheduledFutureNode<?> task) { | ||
final void removeScheduled(final RunnableScheduledFuture<?> task) { | ||
if (inEventLoop()) { | ||
scheduledTaskQueue().removeTyped(task); | ||
taskScheduler().removeScheduled(task); | ||
} else { | ||
execute(() -> removeScheduled(task)); | ||
} | ||
} | ||
|
||
/** | ||
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}. | ||
* <p> | ||
* This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a | ||
* different {@link RunnableFuture}. | ||
*/ | ||
protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture( | ||
AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task, | ||
long deadlineNanos, long periodNanos) { | ||
return new RunnableScheduledFutureAdapter<>(executor, promise, task, deadlineNanos, periodNanos); | ||
} | ||
|
||
/** | ||
* Returns a {@code RunnableScheduledFuture} for the given values. | ||
*/ | ||
protected <V> RunnableScheduledFuture<V> newScheduledTaskFor( | ||
Callable<V> callable, long deadlineNanos, long period) { | ||
return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period); | ||
} | ||
|
||
interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> { | ||
} | ||
|
||
private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> { | ||
private final RunnableScheduledFuture<V> future; | ||
private int queueIndex = INDEX_NOT_IN_QUEUE; | ||
|
||
DefaultRunnableScheduledFutureNode(RunnableScheduledFuture<V> future) { | ||
this.future = future; | ||
} | ||
|
||
@Override | ||
public EventExecutor executor() { | ||
return future.executor(); | ||
} | ||
|
||
@Override | ||
public long deadlineNanos() { | ||
return future.deadlineNanos(); | ||
} | ||
|
||
@Override | ||
public long delayNanos() { | ||
return future.delayNanos(); | ||
} | ||
|
||
@Override | ||
public long delayNanos(long currentTimeNanos) { | ||
return future.delayNanos(currentTimeNanos); | ||
} | ||
|
||
@Override | ||
public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener) { | ||
future.addListener(listener); | ||
return this; | ||
} | ||
|
||
@Override | ||
public <C> RunnableScheduledFuture<V> addListener( | ||
C context, FutureContextListener<? super C, ? super V> listener) { | ||
future.addListener(context, listener); | ||
return this; | ||
} | ||
|
||
@Override | ||
public boolean isPeriodic() { | ||
return future.isPeriodic(); | ||
} | ||
|
||
@Override | ||
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) { | ||
return queueIndex; | ||
} | ||
|
||
@Override | ||
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) { | ||
queueIndex = i; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
future.run(); | ||
} | ||
|
||
@Override | ||
public boolean cancel() { | ||
return future.cancel(); | ||
} | ||
|
||
@Override | ||
public boolean isCancelled() { | ||
return future.isCancelled(); | ||
} | ||
|
||
@Override | ||
public boolean isDone() { | ||
return future.isDone(); | ||
} | ||
|
||
@Override | ||
public FutureCompletionStage<V> asStage() { | ||
return future.asStage(); | ||
} | ||
|
||
@Override | ||
public int compareTo(RunnableScheduledFuture<?> o) { | ||
return future.compareTo(o); | ||
} | ||
|
||
@Override | ||
public boolean isSuccess() { | ||
return future.isSuccess(); | ||
} | ||
|
||
@Override | ||
public boolean isFailed() { | ||
return future.isFailed(); | ||
} | ||
|
||
@Override | ||
public boolean isCancellable() { | ||
return future.isCancellable(); | ||
} | ||
|
||
@Override | ||
public Throwable cause() { | ||
return future.cause(); | ||
} | ||
|
||
@Override | ||
public V getNow() { | ||
return future.getNow(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Consider documenting the behavior when both taskScheduler and taskSchedulerFactory are null. It might be beneficial to clarify the expected behavior or fallback mechanism in such cases, ensuring that users of the API have a clear understanding of how task scheduling is handled.
Code Suggestion: