datetime:2023/03/22 17:06
author:nzb
线程同步案例
互斥锁实现数据库连接池
#include "/freecplus/_freecplus.h"
#include "/freecplus/db/oracle/_ooci.h"
// One mutex per pooled connection: mutexs[i] is locked while conns[i] is
// handed out. The arrays have 100 slots, but the pool functions in this file
// only ever touch the first 10.
pthread_mutex_t mutexs[100]; 
// The database connection pool itself.
connection conns[100]; 
// Connects the pooled connections and initialises their mutexes.
bool initconns(); 
// Returns a free pooled connection, or NULL when none can be locked.
connection *getconn(); 
// Hands a connection obtained from getconn() back to the pool.
void freeconn(connection *in_conn); 
// Disconnects every pooled connection and destroys the mutexes.
void freeconns(); 
// Thread entry point that serves one accepted client socket.
void *pthmain(void *arg);
// Listening socket used by main() to accept clients.
CTcpServer TcpServer; 
// IDs of the worker threads created by main(); mainexit() cancels them and
// each worker removes its own ID in pthmainexit().
vector<long> vpthid;  
// Signal handler that shuts the whole server down.
void mainexit(int sig); 
// Thread cleanup handler registered by pthmain().
void pthmainexit(void *arg);
// Process-wide log file, opened in main().
CLogFile logfile;
int main(int argc, char *argv[]) {
    signal(2, mainexit);
    signal(15, mainexit); 
    logfile.Open("/tmp/serverdb.log", "a+");
    if (TcpServer.InitServer(5858) == false) 
    {
        logfile.Write("TcpServer.InitServer(5858) failed.\n");
        return -1;
    }
    if (initconns() == false) 
    {
        logfile.Write("initconns() failed.\n");
        return -1;
    }
    while (true) {
        if (TcpServer.Accept() == false) 
        {
            logfile.Write("TcpServer.Accept() failed.\n");
            return -1;
        }
        logfile.Write("客户端(%s)已连接。\n", TcpServer.GetIP());
        pthread_t pthid;
        if (pthread_create(&pthid, NULL, pthmain, (void *) (long) TcpServer.m_connfd) != 0) {
            logfile.Write("pthread_create failed.\n");
            return -1;
        }
        vpthid.push_back(pthid); 
    }
    return 0;
}
/**
 * Worker thread body: serves a single client until a read or write fails.
 *
 * For every message received it borrows a pooled database connection, holds
 * it for two seconds (stand-in for real database work), returns it, and
 * echoes the message back with "ok" appended.
 *
 * @param arg  The client's socket descriptor, passed by value inside the
 *             pointer (see the cast in main()).
 * @return Does not return normally; the thread ends via pthread_exit(0).
 */
