当前位置:网站首页>TCP协议之《TSQ控制》

TCP协议之《TSQ控制》

2022-08-10 03:26:00 程序员扫地僧

TCP Small Queues的目的是限制每个TCP连接在Qdisc和device队列中的skb数量,以达到降低RTT(Round-Trip Time)和避免bufferbloat的目的。


一、TSQ初始化
内核定义了静态的每处理器变量tsq_tasklet结构,初始化函数tcp_tasklet_init,为定义的每处理器结构变量初始化一个单独的tasklet,并且每个tasklet有其自身独立的套接口队列。

void __init tcp_tasklet_init(void)
{
    for_each_possible_cpu(i) {
        struct tsq_tasklet *tsq = &per_cpu(tsq_tasklet, i);
 
        INIT_LIST_HEAD(&tsq->head);
        tasklet_init(&tsq->tasklet, tcp_tasklet_func, (unsigned long)tsq);
    }
}
struct tsq_tasklet {
    struct tasklet_struct   tasklet;
    struct list_head    head; /* queue of tcp sockets */
};
static DEFINE_PER_CPU(struct tsq_tasklet, tsq_tasklet);

二、TSQ检查

基础的检查函数为tcp_small_queue_check,其逻辑如下。首先取以下两者之间的较大值:1)当前发送的数据报文skb结构所占用空间的两倍值(2*skb->truesize);2)以及当前大约每毫秒的流量值(其通过sk_pacing_rate计算而来,内核将sk_pacing_shift变量定义为10,将当前每秒钟的流量(sk_pacing_rate)除以2的10次方,得到大约1毫秒的流量值)。其次如果此结果值大约tcp_limit_output_bytes限定的值,使用tcp_limit_output_bytes作为限定值。最后,如果是重传报文,即factor等于1,将最终的判定值增大一倍。

如果sk_wmem_alloc大于limit说明已经发送了太多的数据在Qdisc或者设备队列中,但是如果重传队列为空,此次发送还是允许进行。否则,返回true,禁止发送操作,在此之前,设置sk_tsp_flags的标志位TSQ_THROTTLED,表明是由于TSQ检查结果导致的不能发送。另外,有可能在此函数设置TSQ_THROTTLED标志之前,发生了TX中断,进行了skb释放操作,导致了sk_wmem_alloc的递减,所以,在返回前再次判断sk_wmem_alloc是否超过限值limit。

static bool tcp_small_queue_check(struct sock *sk, const struct sk_buff *skb, unsigned int factor)
{
    limit = max(2 * skb->truesize, sk->sk_pacing_rate >> sk->sk_pacing_shift);
    limit = min_t(u32, limit, sock_net(sk)->ipv4.sysctl_tcp_limit_output_bytes);
    limit <<= factor;
 
    if (refcount_read(&sk->sk_wmem_alloc) > limit) {
        if (tcp_rtx_queue_empty(sk))
            return false;
 
        set_bit(TSQ_THROTTLED, &sk->sk_tsq_flags);
        smp_mb__after_atomic();
        if (refcount_read(&sk->sk_wmem_alloc) > limit)
            return true;
    }
    return false;
}

以上基础函数tcp_small_queue_check在TCP发送路径的tcp_write_xmit函数和tcp_xmit_retransmit_queue函数中都有调用。如下的tcp_write_xmit函数,在调用TSQ检查函数之前,清空套接口的TCP_TSQ_DEFERRED标志,表明进行了发送操作,在之后的TSQ处理函数中将跳过此套接口。如果TSQ检查未通过,将设置TSQ_THROTTLED标志。

static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle, int push_one, gfp_t gfp)
{
    struct tcp_sock *tp = tcp_sk(sk);
 
    while ((skb = tcp_send_head(sk))) {
 
        if (test_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags))
            clear_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags);
        if (tcp_small_queue_check(sk, skb, 0))
            break;
            
        if (unlikely(tcp_transmit_skb(sk, skb, 1, gfp)))
            break;
}
另外,在系统调用函数do_tcp_sendpages与tcp_sendmsg_locked函数中,如果缺少TCP缓存,发送函数将已经挂载到发送队列中的数据进行发送,由函数tcp_push负责处理。当其判断如果不需要立即发送数据包(autocork)时,需要设置TSQ_THROTTLED标志,表明TSQ功能阻塞了数据发送。如果TX中断随后发生,其将释放部分发送缓存,发送函数如tcp_sendmsg_locked得以继续进行。

