datetime:2023/03/22 17:06
author:nzb

线程同步案例

互斥锁实现数据库连接池

  • 服务端
/*
* 编译指令:g++ -g -Wno-write-strings -o serverdb serverdb.cpp -I/oracle/home/rdbms/public -L/oracle/home/lib -L. -lclntsh /freecplus/db/oracle/_ooci.cpp /freecplus/_freecplus.cpp -lpthread -lm -lc
*/
#include "/freecplus/_freecplus.h"
#include "/freecplus/db/oracle/_ooci.h"

pthread_mutex_t mutexs[100]; // 用于数据库连接池的锁。
connection conns[100]; // 数据库连接池。
bool initconns(); // 初始化数据库连接池。
connection *getconn(); // 从连接池中获取一个数据库连接。
void freeconn(connection *in_conn); // 释放数据库连接。
void freeconns(); // 释放数据库连接池。

void *pthmain(void *arg);

CTcpServer TcpServer; // 创建服务端对象。

vector<long> vpthid;  // 存放线程id的容器。

void mainexit(int sig); // 信号2和15的处理函数。

// 线程清理函数。
void pthmainexit(void *arg);

CLogFile logfile;

int main(int argc, char *argv[]) {

    signal(2, mainexit);
    signal(15, mainexit); // 捕获信号2和15

    logfile.Open("/tmp/serverdb.log", "a+");
    if (TcpServer.InitServer(5858) == false) // 初始化TcpServer的通信端口。
    {
        logfile.Write("TcpServer.InitServer(5858) failed.\n");
        return -1;
    }
    if (initconns() == false) // 初始化数据库连接池。
    {
        logfile.Write("initconns() failed.\n");
        return -1;
    }

    while (true) {
        if (TcpServer.Accept() == false) // 等待客户端连接。
        {
            logfile.Write("TcpServer.Accept() failed.\n");
            return -1;
        }
        logfile.Write("客户端(%s)已连接。\n", TcpServer.GetIP());
        pthread_t pthid;
        if (pthread_create(&pthid, NULL, pthmain, (void *) (long) TcpServer.m_connfd) != 0) {
            logfile.Write("pthread_create failed.\n");
            return -1;
        }
        vpthid.push_back(pthid); // 把线程id保存到vpthid容器中。
    }
    return 0;
}

void *pthmain(void *arg) {
    pthread_cleanup_push(pthmainexit, arg); // 设置线程清理函数。
    pthread_detach(pthread_self()); // 分离线程。
    pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, NULL); // 设置取消方式为立即取消。
    int sockfd = (int) (long) arg; // 与客户端的socket连接。
    int ibuflen = 0;
    char strbuffer[1024]; // 存放数据的缓冲区。
    while (true) {
        memset(strbuffer, 0, sizeof(strbuffer));
        if (TcpRead(sockfd, strbuffer, &ibuflen, 300) == false) break; // 接收客户端发过来的请求报文。
        logfile.Write("接收:%s\n", strbuffer);
        connection *conn = getconn(); // 获取一个数据库连接。
        // 处理业务
        sleep(2);
        freeconn(conn); // 释放一个数据库连接。
        strcat(strbuffer, "ok");   // 在客户端的报文后加上"ok"。
        logfile.Write("发送:%s\n", strbuffer);
        if (TcpWrite(sockfd, strbuffer) == false) break;  // 向客户端回应报文。
    }

    logfile.Write("客户端已断开。\n");  // 程序直接退出,析构函数会释放资源。
    pthread_cleanup_pop(1);
    pthread_exit(0);
}

// 信号2和15的处理函数。
void mainexit(int sig) {
    logfile.Write("mainexit begin.\n");
    // 关闭监听的socket。
    TcpServer.CloseListen();
    // 取消全部的线程。
    for (int ii = 0; ii < vpthid.size(); ii++) {
        logfile.Write("cancel %ld\n", vpthid[ii]);
        pthread_cancel(vpthid[ii]);
    }
    // 释放数据库连接池。
    freeconns();
    logfile.Write("mainexit end.\n");
    exit(0);
}

// 线程清理函数。
void pthmainexit(void *arg) {
    logfile.Write("pthmainexit begin.\n");
    // 关闭与客户端的socket。
    close((int) (long) arg);
    // 从vpthid中删除本线程的id。
    for (int ii = 0; ii < vpthid.size(); ii++) {
        if (vpthid[ii] == pthread_self()) {
            vpthid.erase(vpthid.begin() + ii);
        }
    }
    logfile.Write("pthmainexit end.\n");
}

// 初始化数据库连接池。
bool initconns() {
    for (int ii = 0; ii < 10; ii++) {
        if (conns[ii].connecttodb("scott/tiger", "Simplified Chinese_China.ZHS16GBK") != 0) {
            logfile.Write("connect database failed.\n%s\n", conns[ii].m_cda.message);
            return false;
        }
    }
    for (int ii = 0; ii < 10; ii++) pthread_mutex_init(&mutexs[ii], NULL);
    return true;
}

// 从连接池中获取一个数据库连接。
connection *getconn() {
    for (int ii = 0; ii < 10; ii++) {
        if (pthread_mutex_trylock(&mutexs[ii]) == 0) {
            logfile.Write("get a conn[%d] ok.\n", ii);
            return &conns[ii];
        }
    }
    return NULL;
}

// 释放数据库连接。
void freeconn(connection *in_conn) {
    for (int ii = 0; ii < 10; ii++) {
        if (in_conn == &conns[ii]) pthread_mutex_unlock(&mutexs[ii]);
    }
}

// 释放数据库连接池。
void freeconns() {
    for (int ii = 0; ii < 10; ii++) {
        conns[ii].disconnect();
        pthread_mutex_destroy(&mutexs[ii]);
    }
}
  • 客户端
#include "../_freecplus.h"


int main(int argc, char *argv[]) {
    printf("pid=%d\n", getpid());
    CTcpClient TcpClient; // 创建客户端的对象。
    if (TcpClient.ConnectToServer("172.21.0.3", 5858) == false) // 向服务端发起连接请求。
    {
        printf("TcpClient.ConnectToServer(\"172.21.0.3\",5858) failed.\n");
        return -1;
    }
    char strbuffer[1024];  // 存放数据的缓冲区。
    for (int ii = 0; ii < 50; ii++) // 利用循环,与服务端进行5次交互。
    {
        memset(strbuffer, 0, sizeof(strbuffer));
        snprintf(strbuffer, 50, "(%d)这是第%d个超级女生,编号%03d。", getpid(), ii + 1, ii + 1);
        printf("发送:%s\n", strbuffer);
        if (TcpClient.Write(strbuffer) == false) break;  // 向服务端发送请求报文。
        memset(strbuffer, 0, sizeof(strbuffer));
        if (TcpClient.Read(strbuffer, 20) == false) break; // 接收服务端的回应报文。
        printf("接收:%s\n", strbuffer);
        sleep(5);
    }
    // 程序直接退出,析构函数会释放资源。
}

用互斥锁和条件变量实现高速缓存

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>

using namespace std;

int mesgid = 1; // 消息的记数器。
// 缓存消息的结构体。
struct st_message {
    int mesgid;
    char message[1024];
} stmesg;

vector<struct st_message> vcache; // 用vector容器做缓存。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;   // 声名并初始化条件变量。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 声名并初始化互斥锁。

// 消费者、出队线程主函数。
void *outcache(void *arg) {
    struct st_message stmesg;
    while (true) {
        pthread_mutex_lock(&mutex); // 加锁。
        // 如果缓存为空,等待。 // 条件变量虚假唤醒。
        while (vcache.size() == 0) {
            pthread_cond_wait(&cond, &mutex);
        }
        // 从缓存中获取第一条记录,然后删除该记录。
        memcpy(&stmesg, &vcache[0], sizeof(struct st_message)); // 内存拷贝,再加速,使用链表
        vcache.erase(vcache.begin());
        pthread_mutex_unlock(&mutex); // 解锁。
        // 以下是处理业务的代码。
        printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
        usleep(100);
    }
}


// 生产者、把生产的数据存入缓存。
void incache(int sig) {
    struct st_message stmesg;
    memset(&stmesg, 0, sizeof(struct st_message));
    pthread_mutex_lock(&mutex); // 加锁。
    // 生产数据,放入缓存。
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg); // 内存拷贝,再加速,使用链表

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    pthread_mutex_unlock(&mutex); // 解锁。

    pthread_cond_broadcast(&cond); // 触发条件,激活全部的线程。
}


int main() {
    signal(15, incache); // 接收15的信号,调用生产者函数。
    pthread_t thid1, thid2, thid3;
    pthread_create(&thid1, NULL, outcache, NULL);
    pthread_create(&thid2, NULL, outcache, NULL);
    pthread_create(&thid3, NULL, outcache, NULL);
    pthread_join(thid1, NULL);
    pthread_join(thid2, NULL);
    pthread_join(thid3, NULL);
    return 0;
}

用互斥锁和信号量实现高速缓存

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
#include <semaphore.h>

using namespace std;
int mesgid = 1; // 消息的记数器。

// 缓存消息的结构体。
struct st_message {
    int mesgid;
    char message[1024];
} stmesg;

vector<struct st_message> vcache; // 用vector容器做缓存。
sem_t sem; // 声明信号量。
pthread_mutex_t mutex; // 声名并初始化互斥锁。

// 消费者、出队线程主函数。
void *outcache(void *arg) {
    struct st_message stmesg;
    while (true) {
        while (vcache.size() == 0) {
            sem_wait(&sem); // 如果缓存中没有数据,等待信号。
            printf("%ld wait ok.\n", pthread_self());
        }
        pthread_mutex_lock(&mutex); // 加锁。
        if (vcache.size() == 0) // 判断缓存中是否有数据。
        {
            pthread_mutex_unlock(&mutex);
            continue; // 解锁,continue。
        }
        // 从缓存中获取第一条记录,然后删除该记录。
        memcpy(&stmesg, &vcache[0], sizeof(struct st_message));
        vcache.erase(vcache.begin());
        pthread_mutex_unlock(&mutex); // 解锁。
        // 以下是处理业务的代码。
        printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
        usleep(100);
    }
}

// 生产者、把生产的数据存入缓存。
void incache(int sig) {
    struct st_message stmesg;
    memset(&stmesg, 0, sizeof(struct st_message));
    pthread_mutex_lock(&mutex); // 加锁。
    // 生产数据,放入缓存。
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);

    pthread_mutex_unlock(&mutex); // 解锁。
    sem_post(&sem); // 信号加1。
    sem_post(&sem); // 信号加1。
    sem_post(&sem); // 信号加1。
    sem_post(&sem); // 信号加1。
    sem_post(&sem); // 信号加1。
    sem_post(&sem); // 信号加1。
}

int main() {
    signal(15, incache); // 接收15的信号,调用生产者函数。
    sem_init(&sem, 0, 0); // 初始化信号量。
    pthread_mutex_init(&mutex, NULL); // 初始化互斥锁。

    pthread_t thid1, thid2, thid3;
    pthread_create(&thid1, NULL, outcache, NULL);
    pthread_create(&thid2, NULL, outcache, NULL);
    pthread_create(&thid3, NULL, outcache, NULL);
    pthread_join(thid1, NULL);
    pthread_join(thid2, NULL);
    pthread_join(thid3, NULL);
    return 0;
}

results matching ""

    No results matching ""