void *pthmain(void *arg) {
    // pthmainexit(arg) runs if the thread is cancelled, and also at the
    // pthread_cleanup_pop(1) below on the normal exit path.
    pthread_cleanup_push(pthmainexit, arg);
    pthread_detach(pthread_self());
    // FIX: the previous call passed PTHREAD_CANCEL_DISABLE, which is a value
    // for pthread_setcancelstate(), not a cancel *type*. On glibc it shares
    // its numeric value with PTHREAD_CANCEL_ASYNCHRONOUS, so this names what
    // effectively happened and keeps pthread_cancel() from mainexit() working.
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    int sockfd = (int) (long) arg;
    int ibuflen = 0;
    char strbuffer[1024];
    while (true) {
        memset(strbuffer, 0, sizeof(strbuffer));
        // NOTE(review): 300 is presumably a timeout in seconds — confirm
        // against the TcpRead declaration.
        if (TcpRead(sockfd, strbuffer, &ibuflen, 300) == false) break;
        logfile.Write("接收:%s\n", strbuffer);
        // May be NULL when every pooled connection is busy; freeconn()
        // simply finds no matching slot in that case.
        connection *conn = getconn();

        sleep(2);
        freeconn(conn);
        // Bounded append so a message that nearly fills the buffer cannot
        // push "ok" past its end.
        strncat(strbuffer, "ok", sizeof(strbuffer) - strlen(strbuffer) - 1);
        logfile.Write("发送:%s\n", strbuffer);
        if (TcpWrite(sockfd, strbuffer) == false) break;
    }
    logfile.Write("客户端已断开。\n");
    pthread_cleanup_pop(1);
    pthread_exit(0);
}
// Shutdown handler installed by main() for signals 2 and 15: stops listening,
// requests cancellation of every recorded worker thread, releases the
// connection pool and terminates the process with status 0.
// NOTE(review): vpthid is read here without any lock while workers may be
// erasing their own entries in pthmainexit().
void mainexit(int sig) {
    logfile.Write("mainexit begin.\n");

    // No new clients from here on.
    TcpServer.CloseListen();

    // Ask each worker to stop; the size is re-read on every pass because
    // workers shrink the vector as they exit.
    size_t idx = 0;
    while (idx < vpthid.size()) {
        logfile.Write("cancel %ld\n", vpthid[idx]);
        pthread_cancel(vpthid[idx]);
        ++idx;
    }

    // Close the database connections and destroy their mutexes.
    freeconns();

    logfile.Write("mainexit end.\n");
    exit(0);
}
/**
 * Thread cleanup handler registered by pthmain(): closes the client socket
 * and removes the calling thread's ID from vpthid.
 *
 * @param arg  The client's socket descriptor, packed into the pointer.
 *
 * NOTE(review): vpthid is modified here without a lock, while main() appends
 * to it and mainexit() iterates over it; a shared mutex across those three
 * functions would be needed to make this race-free.
 */
