Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,28 @@
*/
package io.netty5.util.concurrent;

import io.netty5.util.internal.DefaultPriorityQueue;
import io.netty5.util.internal.PriorityQueue;
import io.netty5.util.internal.PriorityQueueNode;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.callable;

/**
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
Comparable::compareTo;
private static final RunnableScheduledFutureNode<?>[]
EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0];

private PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue;
private TaskScheduler taskScheduler;
private TaskSchedulerFactory taskSchedulerFactory;
Comment on lines +30 to +31

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Consider documenting the behavior when both taskScheduler and taskSchedulerFactory are null. It might be beneficial to clarify the expected behavior or fallback mechanism in such cases, ensuring that users of the API have a clear understanding of how task scheduling is handled.
Code Suggestion:

+    /**
+     * Task scheduler instance used for scheduling tasks. If null, the taskSchedulerFactory is used
+     * to create a new instance. If both are null, document the expected behavior or fallback mechanism.
+     */
+    private TaskScheduler taskScheduler;
+    private TaskSchedulerFactory taskSchedulerFactory;

Comment on lines +30 to +31

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scalability Issue: The introduction of TaskScheduler and TaskSchedulerFactory as fields increases the memory footprint of each AbstractScheduledEventExecutor instance. In a highly scalable system, where potentially millions of these executors could be instantiated, this additional memory usage could become significant. Moreover, the dynamic selection of TaskScheduler based on a factory pattern, while providing flexibility, introduces additional overhead during executor instantiation, which could impact the startup time of systems with a large number of executors.
Fix: Consider using a static, shared TaskScheduler instance for common scheduling scenarios, reducing the memory footprint per executor instance. This shared instance could be a highly efficient, multi-tenant scheduler designed to handle tasks from multiple executors concurrently. For use cases requiring specialized scheduling, continue to allow the injection of a custom TaskScheduler, but ensure this is an exceptional scenario rather than the norm.
Code Suggestion:

-    private TaskScheduler taskScheduler;
-    private TaskSchedulerFactory taskSchedulerFactory;
+    private static final TaskScheduler sharedTaskScheduler = DefaultTaskSchedulerFactory.INSTANCE.newTaskScheduler(this);

     protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
-        this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
+        // Use sharedTaskScheduler for common cases
     }

+    TaskScheduler taskScheduler() {
+        return sharedTaskScheduler;
+    }


protected AbstractScheduledEventExecutor() {
}

protected AbstractScheduledEventExecutor(TaskSchedulerFactory taskSchedulerFactory) {
this.taskSchedulerFactory = requireNonNull(taskSchedulerFactory, "taskSchedulerFactory");
}

/**
* Return the {@link Ticker} that provides the time source.
*/
Expand All @@ -55,18 +50,18 @@ static long deadlineNanos(long nanoTime, long delay) {
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
TaskScheduler taskScheduler() {
if (taskScheduler == null && taskSchedulerFactory != null) {
taskScheduler = taskSchedulerFactory.newTaskScheduler(this);
}
return scheduledTaskQueue;
if (taskScheduler == null) {
taskScheduler = DefaultTaskSchedulerFactory.ISTANCE.newTaskScheduler(this);
}
return taskScheduler;
}

private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue) {
return queue == null || queue.isEmpty();
private static boolean isNullOrEmpty(TaskScheduler taskScheduler) {
return taskScheduler == null || taskScheduler.isEmpty();
}

/**
Expand All @@ -76,19 +71,11 @@ private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue
*/
protected final void cancelScheduledTasks() {
assert inEventLoop();
PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
TaskScheduler taskScheduler = this.taskScheduler;
if (isNullOrEmpty(taskScheduler)) {
return;
}

final RunnableScheduledFutureNode<?>[] scheduledTasks =
scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);

for (RunnableScheduledFutureNode<?> task : scheduledTasks) {
task.cancel();
}

scheduledTaskQueue.clearIgnoringIndexes();
taskScheduler.cancelScheduledTasks();
}

/**
Expand All @@ -106,18 +93,8 @@ protected final RunnableScheduledFuture<?> pollScheduledTask() {
*/
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}

if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
TaskScheduler taskScheduler = this.taskScheduler;
return taskScheduler == null? null : taskScheduler.pollScheduledTask(nanoTime);
}