static void tcp_push(struct sock *sk, int flags, int mss_now, int nonagle, int size_goal)
{
    skb = tcp_write_queue_tail(sk);
 
    if (tcp_should_autocork(sk, skb, size_goal)) {
        if (!test_bit(TSQ_THROTTLED, &sk->sk_tsq_flags)) {
            NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPAUTOCORKING);
            set_bit(TSQ_THROTTLED, &sk->sk_tsq_flags);
        }
        if (refcount_read(&sk->sk_wmem_alloc) > skb->truesize)
            return;
    }
    __tcp_push_pending_frames(sk, mss_now, nonagle);
}
此处有必要说一下tcp_should_autocork函数的判断条件:1)skb的数据长度len小于size_goal,表明数据还未填满允许的发送目标长度(参见函数tcp_xmit_size_goal,支持GSO时此目标长度可能大小MSS,反之等于MSS);2)PROC文件/proc/sys/net/ipv4/tcp_autocorking设置为1;3)当前skb不是发送队列头,表明已经有数据发送;4)并且,当前正在发送的数据的总truesize长度大于此skb的truesize,表明位于Qdisc或者设备队列中的数据不是仅有ACK报文,所以TX发送处理不会被延迟。

综上的四个条件,TX中断马上要发送,此时有机会将数据包暂时阻塞,为之后发生的sendmsg的发送数据提供与此skb合并的机会。由于TX中断马上要发生,不会造成此数据包的发送延时。

static bool tcp_should_autocork(struct sock *sk, struct sk_buff *skb, int size_goal)
{   
    return skb->len < size_goal &&
           sock_net(sk)->ipv4.sysctl_tcp_autocorking &&
           skb != tcp_write_queue_head(sk) &&
           refcount_read(&sk->sk_wmem_alloc) > skb->truesize;
}

三、TSQ阻塞时机

当数据报文skb释放时(如TX完成中断发生),检查其所属套接口的TSQ阻塞状态,如下发送缓存释放函数tcp_wfree。首先将发送缓存sk_wmem_alloc的值将其skb的truesize-1的值,保留1个计数是由于此函数之后或者tcp_tasklet_func函数将调用sk_free函数,如果sk_wmem_alloc为1,sk_free将释放套接口结构。其次如果当前进程上下文在ksoftirqd中,并且Qdisc或者设备队列中还有数据未发送,表明系统当前繁忙,不在进行TSQ拥塞处理,避免导致加重系统的繁忙状况,也可使此TCP流的确认ACK报文得以处理。

如果此套接口没有被TSQ阻塞,TSQF_THROTTLED标志未设置,或者其已经位于TSQ处理队列中,直接返回。如果在此期间套接口的阻塞状态已经缓解(见cmpxchg函数实现),直接返回。否则,将处于阻塞状态的套接口的sk_tsq_flags中的阻塞状态清除,并且将此套接口添加到当前处理器的TSQ处理队列中,并且调用处理器的takslet进行处理。

void tcp_wfree(struct sk_buff *skb)
{
    WARN_ON(refcount_sub_and_test(skb->truesize - 1, &sk->sk_wmem_alloc));
 
    if (refcount_read(&sk->sk_wmem_alloc) >= SKB_TRUESIZE(1) && this_cpu_ksoftirqd() == current)
        goto out;
 
    for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
        struct tsq_tasklet *tsq;
        bool empty;
 
        if (!(oval & TSQF_THROTTLED) || (oval & TSQF_QUEUED))
            goto out;
 
        nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
        nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
        if (nval != oval)
            continue;
 
        local_irq_save(flags);
        tsq = this_cpu_ptr(&tsq_tasklet);
        empty = list_empty(&tsq->head);
        list_add(&tp->tsq_node, &tsq->head);
        if (empty)
            tasklet_schedule(&tsq->tasklet);
        local_irq_restore(flags);
        return;
    }
out:
    sk_free(sk);
}