void pthmainexit(void *arg) {
    logfile.Write("pthmainexit begin.\n");

    close((int) (long) arg);

    const pthread_t self = pthread_self();
    // FIX: only advance the index when nothing was erased. The previous loop
    // incremented after erase() and so skipped the element that had just
    // shifted into the current slot.
    size_t ii = 0;
    while (ii < vpthid.size()) {
        if (pthread_equal((pthread_t) vpthid[ii], self)) {
            vpthid.erase(vpthid.begin() + ii);
        } else {
            ++ii;
        }
    }
    logfile.Write("pthmainexit end.\n");
}
// Builds the pool: connects the first 10 entries of conns to the database,
// then initialises the 10 matching mutexes.
// Returns false (after logging the connection's error message) as soon as
// one connect fails; no mutex has been initialised in that case.
bool initconns() {
    int slot = 0;
    while (slot < 10) {
        if (conns[slot].connecttodb("scott/tiger", "Simplified Chinese_China.ZHS16GBK") != 0) {
            logfile.Write("connect database failed.\n%s\n", conns[slot].m_cda.message);
            return false;
        }
        ++slot;
    }

    // All connections are up; now create the lock that guards each one.
    slot = 0;
    while (slot < 10) {
        pthread_mutex_init(&mutexs[slot], NULL);
        ++slot;
    }
    return true;
}
// Borrows a connection from the pool without blocking: tries each of the 10
// mutexes in order and returns the connection belonging to the first one it
// manages to lock. Returns NULL when every slot is currently taken.
// The caller gives the connection back with freeconn().
connection *getconn() {
    connection *found = NULL;
    for (int slot = 0; found == NULL && slot < 10; ++slot) {
        if (pthread_mutex_trylock(&mutexs[slot]) != 0) continue;  // slot busy
        logfile.Write("get a conn[%d] ok.\n", slot);
        found = &conns[slot];
    }
    return found;
}
// Returns a borrowed connection to the pool by unlocking the mutex of the
// slot whose address equals in_conn. A pointer that matches none of the 10
// slots (including NULL) is ignored.
void freeconn(connection *in_conn) {
    int slot = 0;
    while (slot < 10) {
        if (&conns[slot] == in_conn) {
            pthread_mutex_unlock(&mutexs[slot]);
            return;  // slot addresses are distinct, so no other slot can match
        }
        ++slot;
    }
}
// Tears the pool down: for each of the 10 slots, disconnects the database
// connection and then destroys the mutex that guarded it.
void freeconns() {
    int slot = 0;
    while (slot < 10) {
        conns[slot].disconnect();
        pthread_mutex_destroy(&mutexs[slot]);
        ++slot;
    }
}
#include "../_freecplus.h"
// Test client: connects to 172.21.0.3:5858 and performs up to 50
// request/response round trips, pausing five seconds after each one.
// Returns -1 when the connection cannot be established; stops early when a
// write or read fails.
int main(int argc, char *argv[]) {
    printf("pid=%d\n", getpid());

    CTcpClient TcpClient;
    if (!TcpClient.ConnectToServer("172.21.0.3", 5858)) {
        printf("TcpClient.ConnectToServer(\"172.21.0.3\",5858) failed.\n");
        return -1;
    }

    char strbuffer[1024];
    int seq = 1;
    while (seq <= 50) {
        // Build and send the request; the text is capped at 50 bytes.
        memset(strbuffer, 0, sizeof(strbuffer));
        snprintf(strbuffer, 50, "(%d)这是第%d个超级女生,编号%03d。", getpid(), seq, seq);
        printf("发送:%s\n", strbuffer);
        if (!TcpClient.Write(strbuffer)) break;

        // Wait for the reply (second argument 20 is passed to Read as-is).
        memset(strbuffer, 0, sizeof(strbuffer));
        if (!TcpClient.Read(strbuffer, 20)) break;
        printf("接收:%s\n", strbuffer);

        sleep(5);
        ++seq;
    }

}
用互斥锁和条件变量实现高速缓存
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
using namespace std;
// Next message ID to hand out; incache() increments it for every message.
int mesgid = 1; 
// One cached message. The global instance `stmesg` declared here is shadowed
// by the locals of the same name in outcache() and incache().
struct st_message {
    int mesgid;          // sequence number assigned by incache()
    char message[1024];  // payload; incache() leaves it zeroed
} stmesg;
// The cache: incache() appends at the back, outcache() removes from the front.
vector<struct st_message> vcache; 
// Broadcast by incache() after new messages were added.
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;   
// Guards vcache and is the mutex paired with `cond`.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
void *outcache(void *arg) {
    struct st_message stmesg;
    while (true) {
        pthread_mutex_lock(&mutex); 
        
        while (vcache.size() == 0) {
            pthread_cond_wait(&cond, &mutex);
        }
        
        memcpy(&stmesg, &vcache[0], sizeof(struct st_message)); 
        vcache.erase(vcache.begin());
        pthread_mutex_unlock(&mutex); 
        
        printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
        usleep(100);
    }
}
void incache(int sig) {
    struct st_message stmesg;
    memset(&stmesg, 0, sizeof(struct st_message));
    pthread_mutex_lock(&mutex); 
    
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg); 
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    pthread_mutex_unlock(&mutex); 
    pthread_cond_broadcast(&cond); 
}
/**
 * Starts three consumer threads and waits for them; each SIGTERM (signal 15)
 * sent to the process makes incache() produce a batch of messages.
 *
 * @return 1 if a consumer thread cannot be created; otherwise the joins
 *         below never complete because the consumers loop forever.
 */
int main() {
    signal(SIGTERM, incache);

    // FIX: incache() locks `mutex`. If the signal were delivered to a
    // consumer that currently holds that mutex, the handler would deadlock
    // on it. Block SIGTERM while the consumers are created (threads inherit
    // the creator's signal mask) so only this thread, which never holds the
    // mutex outside the handler, can run the handler.
    sigset_t producer_sig;
    sigemptyset(&producer_sig);
    sigaddset(&producer_sig, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &producer_sig, NULL);

    pthread_t consumers[3];
    for (int ii = 0; ii < 3; ii++) {
        // Joining a thread that was never created is undefined, so stop here.
        if (pthread_create(&consumers[ii], NULL, outcache, NULL) != 0) {
            fprintf(stderr, "pthread_create failed.\n");
            return 1;
        }
    }

    // Consumers keep SIGTERM blocked; re-enable it for the main thread only.
    pthread_sigmask(SIG_UNBLOCK, &producer_sig, NULL);

    for (int ii = 0; ii < 3; ii++) pthread_join(consumers[ii], NULL);
    return 0;
}
用互斥锁和信号量实现高速缓存
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
#include <semaphore.h>
using namespace std;
// Next message ID to hand out; incache() increments it for every message.
int mesgid = 1; 
// One cached message. The global instance `stmesg` declared here is shadowed
// by the locals of the same name in outcache() and incache().
struct st_message {
    int mesgid;          // sequence number assigned by incache()
    char message[1024];  // payload; incache() leaves it zeroed
} stmesg;
// The cache: incache() appends at the back, outcache() removes from the front.
vector<struct st_message> vcache; 
// Posted by incache() once per message added; consumers wait on it.
sem_t sem; 
// Guards vcache; initialised in main().
pthread_mutex_t mutex; 
/**
 * Consumer thread: loops forever, waiting on the semaphore for a message,
 * removing the oldest one from vcache and printing its ID together with the
 * thread ID.
 *
 * @param arg  Unused.
 * @return Never returns.
 */
