RT调度类的调度策略是:保证TopN(N为系统cpu个数)优先级的任务可以优先获得cpu资源。除了在时通过基于cpu优先级的选核策略保证这一点外,还有其它流程,我们姑且将这部分流程称作RT调度器的负载均衡(与CFS调度类的负载均衡有很大的不同)。这篇笔记分析了RT调度类负载均衡相关代码的实现,代码使用的是5.10。
除了任务选核流程,RT调度类的负载均衡通过下面两个操作保证RT调度类的调度策略:
在分析pull和push的实现细节之前,需要先搞清楚几个概念。
当RT任务加入或者离开rt_rq时,会分别调用下面的函数对rt_rq上的任务个数统计字段进行更新:
struct rt_rq {
...
#ifdef CONFIG_SMP
// 队列中可迁移到其它cpu运行的RT任务个数
unsigned long rt_nr_migratory;
// 队列中RT任务的个数
unsigned long rt_nr_total;
#endif
}
// 任务加入rt_rq时累加任务个数统计字段
static void inc_rt_migration(struct sched_rt_entity *rt_se, struct rt_rq *rt_rq)
{
struct task_struct *p;
if (!rt_entity_is_task(rt_se)) // 任务更新,group不更新
return;
p = rt_task_of(rt_se);
rt_rq = &rq_of_rt_rq(rt_rq)->rt;
rt_rq->rt_nr_total++; // 累加RT任务个数
// 任务可以在多个核上运行时,累计可迁移RT任务个数
if (p->nr_cpus_allowed > 1)
rt_rq->rt_nr_migratory++;
update_rt_migration(rt_rq);
}
// 任务离开rt_rq时递减任务个数统计字段
static void dec_rt_migration(struct sched_rt_entity *rt_se, struct rt_rq *rt_rq)
{
struct task_struct *p;
if (!rt_entity_is_task(rt_se))
return;
p = rt_task_of(rt_se);
rt_rq = &rq_of_rt_rq(rt_rq)->rt;
rt_rq->rt_nr_total--;
if (p->nr_cpus_allowed > 1)
rt_rq->rt_nr_migratory--;
update_rt_migration(rt_rq);
}
任务个数发生变化后,调用update_rt_migration()函数更新rt_rq的过载状态。可以看出,只要rt_rq上的任务同时满足下面两个条件,就任务该cpu是RT过载的:
所以,只要cpu上有超过一个RT任务在等待运行,就任务该cpu是RT过载的。
struct rt_rq {
...
#ifdef CONFIG_SMP
int overloaded;
#endif
}
static void update_rt_migration(struct rt_rq *rt_rq)
{
if (rt_rq->rt_nr_migratory && rt_rq->rt_nr_total > 1) {
if (!rt_rq->overloaded) {
// 设置系统过载状态
rt_set_overload(rq_of_rt_rq(rt_rq));
rt_rq->overloaded = 1;
}
} else if (rt_rq->overloaded) {
rt_clear_overload(rq_of_rt_rq(rt_rq));
rt_rq->overloaded = 0;
}
}
将所有cpu的RT过载状态聚合到一起,组成了系统的RT过载状态,维护在root_domain中,并在cpu的RT过载状态发生变化时更新系统的RT过载状态。
struct root_domain {
...
atomic_t rto_count; // rto_mask的weight
cpumask_var_t rto_mask; // RT过载的cpu掩码
}
static inline void rt_set_overload(struct rq *rq)
{
if (!rq->online)
return;
cpumask_set_cpu(rq->cpu, rq->rd->rto_mask);
smp_wmb();
atomic_inc(&rq->rd->rto_count);
}
static inline void rt_clear_overload(struct rq *rq)
{
if (!rq->online)
return;
/* the order here really doesn't matter */
atomic_dec(&rq->rd->rto_count);
cpumask_clear_cpu(rq->cpu, rq->rd->rto_mask);
}
所以,只要有一个cpu是RT过载的,就任务系统是RT过载的。
static inline int rt_overloaded(struct rq *rq)
{
return atomic_read(&rq->rd->rto_count);
}
每个rt_rq都维护了一个Pushable链表,该链表组织了rt_rq中所有可迁移到其它cpu运行的任务。
struct rt_rq {
...
#if defined CONFIG_SMP || defined CONFIG_RT_GROUP_SCHED
struct {
int curr; /* highest queued rt task prio */
#ifdef CONFIG_SMP
// Pushable链表中最高优先级,也是rt_rq中次高优先级
int next; /* next highest */
#endif
} highest_prio;
#ifdef CONFIG_SMP
// Pushable链表,该链表按照p->prio排序的
struct plist_head pushable_tasks;
#endif
}
struct task_struct {
...
#ifdef CONFIG_SMP
struct plist_node pushable_tasks;
#endif
}
只要任务可以在多个cpu上运行(亲核性设置超过一个cpu),在任务入队列时就会将任务加入rt_rq的Pushable链表中;相反的,在任务离开队列时也会将其从Pushable链表中移除。
// 任务入队列
static void
enqueue_task_rt(struct rq *rq, struct task_struct *p, int flags)
{
...
// 正在运行的任务和只能在该cpu上运行的任务不会被加入到Pushable链表中
if (!task_current(rq, p) && p->nr_cpus_allowed > 1)
enqueue_pushable_task(rq, p);
}
static void enqueue_pushable_task(struct rq *rq, struct task_struct *p)
{
// 将任务p加入Pushable链表中
plist_del(&p->pushable_tasks, &rq->rt.pushable_tasks);
plist_node_init(&p->pushable_tasks, p->prio);
plist_add(&p->pushable_tasks, &rq->rt.pushable_tasks);
/* Update the highest prio pushable task */
if (p->prio < rq->rt.highest_prio.next)
rq->rt.highest_prio.next = p->prio;
}
// 任务出队列
static void dequeue_task_rt(struct rq *rq, struct task_struct *p, int flags)
{
...
dequeue_pushable_task(rq, p);
}
static void dequeue_pushable_task(struct rq *rq, struct task_struct *p)
{
// 将任务从Pushable链表中移除
plist_del(&p->pushable_tasks, &rq->rt.pushable_tasks);
/* Update the new highest prio pushable task */
if (has_pushable_tasks(rq)) {
p = plist_first_entry(&rq->rt.pushable_tasks,
struct task_struct, pushable_tasks);
rq->rt.highest_prio.next = p->prio;
} else
rq->rt.highest_prio.next = MAX_RT_PRIO;
}
在Pull流程中,会调用pick_highest_pushable_task()函数找到rt_rq上次高优先级的任务(一般正在运行的任务的优先级是最高的)。
// 检查是否可以pick任务p到cpu上运行
static int pick_rt_task(struct rq *rq, struct task_struct *p, int cpu)
{
// 1. 避免找到正在运行的任务;
// 2. 避免找到不能在cpu上运行的任务
if (!task_running(rq, p) &&
cpumask_test_cpu(cpu, p->cpus_ptr))
return 1;
return 0;
}
// 从rt_rq上找到能够迁移到cpu上运行的次高优先级任务
static struct task_struct *pick_highest_pushable_task(struct rq *rq, int cpu)
{
struct plist_head *head = &rq->rt.pushable_tasks;
struct task_struct *p;
// rt_rq上没有可迁移的任务
if (!has_pushable_tasks(rq))
return NULL;
// 按照优先级由高到低的顺序遍历rt_rq的Pushable链表,
// 找到第一个可迁移到cpu上的任务一定是优先级最高的任务
plist_for_each_entry(p, head, pushable_tasks) {
if (pick_rt_task(rq, p, cpu))
return p;
}
return NULL;
}
在push流程中,调用pick_next_pushable_task()函数找到rt_rq上次高优先级的任务。
static struct task_struct *pick_next_pushable_task(struct rq *rq)
{
struct task_struct *p;
if (!has_pushable_tasks(rq))
return NULL;
// Pushable链表中优先级最高的任务
p = plist_first_entry(&rq->rt.pushable_tasks,
struct task_struct, pushable_tasks);
BUG_ON(rq->cpu != task_cpu(p));
BUG_ON(task_current(rq, p));
BUG_ON(p->nr_cpus_allowed <= 1);
BUG_ON(!task_on_rq_queued(p));
BUG_ON(!rt_task(p));
return p;
}
下面先来讨论pull的时机,然后再看pull流程的实现。
想象一下,需要pull一定是当前cpu上正在运行的任务的状态发生了变化,导致TopN优先级范围内的任务可能产生了变化,因此,cpu要主动检查其它cpu上是否有处于TopN优先级范围的任务要运行,如果有要pull过来调度其运行。发生如下3种事件可能引起上述变化:
static inline bool need_pull_rt_task(struct rq *rq, struct task_struct *prev)
{
/* Try to pull RT tasks here if we lower this rq's prio */
return rq->rt.highest_prio.curr > prev->prio;
}
static int balance_rt(struct rq *rq, struct task_struct *p, struct rq_flags *rf)
{
// 1. 休眠时任务p会在该回调之前从cpu运行队列中移除;
// 2. 任务p的优先级高于cpu运行队列中任务的最高优先级,这通常都是满足的,
// 因为RT调度器总是会优先调度优先级最高的任务
if (!on_rt_rq(&p->rt) && need_pull_rt_task(rq, p)) {
rq_unpin_lock(rq, rf);
pull_rt_task(rq); // 执行pull
rq_repin_lock(rq, rf);
}
// 返回非0会停止balance过程
return sched_stop_runnable(rq) || sched_dl_runnable(rq) || sched_rt_runnable(rq);
}
// 任务p的调度策略由RT调度策略->非RT调度策略时执行该回调
static void switched_from_rt(struct rq *rq, struct task_struct *p)
{
// 1. 任务不是正在运行的任务,不需要主动pull。
// 2. 队列中还有其它RT任务,那么随后的调度流程会重新选择一个任务,这里不需要主动pull
if (!task_on_rq_queued(p) || rq->rt.rt_nr_running)
return;
// 任务p是队列中最后一个RT任务,需要立刻执行pull
rt_queue_pull_task(rq);
}
// 为了防止阻塞原有流程,pull流程推迟到callback中执行
static DEFINE_PER_CPU(struct callback_head, rt_pull_head);
static inline void rt_queue_pull_task(struct rq *rq)
{
queue_balance_callback(rq, &per_cpu(rt_pull_head, rq->cpu), pull_rt_task);
}
static void
prio_changed_rt(struct rq *rq, struct task_struct *p, int oldprio)
{
if (!task_on_rq_queued(p)) // 任务p不处于运行状态,不需要特殊操作
return;
if (rq->curr == p) { // 任务p是正在运行的任务
#ifdef CONFIG_SMP
// 多核系统,正在运行的任务优先级变低了,需要主动执行pull
if (oldprio < p->prio)
rt_queue_pull_task(rq);
// 正在运行的任务的优先级不再是当前cpu队列上最高的话,还需要主动触发一次调度
if (p->prio > rq->rt.highest_prio.curr)
resched_curr(rq);
#else
// 单核系统,正在运行的任务优先级变低了,主动触发一次重新调度
if (oldprio < p->prio)
resched_curr(rq);
#endif /* CONFIG_SMP */
} else { // 任务p是在运行队列中等待运行的任务
// 任务p不是正在运行的任务,但是其优先级变的比正在运行的任务还高,主动触发一次调度
if (p->prio < rq->curr->prio)
resched_curr(rq);
}
}
pull任务的过程由pull_rt_task()函数完成。
static void pull_rt_task(struct rq *this_rq)
{
int this_cpu = this_rq->cpu, cpu;
bool resched = false;
struct task_struct *p;
struct rq *src_rq;
int rt_overload_count = rt_overloaded(this_rq);
// 系统中所有cpu都没有RT过载,说明其它cpu上没有要等待运行的RT任务,
// 所以也不需要主动pull
if (likely(!rt_overload_count))
return;
smp_rmb();
// 只有当前cpu是RT过载的,也不需要pull
if (rt_overload_count == 1 &&
cpumask_test_cpu(this_rq->cpu, this_rq->rd->rto_mask))
return;
#ifdef HAVE_RT_PUSH_IPI
// 如果支持push ipi特性,优先让其它cpu主动将任务push到该cpu,
// 因为pull需要锁住源和目标cpu的rq->lock,这很可能会导致竞争,
// push ipi特性可以将pull操作转换为push,提高效率
if (sched_feat(RT_PUSH_IPI)) {
tell_cpu_to_push(this_rq);
return;
}
#endif
// 遍历RT过载的cpu,尝试从这些cpu上pull任务到当前cpu执行
for_each_cpu(cpu, this_rq->rd->rto_mask) {
if (this_cpu == cpu)
continue;
src_rq = cpu_rq(cpu); // 要pull的任务的源cpu队列
// src_rq上次高优先级任务的优先级低于this_rq上最高优先级任务的优先级,
// 说明this_rq上的任务应该优先运行,不需要从该队列pull任务
if (src_rq->rt.highest_prio.next >=
this_rq->rt.highest_prio.curr)
continue;
// 锁住this_rq和src_rq
double_lock_balance(this_rq, src_rq);
// 找到src_rq上次高优先级的任务
p = pick_highest_pushable_task(src_rq, this_cpu);
// 任务p的优先级比this_rq上最高优先级的任务的优先级更高,将其pull到本cpu执行
if (p && (p->prio < this_rq->rt.highest_prio.curr)) {
WARN_ON(p == src_rq->curr);
WARN_ON(!task_on_rq_queued(p));
// 最高优先级的任务从放到运行队列到被调度运行是有一个短暂的间隔的,
// 这个条件判断就是为了防止将src_rq上最高优先级的任务pull到本cpu上,
// pull流程只拉取次高优先级的任务
if (p->prio < src_rq->curr->prio)
goto skip;
// 所有条件都满足,开始pull,pull完毕后this_rq需要重新调度一次
resched = true;
// 标准的迁核操作:
// 1. 将任务从源队列移除;2. 设置任务到目标cpu;3. 将任务放入目标cpu队列
deactivate_task(src_rq, p, 0);
set_task_cpu(p, this_cpu);
activate_task(this_rq, p, 0);
// 继续从其它cpu pull任务,这样循环结束后可以保证让系统中
// 优先级为TopN+1的任务到该cpu上运行
}
skip:
double_unlock_balance(this_rq, src_rq);
}
if (resched) // 发生了pull,当前cpu重新执行一次调度
resched_curr(this_rq);
}
同样的,先讨论push的时机,然后再看push流程的实现细节。
cpu需要主动push任务到其它cpu运行,一定是当前cpu上有新的任务要运行,但是当前cpu又无法立刻调度它,而且该任务的优先级可能在TopN范围内,所以要尝试进行push。
下面这些场景可能会出现上面描述的情况:
static void task_woken_rt(struct rq *rq, struct task_struct *p)
{
// 任务p必须同时满足下面的条件才会push:
// 1. 任务p不是正在运行的任务;
// 2. 当前cpu不会立刻进行一次调度(调度时也会触发push,见下面介绍);
// 3. 任务p还可以在其它cpu上运行;
// 4. cpu在运行dl或者rt任务;
// 5. cpu上正在运行的任务只能在该cpu上运行,或者其优先级比任务p更高;
// 这些条件都是为了说明任务p无法在该cpu上很快被调度运行,所以要尝试push
bool need_to_push = !task_running(rq, p) &&
!test_tsk_need_resched(rq->curr) &&
p->nr_cpus_allowed > 1 &&
(dl_task(rq->curr) || rt_task(rq->curr)) &&
(rq->curr->nr_cpus_allowed < 2 ||
rq->curr->prio <= p->prio);
if (need_to_push)
push_rt_tasks(rq);
}
static void switched_to_rt(struct rq *rq, struct task_struct *p)
{
// 任务正在等待cpu调度
if (task_on_rq_queued(p) && rq->curr != p) {
#ifdef CONFIG_SMP
// 任务p可以在其它cpu上运行,并且当前cpu已经RT过载,
// 尝试将任务push到其它cpu运行
if (p->nr_cpus_allowed > 1 && rq->rt.overloaded)
rt_queue_push_tasks(rq);
#endif /* CONFIG_SMP */
// 此外,任务的优先级比正在运行的任务的优先级还高,重新调度以抢占该任务
if (p->prio < rq->curr->prio && cpu_online(cpu_of(rq)))
resched_curr(rq);
}
}
// 在callback中执行push,防止长时间阻塞当前流程
static DEFINE_PER_CPU(struct callback_head, rt_push_head);
static inline void rt_queue_push_tasks(struct rq *rq)
{
if (!has_pushable_tasks(rq))
return;
queue_balance_callback(rq, &per_cpu(rt_push_head, rq->cpu), push_rt_tasks);
}
static inline int has_pushable_tasks(struct rq *rq)
{
return !plist_head_empty(&rq->rt.pushable_tasks);
}
static struct task_struct *pick_next_task_rt(struct rq *rq)
{
...
set_next_task_rt(rq, p, true); // 准备运行任务p
}
static inline void set_next_task_rt(struct rq *rq, struct task_struct *p, bool first)
{
...
if (!first)
return;
rt_queue_push_tasks(rq); // 尝试push
}
void rto_push_irq_work_func(struct irq_work *work)
{
...
if (has_pushable_tasks(rq)) {
raw_spin_lock(&rq->lock);
push_rt_tasks(rq);
raw_spin_unlock(&rq->lock);
}
}
push任务的过程由push_rt_tasks()函数完成。
static void push_rt_tasks(struct rq *rq)
{
/* push_rt_task will return true if it moved an RT */
while (push_rt_task(rq));
}
// 尝试将rq上的次高优先级任务push到其它cpu运行
static int push_rt_task(struct rq *rq)
{
struct task_struct *next_task;
struct rq *lowest_rq;
int ret = 0;
// rq没有RT过载,无需push
if (!rq->rt.overloaded)
return 0;
// 找到rq上次高优先级的任务
next_task = pick_next_pushable_task(rq);
if (!next_task)
return 0;
retry:
// 避免push正在运行的任务
if (WARN_ON(next_task == rq->curr))
return 0;
// 任务的优先级高于正在运行的任务,这种情况抢占当前cpu即可,无需push
if (unlikely(next_task->prio < rq->curr->prio)) {
resched_curr(rq);
return 0;
}
// 下面准备迁移任务,为任务寻找目标cpu
get_task_struct(next_task);
// 为任务next_task寻找目标cpu
lowest_rq = find_lock_lowest_rq(next_task, rq);
if (!lowest_rq) { // 没有找到的情况处理
struct task_struct *task;
// find_lock_lowest_rq()释放了锁,这里要重新找一个任务
task = pick_next_pushable_task(rq);
if (task == next_task) {
// 还是上一个任务,但是该任务没有合适的目标cpu,不再尝试push
goto out;
}
// 没有任务要push了,结束push流程
if (!task)
goto out;
// 其它情况重新push
put_task_struct(next_task);
next_task = task;
goto retry;
}
// 将任务迁移到目标cpu上
deactivate_task(rq, next_task, 0);
set_task_cpu(next_task, lowest_rq->cpu);
activate_task(lowest_rq, next_task, 0);
ret = 1; // 返回非0继续push
resched_curr(lowest_rq); // 目标cpu触发一次重新调度
double_unlock_balance(rq, lowest_rq);
out:
put_task_struct(next_task);
return ret;
}
上面调用find_lock_lowest_rq()函数为任务p寻找要push的cpu时,除了基于cpu优先级查找外,还有一些别的条件检查。
static struct rq *find_lock_lowest_rq(struct task_struct *task, struct rq *rq)
{
struct rq *lowest_rq = NULL;
int tries;
int cpu;
for (tries = 0; tries < RT_MAX_TRIES; tries++) { // 最多尝试3次循环
// 基于cpu优先级为task找一个最合适的cpu
cpu = find_lowest_rq(task);
// 查找失败,或者还是当前cpu最适合,结束查找过程
if ((cpu == -1) || (cpu == rq->cpu))
break;
lowest_rq = cpu_rq(cpu);
// 目标cpu上正在运行的任务有更高的优先级,push过去也没用
if (lowest_rq->rt.highest_prio.curr <= task->prio) {
lowest_rq = NULL;
break;
}
// 持锁后再次判断优先级是否满足要求
if (double_lock_balance(rq, lowest_rq)) {
if (unlikely(task_rq(task) != rq ||
!cpumask_test_cpu(lowest_rq->cpu, task->cpus_ptr) ||
task_running(rq, task) ||
!rt_task(task) ||
!task_on_rq_queued(task))) {
double_unlock_balance(rq, lowest_rq);
lowest_rq = NULL;
break;
}
}
// 目标cpu正在运行的任务优先级较低,push过去可以运行,选中它
if (lowest_rq->rt.highest_prio.curr > task->prio)
break;
// 没找到合适,重新选择
double_unlock_balance(rq, lowest_rq);
lowest_rq = NULL;
}
return lowest_rq;
}
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- awee.cn 版权所有 湘ICP备2023022495号-5
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务