当前位置 博文首页 > binecy:Netty源码解析 -- FastThreadLocal与HashedWheelTimer

    binecy:Netty源码解析 -- FastThreadLocal与HashedWheelTimer

    作者:binecy 时间:2021-01-17 12:02

    Netty源码分析系列文章已接近尾声,本文再来分析Netty中两个常见组件:FastThreadLoca与HashedWheelTimer。
    源码分析基于Netty 4.1.52

    FastThreadLocal

    FastThreadLocal比较简单。
    FastThreadLocal和FastThreadLocalThread是配套使用的。
    FastThreadLocalThread继承了Thread,FastThreadLocalThread#threadLocalMap 是一个InternalThreadLocalMap,该InternalThreadLocalMap对象只能用于当前线程。
    InternalThreadLocalMap#indexedVariables是一个数组,存放了当前线程所有FastThreadLocal对应的值。
    而每个FastThreadLocal都有一个index,用于定位InternalThreadLocalMap#indexedVariables。

    FastThreadLocal#get

    public final V get() {
        // #1
    	InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    	// #2
    	Object v = threadLocalMap.indexedVariable(index);
    	if (v != InternalThreadLocalMap.UNSET) {
    		return (V) v;
    	}
        // #3
    	return initialize(threadLocalMap);
    }
    

    #1 获取该线程的InternalThreadLocalMap
    如果是FastThreadLocalThread,直接获取FastThreadLocalThread#threadLocalMap。
    否则,从UnpaddedInternalThreadLocalMap.slowThreadLocalMap获取该线程InternalThreadLocalMap。
    注意,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一个ThreadLocal,这里实际回退到使用ThreadLocal了。
    #2 每个FastThreadLocal都有一个index。
    通过该index,获取InternalThreadLocalMap#indexedVariables中存放的值
    #3 找不到值,通过initialize方法构建新对象。

    可以看到,FastThreadLocal中连hash算法都不用,通过下标获取对应的值,复杂度为log(1),自然很快啦。

    HashedWheelTimer

    HashedWheelTimer是Netty提供的时间轮调度器。
    时间轮是一种充分利用线程资源进行批量化任务调度的调度模型,能够高效的管理各种延时任务。
    简单说,就是将延时任务存放到一个环形队列中,并通过执行线程定时执行该队列的任务。

    例如,
    环形队列上有60个格子,
    执行线程每秒移动一个格子,则环形队列每轮可存放1分钟内的任务。
    现在有两个定时任务
    task1,32秒后执行
    task2,2分25秒后执行
    而执行线程当前位于第6格子
    则task1放到32+6=38格,轮数为0
    task2放到25+6=31个,轮数为2
    执行线程将执行当前格子轮数为0的任务,并将其他任务轮数减1。

    缺点,时间轮调度器的时间精度不高。
    因为时间轮算法的精度取决于执行线程移动速度。
    例如上面例子中执行线程每秒移动一个格子,则调度精度小于一秒的任务就无法准时调用。

    HashedWheelTimer关键字段

    // 任务执行器,负责执行任务
    Worker worker = new Worker();
    // 任务执行线程
    Thread workerThread;
    //  HashedWheelTimer状态, 0 - init, 1 - started, 2 - shut down
    int workerState;
    // 时间轮队列,使用数组实现
    HashedWheelBucket[] wheel;
    // 暂存新增的任务
    Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    // 已取消任务
    Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    

    添加延迟任务 HashedWheelTimer#newTimeout

    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ...
    
        // #1
        start();
    
        // #2
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    
        ...
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
    

    #1 如果HashedWheelTimer未启动,则启动该HashedWheelTimer
    HashedWheelTimer#start方法负责是启动workerThread线程
    #2 startTime是HashedWheelTimer启动时间
    deadline是相对HashedWheelTimer启动的延迟时间
    构建HashedWheelTimeout,添加到HashedWheelTimer#timeouts

    时间轮运行 Worker#run

    public void run() {
    	...
    
    	// #1
    	startTimeInitialized.countDown();
    
    	do {
    		// #2
    		final long deadline = waitForNextTick();
    		if (deadline > 0) {
    			// #3
    			int idx = (int) (tick & mask);
    			processCancelledTasks();
    			HashedWheelBucket bucket = wheel[idx];
    			// #4
    			transferTimeoutsToBuckets();
    			// #5
    			bucket.expireTimeouts(deadline);
    			// #6
    			tick++;
    		}
    	} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    
        // #7
    	...
    }
    

    #1 HashedWheelTimer#start方法阻塞HashedWheelTimer线程直到Worker启动完成,这里解除HashedWheelTimer线程阻塞。
    #2 计算下一格子开始执行的时间,然后sleep到下次格子开始执行时间
    #2 tick是从HashedWheelTimer启动后移动的总格子数,这里获取tick对应的格子索引。
    由于Long类型足够大,这里并不考虑溢出问题。
    #4 将HashedWheelTimer#timeouts的任务迁移到对应的格子中
    #5 处理已到期任务
    #6 移动到下一个格子
    #7 这里是HashedWheelTimer#stop后的逻辑处理,取消任务,停止时间轮

    迁移任务 Worker#transferTimeoutsToBuckets

    private void transferTimeoutsToBuckets() {
    	// #1
    	for (int i = 0; i < 100000; i++) {
    		HashedWheelTimeout timeout = timeouts.poll();
    		if (timeout == null) {
    			// all processed
    			break;
    		}
    		if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
    			continue;
    		}
    		// #2
    		long calculated = timeout.deadline / tickDuration;
    		// #3
    		timeout.remainingRounds = (calculated - tick) / wheel.length;
    		// #4
    		final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
    		// #5
    		int stopIndex = (int) (ticks & mask);
    
    		HashedWheelBucket bucket = wheel[stopIndex];
    		bucket.addTimeout(timeout);
    	}
    }
    

    #1 注意,每次只迁移100000个任务,以免阻塞线程
    #2 任务延迟时间/每格时间数, 得到该任务需延迟的总格子移动数
    #3 (总格子移动数 - 已移动格子数)/每轮格子数,得到轮数
    #4 如果任务在timeouts队列放得太久导致已经过了执行时间,则使用当前tick, 也就是放到当前bucket,以便尽快执行该任务
    #5 计算tick对应格子索引,放到对应的格子位置

    执行到期任务 HashedWheelBucket#expireTimeouts

    public void expireTimeouts(long deadline) {
    	HashedWheelTimeout timeout = head;
    
    	while (timeout != null) {
    		HashedWheelTimeout next = timeout.next;
    		// #1
    		if (timeout.remainingRounds <= 0) {
    			// #2
    			next = remove(timeout);
    			if (timeout.deadline <= deadline) {
    				// #3
    				timeout.expire();
    			} else {
    				throw new IllegalStateException(String.format(
    						"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
    			}
    		} else if (timeout.isCancelled()) {
    			next = remove(timeout);
    		} else {
    			// #4
    			timeout.remainingRounds --;
    		}
    		timeout = next;
    	}
    }
    

    #1 选择轮数小于等于0的任务
    #2 移除任务
    #3 修改状态为过期,并执行任务
    #4 其他任务轮数减1

    ScheduledExecutorService使用堆(DelayedWorkQueue)维护任务,新增任务复杂度为O(logN)。
    而 HashedWheelTimer 新增任务复杂度为O(1),所以在任务非常多时, HashedWheelTimer 可以表现出它的优势。
    但是任务较少甚至没有任务时,HashedWheelTimer的执行线程都需要不断移动,也会造成性能消耗。
    注意,HashedWheelTimer使用同一个线程调用和执行任务,如果某些任务执行时间过久,则影响后续定时任务执行。当然,我们也可以考虑在任务中另起线程执行逻辑。
    另外,如果任务过多,也会导致任务长期滞留在HashedWheelTimer#timeouts中而不能及时执行。

    如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!

    下一篇:没有了