void *outcache(void *arg) {
    struct st_message stmesg;
    while (true) {
        // FIX: the previous version tested vcache.size() before taking the
        // mutex, which races with incache() and the other consumers. The
        // producer posts the semaphore once per message, so consume exactly
        // one post per message and only look at vcache under the lock.
        // A non-zero return means the wait did not succeed (for example it
        // was interrupted by a signal handler), so simply wait again.
        if (sem_wait(&sem) != 0) continue;
        printf("%ld wait ok.\n", pthread_self());

        pthread_mutex_lock(&mutex);
        // Defensive: never index an empty vector even if posts and messages
        // should ever get out of step.
        if (vcache.empty()) {
            pthread_mutex_unlock(&mutex);
            continue;
        }

        // Copy the front message out and drop it while still holding the lock.
        stmesg = vcache.front();
        vcache.erase(vcache.begin());
        pthread_mutex_unlock(&mutex);

        // Work on the private copy outside the critical section.
        printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
        usleep(100);
    }
}
void incache(int sig) {
    struct st_message stmesg;
    memset(&stmesg, 0, sizeof(struct st_message));
    pthread_mutex_lock(&mutex); 
    
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    stmesg.mesgid = mesgid++;
    vcache.push_back(stmesg);
    pthread_mutex_unlock(&mutex); 
    sem_post(&sem); 
    sem_post(&sem); 
    sem_post(&sem); 
    sem_post(&sem); 
    sem_post(&sem); 
    sem_post(&sem); 
}
/**
 * Initialises the semaphore and mutex, starts three consumer threads and
 * waits for them; each SIGTERM (signal 15) sent to the process makes
 * incache() produce a batch of messages.
 *
 * @return 1 if a consumer thread cannot be created; otherwise the joins
 *         below never complete because the consumers loop forever.
 */
int main() {
    // FIX: initialise the primitives before installing the handler that uses
    // them; previously a signal arriving early would have run incache()
    // against an uninitialised semaphore and mutex.
    sem_init(&sem, 0, 0);
    pthread_mutex_init(&mutex, NULL);

    // FIX: incache() locks `mutex`. If the signal were delivered to a
    // consumer that currently holds that mutex, the handler would deadlock
    // on it. Block SIGTERM while the consumers are created (threads inherit
    // the creator's signal mask) so only this thread, which never holds the
    // mutex outside the handler, can run the handler.
    sigset_t producer_sig;
    sigemptyset(&producer_sig);
    sigaddset(&producer_sig, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &producer_sig, NULL);

    signal(SIGTERM, incache);

    pthread_t consumers[3];
    for (int ii = 0; ii < 3; ii++) {
        // Joining a thread that was never created is undefined, so stop here.
        if (pthread_create(&consumers[ii], NULL, outcache, NULL) != 0) {
            fprintf(stderr, "pthread_create failed.\n");
            return 1;
        }
    }

    // Consumers keep SIGTERM blocked; re-enable it for the main thread only.
    pthread_sigmask(SIG_UNBLOCK, &producer_sig, NULL);

    for (int ii = 0; ii < 3; ii++) pthread_join(consumers[ii], NULL);
    return 0;
}