/**
Expand All @@ -126,20 +103,17 @@ protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final long nextScheduledTaskNano() {
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
assert inEventLoop();
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - ticker().nanoTime());
}

final RunnableScheduledFuture<?> peekScheduledTask() {
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
TaskScheduler taskScheduler = this.taskScheduler;
return taskScheduler == null? null : taskScheduler.peekScheduledTask();
}

/**
Expand All @@ -149,235 +123,45 @@ final RunnableScheduledFuture<?> peekScheduledTask() {
*/
protected final boolean hasScheduledTasks() {
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
return scheduledTask != null && scheduledTask.deadlineNanos() <= ticker().nanoTime();
}

@Override
public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0);
return schedule(task);
return taskScheduler().schedule(command, delay, unit);
}

@Override
public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
requireNonNull(callable, "callable");
requireNonNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
RunnableScheduledFuture<V> task = newScheduledTaskFor(
callable, deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0);
return schedule(task);
return taskScheduler().schedule(callable, delay, unit);
}

@Override
public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null),
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), unit.toNanos(period));
return schedule(task);
return taskScheduler().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null),
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
return schedule(task);
return taskScheduler().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

/**
* Add the {@link RunnableScheduledFuture} for execution.
* Schedule the {@link RunnableScheduledFuture} for execution.
*/
protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
if (inEventLoop()) {
add0(task);
} else {
execute(() -> add0(task));
}
return task;
return taskScheduler().schedule(task);
}

private <V> void add0(RunnableScheduledFuture<V> task) {
final RunnableScheduledFutureNode<V> node;
if (task instanceof RunnableScheduledFutureNode) {
node = (RunnableScheduledFutureNode<V>) task;
} else {
node = new DefaultRunnableScheduledFutureNode<>(task);
}
scheduledTaskQueue().add(node);
}

final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
final void removeScheduled(final RunnableScheduledFuture<?> task) {
if (inEventLoop()) {
scheduledTaskQueue().removeTyped(task);
taskScheduler().removeScheduled(task);
} else {
execute(() -> removeScheduled(task));
}
}

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
* <p>
* This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a
* different {@link RunnableFuture}.
*/
protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task,
long deadlineNanos, long periodNanos) {
return new RunnableScheduledFutureAdapter<>(executor, promise, task, deadlineNanos, periodNanos);
}

/**
* Returns a {@code RunnableScheduledFuture} for the given values.
*/
protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(
Callable<V> callable, long deadlineNanos, long period) {
return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period);
}

interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> {
}

private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> {
private final RunnableScheduledFuture<V> future;
private int queueIndex = INDEX_NOT_IN_QUEUE;

DefaultRunnableScheduledFutureNode(RunnableScheduledFuture<V> future) {
this.future = future;
}

@Override
public EventExecutor executor() {
return future.executor();
}

@Override
public long deadlineNanos() {
return future.deadlineNanos();
}

@Override
public long delayNanos() {
return future.delayNanos();
}

@Override
public long delayNanos(long currentTimeNanos) {
return future.delayNanos(currentTimeNanos);
}

@Override
public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener) {
future.addListener(listener);
return this;
}

@Override
public <C> RunnableScheduledFuture<V> addListener(
C context, FutureContextListener<? super C, ? super V> listener) {
future.addListener(context, listener);
return this;
}

@Override
public boolean isPeriodic() {
return future.isPeriodic();
}

@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
return queueIndex;
}

@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
queueIndex = i;
}

@Override
public void run() {
future.run();
}

@Override
public boolean cancel() {
return future.cancel();
}

@Override
public boolean isCancelled() {
return future.isCancelled();
}

@Override
public boolean isDone() {
return future.isDone();
}

@Override
public FutureCompletionStage<V> asStage() {
return future.asStage();
}

@Override
public int compareTo(RunnableScheduledFuture<?> o) {
return future.compareTo(o);
}

@Override
public boolean isSuccess() {
return future.isSuccess();
}

@Override
public boolean isFailed() {
return future.isFailed();
}

@Override
public boolean isCancellable() {
return future.isCancellable();
}

@Override
public Throwable cause() {
return future.cause();
}

@Override
public V getNow() {
return future.getNow();
}
}
}
Loading