package org.example.threadpool;


import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class TestPool {
    public static void main(String[] args) {
        /**
         *         // 1. 死等 take
         *         // 2.带超时的等待 offer
         *         // 3. 放弃任务执行
         *         // 4.抛出异常
         *         // 5. 让调用者自己执行任务
         */
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> {
            // 死等
            queue.put(task);
        });
        ThreadPool threadPool2 = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> {
            // 有超时
            queue.offer(task, 1000, TimeUnit.MILLISECONDS);
        });
        ThreadPool threadPool3 = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> {
            // 让调用者放弃执行
            log.debug("放弃{}", task);
        });

        ThreadPool threadPool4 = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> {
            throw new RuntimeException("任务执行失败:" + task);
            // 后面的任务就不执行了
        });
        ThreadPool threadPool5 = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> {
            // 调用者自己执行任务
            task.run();
        });
        for (int i = 0; i < 15; i++) {
            int finalI = i;
            threadPool5.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.debug("{}", finalI);
            });
        }
    }
}

@Slf4j
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    //线程集合
    private HashSet<Worker> workers = new HashSet();

    //核心线程数
    private int coreSize;

    // 获取任务超时时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    // 执行任务
    public void execute(Runnable task) {
        //线程数还没有超过核心数
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增worker对象 {} ", worker);
                workers.add(worker);
                worker.start();
                return;
            }
        }
        // 线程数超过核心数,要加入任务队列暂存
        // 1. 死等 take
        // 2.带超时的等待 offer
        // 3. 放弃任务执行
        // 4.抛出异常
        // 5. 让调用者自己执行任务
        taskQueue.tryPut(rejectPolicy, task);
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 当task不为空,执行任务
            // 当task执行完毕,接着从任务队列获取任务
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                // 有任务对象
                try {
                    log.debug("正在执行... {} ", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除: {}", workers);
                workers.remove(this);
            }

        }
    }
}

@Slf4j
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();

    // 3. 消费者成员变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者成员变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量上限
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 6. 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 7. 阻塞添加
    public void put(T element) {
        lock.lock();
        try {
            while (capacity == queue.size()) {
                try {
                    log.debug("等待加入任务队列... {} ", element);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            log.debug("任务数超过核心数,加入任务队列暂存{}", element);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    //带超时时间的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (capacity == queue.size()) {
                try {
                    log.debug("等待加入任务队列... {} ", task);
                    if (nanos <= 0) {
                        log.debug("加入超时,丢弃{}", task);
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            log.debug("任务数超过核心数,加入任务队列暂存{}", task);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                // 有空闲
                log.debug("任务数超过核心数,加入任务队列暂存{}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            // 将timeout 统一转换为纳秒
            long nanos = timeUnit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回的是剩余的时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 获取队列大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

你的这段代码是一个 轻量级的自定义线程池实现,它模仿了 Java 标准库中的 ThreadPoolExecutor 的行为,但更加直观和教育性强,尤其适合学习线程池机制。


🔧 一、整体结构说明

该线程池由以下核心组件构成:

组件 作用
ThreadPool 线程池核心类,负责任务调度、线程管理
BlockingQueue<T> 自定义的阻塞队列,用于存放等待执行的任务
Worker 工作线程,负责从队列中取任务并执行
RejectPolicy<T> 拒绝策略接口,当任务无法提交时,决定该怎么办

🧱 二、线程池工作流程

提交任务(ThreadPool.execute(Runnable)

  1. 如果当前工作线程数量 < coreSize

    • 创建新线程,执行任务。
  2. 否则

    • 将任务加入阻塞队列 taskQueue
    • 如果队列已满,则根据设置的 RejectPolicy 决定如何处理任务。

任务执行(Worker.run()

  • 每个 Worker 执行逻辑如下:

    while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null)
    

    表示:

    • 如果手头有任务,先执行。
    • 如果没有,就尝试从任务队列中拉取一个任务,等待一段时间(超时返回 null)。
    • 若取不到新任务,说明线程空闲太久,Worker线程退出并从线程池中移除

阻塞队列 BlockingQueue<T>

实现了如下特性:

方法 说明
put() 如果队列满,阻塞等待
offer(timeout) 如果队列满,阻塞一段时间后返回 false
take() 如果队列空,阻塞等待任务
poll(timeout) 如果队列空,阻塞一段时间后返回 null
tryPut() 根据 RejectPolicy 选择添加任务或执行拒绝策略

✅ 三、自定义拒绝策略(RejectPolicy

支持 5 种拒绝策略,用户可按需配置:

  1. 死等(阻塞等待)

    queue.put(task);
    
  2. 带超时等待

    queue.offer(task, 1000, TimeUnit.MILLISECONDS);
    
  3. 放弃任务执行(丢弃任务)

    log.debug("放弃{}", task);
    
  4. 抛出异常

    throw new RuntimeException("任务执行失败:" + task);
    
  5. 让调用者线程自己执行任务

    task.run();
    

➡️ 这一设计很灵活,也模仿了 Java 的 AbortPolicyDiscardPolicyCallerRunsPolicy 等行为。


🌟 四、功能亮点与优点

特性 说明
核心线程数控制 coreSize 控制最大并发线程数量
任务队列容量控制 BlockingQueue 控制任务排队量
阻塞等待+超时支持 put/offertake/poll 支持阻塞和超时机制
线程空闲自动销毁 poll(timeout) 超时不取到任务,线程自动退出
拒绝策略高度自定义 可传入任意函数逻辑实现任务拒绝处理
并发安全 使用了 ReentrantLock + Condition 精准控制并发访问队列
日志输出直观 log.debug() 方便调试线程创建、任务处理、阻塞等过程

🧠 五、可扩展点建议

改进方向 建议说明
✅ 线程命名 Worker线程可设置名字方便排查
✅ 任务执行耗时统计 加一层 wrapper 记录任务开始/结束时间
✅ 可配置最大线程数 当前只支持 coreSize,无最大线程机制
✅ 支持定时任务或延迟任务 实现定时器或优先队列处理可调度任务
✅ 支持关闭线程池 添加 shutdown() 方法,让线程池优雅退出
✅ 支持动态扩容/收缩线程 动态调整 coreSize 是更高级的能力

📦 总结

这套自定义线程池:

  • 完整模拟了线程池的基本机制
  • 支持可插拔式拒绝策略
  • 适合教学与理解线程池核心原理

尤其适合初学者深入理解线程池中的:

  • 核心线程 vs 任务队列;
  • 阻塞 vs 非阻塞等待;
  • 拒绝策略设计;
  • 并发队列管理与 Condition 的使用。