在本系列的第五节中(如果您愿意,可以从头开始),我们将工具 / 缓存包的所有结构部分放在一起。 但是,我意识到我犯了一个错误,并没有涵盖 sharedProcessor 和 processorListener 结构! 在继续研究包的行为方面之前,我会在这里做。

我们先来看看 processorListener: 首先,让我们同意 processorListener 是程序中的糟糕名称。同意不? OK,好的; 让我们继续。

processsorListener 是 tools/cache 项目中的实现构造,它缓冲一组通知并将它们分发到 ResourceEventHandler(在 part 3 中介绍)。

如果添加通知,最终将在单独的线程上调用 ResourceEventHandler 的 OnAdd,OnUpdate 或 OnDelete 函数。 它的结构代码非常简单:

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler ResourceEventHandler
    // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
    // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
    // added until we OOM.
    // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
    // we should try to do something better.
    pendingNotifications buffer.RingGrowing
    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex

processorListener 有一个不会退出的运行函数,可以从其 nextCh Go 通道(基本上是同步阻塞队列)中提取通知,并将它们转发到它的 ResourceEventHandler:

func (p *processorListener) run() {
    defer utilruntime.HandleCrash()

    for next := range p.nextCh {
        switch notification := next.(type) {
        case updateNotification:
            p.handler.OnUpdate(notification.oldObj, notification.newObj)
        case addNotification:
        case deleteNotification:
            utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))

那么通知如何在 nextCh 中获得的? processorListener 有一个永不停止的 pop 函数(有点令人惊讶)。

代码对我来说根本不直观,但是如果你眯着眼睛,你可以看到它基本上是从 pendingNotifications 环缓冲区中拉出 items 并将它们放在 nextCh Go 通道上:

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched

因此必须启动 run 和 pop 函数。 该工作属于 sharedProcessor。 sharedProcessor 非常简单

type sharedProcessor struct {
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group

它也有不会退出的 Run 功能。 它首先要做的是在不同的线程上启动 processorListeners 的 run 和 pop 函数。 然后它阻塞并等待信号关闭:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    p.wg.Wait() // Wait for all .pop() and .run() to stop

好的,谁告诉 sharedProcessor 的 run 方法做什么事情? sharedIndexInformer 的 run 方法。 在那里,你会发现这一行

wg.StartWithChannel(processorStopCh, s.processor.run)

这会在新线程中生成 sharedIndexInformer 的 sharedProcessor 的 run 函数(如果信号给 processorStopCh 通道发送,那么它将停止)。

退一步,为什么所有这些线程? 为什么这么复杂?


rocessorListener 实际上是一个线程的内容,它可能会被一个行为不当的 ResourceEventListener 阻塞一段时间,该资源受最终用户的控制。 所以你希望它的 “出队” 行为在它自己的线程上,这样一个表现不好的 ResourceEventListener 不会意外地导致整个 Pinball 停止工作,而 Kubernetes 继续以疯狂的速度发送事件。

sharedProcessor 实际上是一种将一堆 processorListeners 捆绑在一起,除了管理其线程问题之外,还可以在给他们发送单个通知

例如,在 Java 中,我们有能够中断内置线程的内容,我们可能会将这两个问题混合在一起。 这个东西的更好名称可能更像是 EventDistributor。

如前所述,sharedIndexInformer 有其自己的线程问题,以免减慢 Kubernetes 事件的接收速度。 现在我们已经将 processorListener 和 sharedProcessor 类型添加到组合中,让我们修改我们的整体结构图以包含它们: