Scheduler 是整个 kube-scheduler
的一个 structure,提供了 kube-scheduler
运行所需的组件。
type Scheduler struct {
// Cache是一个抽象,会缓存pod的信息,作为scheduler进行查找,操作是基于Pod进行增加
Cache internalcache.Cache
// Extenders 算是调度框架中提供的调度插件,会影响kubernetes中的调度策略
Extenders []framework.Extender
// NextPod 作为一个函数提供,会阻塞获取下一个ke'diao'du
NextPod func() *framework.QueuedPodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.QueuedPodInfo, error)
// SchedulePod 尝试将给出的pod调度到Node。
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// 关闭scheduler的信号
StopEverything <-chan struct{}
// SchedulingQueue保存要调度的Pod
SchedulingQueue internalqueue.SchedulingQueue
// Profiles中是多个调度框架
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
}
作为实际执行的两个核心,SchedulingQueue
,与 scheduleOne
将会分析到这两个
在知道 kube-scheduler
初始化过程后,需要对 kube-scheduler
的整个 structure 和 workflow 进行分析
在 Run 中,运行的是 一个 SchedulingQueue
与 一个 scheduleOne
,从结构上看是属于 Scheduler
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
}
SchedulingQueue 是一个队列的抽象,用于存储等待调度的Pod。该接口遵循类似于 cache.FIFO 和 cache.Heap 的模式。
type SchedulingQueue interface {
framework.PodNominator
Add(pod *v1.Pod) error
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
// The passed-in pods are originally compiled from plugins that want to activate Pods,
// by injecting the pods through a reserved CycleState struct (PodsToActivate).
Activate(pods map[string]*v1.Pod)
// 将不可调度的Pod重入到队列中
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
// SchedulingCycle returns the current number of scheduling cycle which is
// cached by scheduling queue. Normally, incrementing this number whenever
// a pod is popped (e.g. called Pop()) is enough.
SchedulingCycle() int64
// Pop会弹出一个pod,并从head优先级队列中删除
Pop() (*framework.QueuedPodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// Run starts the goroutines managing the queue.
Run()
}
而 PriorityQueue 是 SchedulingQueue
的实现,该部分的核心构成是两个子队列与一个数据结构,即 activeQ
、backoffQ
和 unschedulablePods
activeQ
:是一个 heap 类型的优先级队列,是 sheduler 从中获得优先级最高的Pod进行调度backoffQ
:也是一个 heap 类型的优先级队列,存放的是不可调度的PodunschedulablePods
:保存确定不可被调度的Pod
type SchedulingQueue interface {
framework.PodNominator
Add(pod *v1.Pod) error
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
// The passed-in pods are originally compiled from plugins that want to activate Pods,
// by injecting the pods through a reserved CycleState struct (PodsToActivate).
Activate(pods map[string]*v1.Pod)
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
// returned by calling SchedulingCycle().
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
// SchedulingCycle returns the current number of scheduling cycle which is
// cached by scheduling queue. Normally, incrementing this number whenever
// a pod is popped (e.g. called Pop()) is enough.
SchedulingCycle() int64
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*framework.QueuedPodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// Run starts the goroutines managing the queue.
Run()
}
在New scheduler 时可以看到会初始化这个queue
podQueue := internalqueue.NewSchedulingQueue(
// 实现pod对比的一个函数即less
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
而 NewSchedulingQueue 则是初始化这个 PriorityQueue
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, informerFactory, opts...)
}
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
for _, opt := range opts {
opt(&options)
}
// 这个就是 less函数,作为打分的一部分
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
return lessFn(pInfo1, pInfo2)
}
if options.podNominator == nil {
options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
}
pq := &PriorityQueue{
PodNominator: options.podNominator,
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
return pq
}
了解了Queue的结构,就需要知道 入队列与出队列是在哪里操作的。在初始化时,需要注册一个 addEventHandlerFuncs
这个时候,会注入三个动作函数,也就是controller中的概念;而在AddFunc中可以看到会入队列。
注入是对 Pod 的informer注入的,注入的函数 addPodToSchedulingQueue 就是入栈
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulingQueue.Add(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
而这个 SchedulingQueue
的实现就是 PriorityQueue
,而Add中则对 activeQ进行的操作
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
// 格式化入栈数据,包含podinfo,里会包含v1.Pod
// 初始化的时间,创建的时间,以及不能被调度时的记录其plugin的名称
pInfo := p.newQueuedPodInfo(pod)
// 入栈
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
return err
}
if p.unschedulablePods.get(pod) != nil {
klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulablePods.delete(pod)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
}
在上面看 scheduler 结构时,可以看到有一个 nextPod的,nextPod就是从队列中弹出一个pod,这个在scheduler 时会传入 MakeNextPodFunc 就是这个 nextpod
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
return func() *framework.QueuedPodInfo {
podInfo, err := queue.Pop()
if err == nil {
klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
for plugin := range podInfo.UnschedulablePlugins {
metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
}
return podInfo
}
klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
return nil
}
}
而这个 queue.Pop()
对应的就是 PriorityQueue
的 Pop() ,在这里会将作为 activeQ 的消费端
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
p.schedulingCycle++
return pInfo, nil
}
在上面入口部分也看到了,scheduleOne 和 scheduler,scheduleOne 就是去消费一个Pod,他会调用 NextPod,NextPod就是在初始化传入的 MakeNextPodFunc
,至此回到对应的 Pop来做消费。
schedulerOne是为一个Pod做调度的流程。
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.ErrorS(err, "Error occurred")
return
}
if sched.skipPodSchedule(fwk, pod) {
return
}
...
调度上下文
当了解了scheduler结构后,下面分析下调度上下文的过程。看看扩展点是怎么工作的。这个时候又需要提到官网的调度上下文的图。
调度框架 [2]调度框架 (scheduling framework
SF ) 是kubernetes为 scheduler设计的一个pluggable的架构。SF 将scheduler设计为 Plugin 式的 API,API将上一章中提到的一些列调度策略实现为 Plugin
。
在 SF 中,定义了一些扩展点 (extension points
EP ),而被实现为Plugin的调度程序将被注册在一个或多个 EP 中,换句话来说,在这些 EP 的执行过程中如果注册在多个 EP 中,将会在多个 EP 被调用。
每次调度都分为两个阶段,调度周期(Scheduling Cycel
)与绑定周期(Binding Cycle
)。
- SC 表示为,为Pod选择一个节点;SC 是串行运行的。
- BC 表示为,将 SC 决策结果应用于集群中;BC 可以同时运行。
调度周期与绑定周期结合一起,被称为调度上下文 (Scheduling Context
),下图则是调度上下文的工作流
扩展点 [3]注:如果决策结果为Pod的调度结果无可用节点,或存在内部错误,则中止 SC 或 BC。Pod将重入队列重试
扩展点(Extension points
)是指在调度上下文中的每个可扩展API,通过图提现为[图1]。其中 Filter
相当于 Predicate
而 Scoring
相当于 Priority
。
对于调度阶段会通过以下扩展点:
-
Sort
:该插件提供了排序功能,用于对在调度队列中待处理 Pod 进行排序。一次只能启用一个队列排序。 -
preFilter
:该插件用于在过滤之前预处理或检查 Pod 或集群的相关信息。这里会终止调度 -
filter
:该插件相当于调度上下文中的Predicates
,用于排除不能运行 Pod 的节点。Filter 会按配置的顺序进行调用。如果有一个filter将节点标记位不可用,则将 Pod 标记为不可调度(即不会向下执行)。 -
postFilter
:当没有为 pod 找到FN时,该插件会按照配置的顺序进行调用。如果任何postFilter
插件将 Pod 标记为schedulable,则不会调用其余插件。即filter
成功后不会进行这步骤 -
preScore
:可用于进行预Score工作(通知性的扩展点)。 -
score
:该插件为每个通过filter
阶段的Node提供打分服务。然后Scheduler将选择具有最高加权分数总和的Node。 -
reserve
:因为绑定事件时异步发生的,该插件是为了避免Pod在绑定到节点前时,调度到新的Pod,使节点使用资源超过可用资源情况。如果后续阶段发生错误或失败,将触发UnReserve
回滚(通知性扩展点)。这也是作为调度周期中最后一个状态,要么成功到postBind
,要么失败触发UnReserve
。 -
permit
:该插件可以阻止或延迟 Pod 的绑定,一般情况下这步骤会做三件事:appove
:调度器继续绑定过程Deny
:如果任何一个Premit拒绝了Pod与节点的绑定,那么将触发UnReserve
,并重入队列Wait
: 如果 Permit 插件返回Wait
,该 Pod 将保留在内部Wait
Pod 列表中,直到被Appove
。如果发生超时,wait
变为deny
,将Pod放回至调度队列中,并触发Unreserve
回滚 。
-
preBind
:该插件用于在 bind Pod 之前执行所需的前置工作。如,preBind
可能会提供一个网络卷并将其挂载到目标节点上。如果在该步骤中的任意插件返回错误,则Pod 将被deny
并放置到调度队列中。 -
bind
:在所有的preBind
完成后,该插件将用于将Pod绑定到Node,并按顺序调用绑定该步骤的插件。如果有一个插件处理了这个事件,那么则忽略其余所有插件。 -
postBind
:该插件在绑定 Pod 后调用,可用于清理相关资源(通知性的扩展点)。 -
multiPoint
:这是一个仅配置字段,允许同时为所有适用的扩展点启用或禁用插件。
而 scheduler 对于调度上下文在代码中的实现就是 scheduleOne
,下面就是看这个调度上下文
Sort
插件提供了排序功能,用于对在调度队列中待处理 Pod 进行排序。一次只能启用一个队列排序。
在进入 scheduleOne
后,NextPod
从 activeQ
中队列中得到一个Pod,然后的 frameworkForPod
会做打分的动作就是调度上下文的第一个扩展点 sort
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
...
func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
// 获取指定的profile
fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
if !ok {
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
}
return fwk, nil
}
回顾,因为在New scheduler时会初始化这个 sort 函数
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
preFilter
preFilter作为第一个扩展点,是用于在过滤之前预处理或检查 Pod 或集群的相关信息。这里会终止调度
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.ErrorS(err, "Error occurred")
return
}
if sched.skipPodSchedule(fwk, pod) {
return
}
klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 这里将进入prefilter
scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
schedulePod 尝试将给定的 pod 调度到节点列表中的节点之一。如果成功,它将返回节点的名称。
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
// 用于将cache更新为当前内容
if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
// 找到一个合适的pod时,会执行扩展点
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
...
findNodesThatFitPod 会执行对应的过滤插件来找到最适合的Node,包括备注,以及方法名都可以看到,这里运行的插件