除以以上的tcp_wfree函数之外,在TCP的pacing功能中,pacing超时处理函数tcp_pace_kick已将处理TSQ除以阻塞状态的套接口,其逻辑与tcp_wfree中的一致,唯一区别在于pacing处理中,不会先检查套接口是否处于阻塞状态。

enum hrtimer_restart tcp_pace_kick(struct hrtimer *timer)
{
    struct tcp_sock *tp = container_of(timer, struct tcp_sock, pacing_timer);
    struct sock *sk = (struct sock *)tp;
 
    for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
        struct tsq_tasklet *tsq;
        bool empty;
 
        if (oval & TSQF_QUEUED)
            break;
 
        nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
        nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
        if (nval != oval)
            continue;
 
        if (!refcount_inc_not_zero(&sk->sk_wmem_alloc))
            break;
        /* queue this socket to tasklet queue */
        tsq = this_cpu_ptr(&tsq_tasklet);
        empty = list_empty(&tsq->head);
        list_add(&tp->tsq_node, &tsq->head);
        if (empty)
            tasklet_schedule(&tsq->tasklet);
        break;
    }
    return HRTIMER_NORESTART;
}

四、TSQ阻塞处理
以上的tasklet_schedule调用了初始化时注册的tcp_tasklet_func函数。如下,在其处理过程中,首先从TSQ队列中移除套接口,清除其TSQ_QUEUE入队标志。其次如果此套接口未设置TCP_TSQ_DEFERRED标志,表明其已经在tcp_write_xmit发送函数中得到处理,此处不再处理。否则,如果此套接口没有被用户层系统调用锁定,调用TSQ核心处理函数tcp_tsq_handler处理。

static void tcp_tasklet_func(unsigned long data)
{
    struct tsq_tasklet *tsq = (struct tsq_tasklet *)data;
 
    local_irq_save(flags);
    list_splice_init(&tsq->head, &list);
    local_irq_restore(flags);
 
    list_for_each_safe(q, n, &list) {
        tp = list_entry(q, struct tcp_sock, tsq_node);
        list_del(&tp->tsq_node);
 
        sk = (struct sock *)tp;
        smp_mb__before_atomic();
        clear_bit(TSQ_QUEUED, &sk->sk_tsq_flags);
 
        if (!sk->sk_lock.owned && test_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags)) {
            bh_lock_sock(sk);
            if (!sock_owned_by_user(sk)) {
                clear_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags);
                tcp_tsq_handler(sk);
            }
            bh_unlock_sock(sk);
        }
        sk_free(sk);
    }
}

TSQ的核心处理函数tcp_tsq_handler,负责重新调用TCP的发送或者重传函数。

static void tcp_tsq_handler(struct sock *sk)
{
    if ((1 << sk->sk_state) &
        (TCPF_ESTABLISHED | TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_CLOSE_WAIT  | TCPF_LAST_ACK)) {
        struct tcp_sock *tp = tcp_sk(sk);
 
        if (tp->lost_out > tp->retrans_out && tp->snd_cwnd > tcp_packets_in_flight(tp)) {
            tcp_mstamp_refresh(tp);
            tcp_xmit_retransmit_queue(sk);
        }
        tcp_write_xmit(sk, tcp_current_mss(sk), tp->nonagle, 0, GFP_ATOMIC);
    }
}

五、TSQ延迟处理

如果在以上的操作中,由于用户层正在执行套接口相关的系统调用,导致未能进行。将在用户层系统调用退出时,在套接口释放函数中tcp_release_cb,根据标志TCPF_TSQ_DEFERRED的判断,执行TSQ函数tcp_tsq_handler。

void tcp_release_cb(struct sock *sk)
{
    do {
        flags = sk->sk_tsq_flags;
        if (!(flags & TCP_DEFERRED_ALL))
            return;
        nflags = flags & ~TCP_DEFERRED_ALL;
    } while (cmpxchg(&sk->sk_tsq_flags, flags, nflags) != flags);
 
    if (flags & TCPF_TSQ_DEFERRED)
        tcp_tsq_handler(sk);
}

原网站

版权声明
本文为[程序员扫地僧]所创,转载请带上原文链接,感谢
https://blog.csdn.net/wuyongmao/article/details/126246846