当前位置:网站首页>基于信号量与环形队列实现读写异步缓存队列

基于信号量与环形队列实现读写异步缓存队列

2022-08-09 09:41:00 我要出家当道士

目录

一、需求 

二、技术思路

三、示例代码


一、需求 

         实现一个读写异步的缓存队列,主要实现功能如下:

        1、缓存队列作为临界资源,要是线程安全的,不会出现读线程与写线程同时操作缓存队列的情况发生。

        2、当缓存队列被塞满以后,写线程阻塞等待读线程读取数据。

        3、当缓存队列空时,读线程需要阻塞等待写线程写入数据。

        4、可指定缓存队列的长度,缓存队列中,存放 byte 类型的数据。

二、技术思路

        基于信号量与环形队列实现。

        参考《现代操作系统》,使用了三个信号量:一个称为full,用来记录充满的缓冲槽数目;一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区 。

        环形队列是为了能够重复利用队列中的空间。当读数据或写数据抵达队列边界时,能够跳到开头衔接上。由于已经使用信号量做队列长度的约束,所以在使用环形队列时可以省不少事,只需要实现首尾跳转即可,无需关心队头与队尾位置的问题。

三、示例代码

        仅作参考,未贴头文件。

queue.c 

#include "queue.h"

typedef struct shared_queue
{
    sem_t queue_full;
    sem_t queue_empty;
    sem_t queue_lock;

    item *queue;
    int queue_len;
    // ring queue
    int tail;
    int font;

}sharq;

int queue_init(sharq *que, int len)
{
    if (que == NULL)
    {
        printf("[error] : the pointer of queue is NULL\n");
        return ERROR;
    }
    que->queue = (item *)malloc(sizeof(item) * len);
    if (que->queue == NULL)
    {
        printf("[error] : queue malloc failed\n");
        return ERROR;
    }

    que->queue_len = len;
    que->tail = 0;
    que->font = 0;

    sem_init(&que->queue_full, 0, 0);
    sem_init(&que->queue_empty, 0, len);
    sem_init(&que->queue_lock, 0, 1);

    return OK; 
}

int queue_write(sharq *que, byte *input, int len_in)
{
    sem_wait(&que->queue_empty);
    sem_wait(&que->queue_lock);

    // insert
    item *it = que->queue + que->tail;
    it->buff = (byte *)malloc(len_in);
    if (it->buff == NULL)
    {
        printf("[error] : item malloc failed\n");
        return ERROR;
    }
    memcpy(it->buff, input, len_in);
    it->buff_len = len_in;

    // ring queue 
    que->tail++;
    que->tail %= que->queue_len;

    sem_post(&que->queue_lock);
    sem_post(&que->queue_full);
    return OK;
}

int queue_read(sharq *que, byte *output, int len_out)
{
    sem_wait(&que->queue_full);
    sem_wait(&que->queue_lock);

    // delete
    item *it = que->queue + que->font;
    if (len_out < it->buff_len)
    {
        printf("[error] : length of output_buff is lower than item_buff\n");
        return ERROR;
    }
    memcpy(output, it->buff, it->buff_len); 

    // ring queue
    que->font++;
    que->font %= que->queue_len;

    sem_post(&que->queue_lock);
    sem_post(&que->queue_empty);
    return OK;
}

int queue_destroy(sharq *que)
{
    if (que == NULL)
    {
        printf("[error] : can not destroy NULL queue\n");
        return ERROR;
    }
    sem_destroy(&que->queue_full);
    sem_destroy(&que->queue_empty);
    sem_destroy(&que->queue_lock);

    if (que->queue != NULL)
    {
        free(que->queue);
    }
    return OK;
}

main.c 

#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include "./queue/queue.h"

#define TEST_SUM 2000

void *producer(void *args)
{
    sharq *queue = (sharq *)args;
    // test data
    char strs[50] = "this is a test  ";
    int len = strlen(strs);
    for (int i = 0; i < TEST_SUM; i++)
    {
        strs[len - 1] = '0' + i % 10;
        int res = queue_write(queue, (byte *)strs, len);
        if (res == ERROR)
        {
            printf("[ERROR] : write error\n");
        }
    }
    printf("write Over(%d)!\n", TEST_SUM);
}

void *consumer(void *args)
{
    sharq *queue = (sharq *)args;
    byte *read_buff = (byte *)malloc(BLOCK);
    int count = 0;
    while(1)
    {
        bzero(read_buff, BLOCK);
        int res = queue_read(queue, read_buff, BLOCK);
        if (res == ERROR)
        {
            printf("[ERROR] : writer failed\n");
            break;
        }
        //printf("str : %s\n", (char *)read_buff);
        count++;
        if (count == TEST_SUM)
        {
            break;
        }
    }
    printf("read  Over(%d)!\n", TEST_SUM);
    free(read_buff);
}

int main(void)
{
    sharq queue;
    queue_init(&queue, 10); 

    pthread_t pro_t, con_t;

    pthread_create(&pro_t, NULL, producer, &queue);
    pthread_create(&con_t, NULL, consumer, &queue);

    pthread_join(pro_t, NULL);
    pthread_join(con_t, NULL);

    return 0;
}

四、数据填充

         此处无需再看,只是为了满足CSDN发文助手的文章质量检测。

        以下内容由“废话生成器”生成。

        鲁巴金曾经说过,读书是在别人思想的帮助下,建立起自己的思想。这不禁令我深思。 在这种困难的抉择下,本人思来想去,寝食难安。 我们都知道,只要有意义,那么就必须慎重考虑。 我们都知道,只要有意义,那么就必须慎重考虑。 我认为, 塞涅卡在不经意间这样说过,真正的人生,只有在经过艰难卓绝的斗争之后才能实现。我希望诸位也能好好地体会这句话。 总结的来说, 罗曼·罗兰在不经意间这样说过,只有把抱怨环境的心情,化为上进的力量,才是成功的保证。这启发了我, 维龙在不经意间这样说过,要成功不需要什么特别的才能,只要把你能做的小事做得好就行了。这不禁令我深思。 既然如何, 在这种困难的抉择下,本人思来想去,寝食难安。 检测垃圾的发生,到底需要如何做到,不检测垃圾的发生,又会如何产生。 马云曾经说过,最大的挑战和突破在于用人,而用人最大的突破在于信任人。这句话语虽然很短,但令我浮想联翩。

原网站

版权声明
本文为[我要出家当道士]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_37437983/article/details/126212626