datetime:2023/03/22 17:06
author:nzb
线程同步案例
互斥锁实现数据库连接池
#include "/freecplus/_freecplus.h"
#include "/freecplus/db/oracle/_ooci.h"
// One mutex per pool slot: holding mutexs[i] means conns[i] is checked out
// (see getconn/freeconn). Only the first 10 slots are initialised by initconns.
pthread_mutex_t mutexs[100];
// Storage for the pooled database connections.
connection conns[100];
// Connects the pooled connections and initialises their mutexes.
bool initconns();
// Returns a free pooled connection, or NULL when every slot is busy.
connection *getconn();
// Hands a connection obtained from getconn back to the pool.
void freeconn(connection *in_conn);
// Disconnects the pooled connections and destroys their mutexes.
void freeconns();
// Thread entry point that serves one client socket.
void *pthmain(void *arg);
// Listening TCP server used by main to accept clients.
CTcpServer TcpServer;
// Ids of the worker threads: appended by main, removed by pthmainexit,
// and walked by mainexit to cancel the workers.
vector<long> vpthid;
// Signal handler that shuts the whole server down.
void mainexit(int sig);
// Thread cleanup handler registered by pthmain.
void pthmainexit(void *arg);
// Process-wide log file, opened in main.
CLogFile logfile;
int main(int argc, char *argv[]) {
signal(2, mainexit);
signal(15, mainexit);
logfile.Open("/tmp/serverdb.log", "a+");
if (TcpServer.InitServer(5858) == false)
{
logfile.Write("TcpServer.InitServer(5858) failed.\n");
return -1;
}
if (initconns() == false)
{
logfile.Write("initconns() failed.\n");
return -1;
}
while (true) {
if (TcpServer.Accept() == false)
{
logfile.Write("TcpServer.Accept() failed.\n");
return -1;
}
logfile.Write("客户端(%s)已连接。\n", TcpServer.GetIP());
pthread_t pthid;
if (pthread_create(&pthid, NULL, pthmain, (void *) (long) TcpServer.m_connfd) != 0) {
logfile.Write("pthread_create failed.\n");
return -1;
}
vpthid.push_back(pthid);
}
return 0;
}
// Worker thread: serves a single client until it disconnects or times out.
//
// arg carries the connected socket descriptor (cast through long). For every
// request the thread borrows a pooled database connection, simulates two
// seconds of work, returns the connection and echoes the request back with
// "ok" appended. The socket is closed and the thread id is unregistered by
// the pthmainexit cleanup handler.
void *pthmain(void *arg) {
    pthread_cleanup_push(pthmainexit, arg);

    // The thread is never joined, so let the system reclaim it on exit.
    pthread_detach(pthread_self());

    // Cancel immediately when mainexit calls pthread_cancel. The original code
    // passed PTHREAD_CANCEL_DISABLE here, which is a cancel *state* constant
    // and not a valid cancel *type*.
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    int sockfd = (int) (long) arg;
    int ibuflen = 0;
    char strbuffer[1024];

    while (true) {
        memset(strbuffer, 0, sizeof(strbuffer));
        if (TcpRead(sockfd, strbuffer, &ibuflen, 300) == false) break;
        logfile.Write("接收:%s\n", strbuffer);

        // getconn returns NULL when all pooled connections are in use.
        connection *conn = getconn();
        if (conn == NULL) logfile.Write("getconn() failed: no free connection.\n");

        sleep(2);  // stands in for the real database work

        if (conn != NULL) freeconn(conn);

        // Bounded append so that a full buffer cannot be overrun.
        strncat(strbuffer, "ok", sizeof(strbuffer) - strlen(strbuffer) - 1);
        logfile.Write("发送:%s\n", strbuffer);
        if (TcpWrite(sockfd, strbuffer) == false) break;
    }

    logfile.Write("客户端已断开。\n");

    // Run pthmainexit now (non-zero argument) to release the socket.
    pthread_cleanup_pop(1);
    pthread_exit(0);
}
// Signal handler installed by main: stops listening, cancels every worker
// thread still recorded in vpthid, tears down the connection pool and ends
// the process with exit status 0.
void mainexit(int sig) {
    logfile.Write("mainexit begin.\n");

    // No new clients from here on.
    TcpServer.CloseListen();

    // The size is re-read on every pass, exactly like the indexed for-loop
    // form of this walk.
    int pos = 0;
    while (pos < vpthid.size()) {
        logfile.Write("cancel %ld\n", vpthid[pos]);
        pthread_cancel(vpthid[pos]);
        ++pos;
    }

    freeconns();

    logfile.Write("mainexit end.\n");
    exit(0);
}
// Thread cleanup handler registered by pthmain.
//
// arg is the client socket descriptor (cast through long). Closes the socket
// and removes the calling thread's id from vpthid.
//
// NOTE(review): vpthid is also appended to by main and walked by mainexit,
// and no lock protecting it is visible in this file - confirm whether the
// container needs a mutex.
void pthmainexit(void *arg) {
    logfile.Write("pthmainexit begin.\n");

    close((int) (long) arg);

    const pthread_t self = pthread_self();
    for (size_t ii = 0; ii < vpthid.size(); ii++) {
        if ((pthread_t) vpthid[ii] == self) {
            vpthid.erase(vpthid.begin() + ii);
            // Stop here: continuing with ii++ after erase() would skip the
            // element that just moved into slot ii, and a thread has only
            // one entry of its own to remove.
            break;
        }
    }

    logfile.Write("pthmainexit end.\n");
}
bool initconns() {
for (int ii = 0; ii < 10; ii++) {
if (conns[ii].connecttodb("scott/tiger", "Simplified Chinese_China.ZHS16GBK") != 0) {
logfile.Write("connect database failed.\n%s\n", conns[ii].m_cda.message);
return false;
}
}
for (int ii = 0; ii < 10; ii++) pthread_mutex_init(&mutexs[ii], NULL);
return true;
}
// Borrows a connection from the pool without blocking.
//
// Tries each of the 10 slot mutexes in order; the first one that can be
// locked marks its connection as taken and that connection is returned.
// Returns NULL when every slot is currently locked.
connection *getconn() {
    int slot = 0;
    while (slot < 10) {
        const bool acquired = (pthread_mutex_trylock(&mutexs[slot]) == 0);
        if (acquired) {
            logfile.Write("get a conn[%d] ok.\n", slot);
            return conns + slot;
        }
        ++slot;
    }
    return NULL;
}
// Returns a connection to the pool by unlocking the mutex of the slot whose
// address equals in_conn. A pointer that matches none of the 10 slots
// (including NULL) leaves every mutex untouched.
void freeconn(connection *in_conn) {
    for (int slot = 0; slot != 10; ++slot) {
        if (&conns[slot] != in_conn) continue;
        pthread_mutex_unlock(&mutexs[slot]);
    }
}
// Tears the pool down: for each of the 10 slots, disconnects the database
// connection and then destroys the slot's mutex.
void freeconns() {
    int slot = 0;
    while (slot < 10) {
        conns[slot].disconnect();
        pthread_mutex_destroy(mutexs + slot);
        ++slot;
    }
}
#include "../_freecplus.h"
// Test client: connects to the server at 172.21.0.3:5858 and performs up to
// 50 request/response round trips, pausing five seconds between them.
// Returns -1 when the connection cannot be established, 0 otherwise.
int main(int argc, char *argv[]) {
    printf("pid=%d\n", getpid());

    CTcpClient TcpClient;
    if (TcpClient.ConnectToServer("172.21.0.3", 5858) == false)
    {
        printf("TcpClient.ConnectToServer(\"172.21.0.3\",5858) failed.\n");
        return -1;
    }

    char strbuffer[1024];

    for (int ii = 0; ii < 50; ii++)
    {
        memset(strbuffer, 0, sizeof(strbuffer));
        // Limit by the real buffer size. The former hard-coded limit of 50
        // bytes could cut the multi-byte text in the middle of a character
        // when the pid has many digits.
        snprintf(strbuffer, sizeof(strbuffer), "(%d)这是第%d个超级女生,编号%03d。", getpid(), ii + 1, ii + 1);
        printf("发送:%s\n", strbuffer);
        if (TcpClient.Write(strbuffer) == false) break;

        memset(strbuffer, 0, sizeof(strbuffer));
        // Wait up to 20 (the timeout argument of Read) for the reply.
        if (TcpClient.Read(strbuffer, 20) == false) break;
        printf("接收:%s\n", strbuffer);

        sleep(5);
    }

    return 0;
}
用互斥锁和条件变量实现高速缓存
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
using namespace std;
// Next message id to hand out; incremented by incache for every message.
int mesgid = 1;
// One cached message: a numeric id plus a fixed-size text payload.
// The global instance stmesg declared here is not referenced by outcache or
// incache, which both declare their own local of the same name.
struct st_message {
int mesgid;
char message[1024];
} stmesg;
// The cache itself: incache appends at the back, outcache removes from the front.
vector<struct st_message> vcache;
// Signalled by incache after new messages were added.
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// Protects vcache (and is the mutex paired with cond).
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *outcache(void *arg) {
struct st_message stmesg;
while (true) {
pthread_mutex_lock(&mutex);
while (vcache.size() == 0) {
pthread_cond_wait(&cond, &mutex);
}
memcpy(&stmesg, &vcache[0], sizeof(struct st_message));
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex);
printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
usleep(100);
}
}
void incache(int sig) {
struct st_message stmesg;
memset(&stmesg, 0, sizeof(struct st_message));
pthread_mutex_lock(&mutex);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond);
}
// Installs incache as the SIGTERM (15) handler, starts three consumer
// threads running outcache and waits for them to finish.
int main() {
    signal(SIGTERM, incache);

    pthread_t consumers[3];
    for (int i = 0; i < 3; ++i) {
        pthread_create(&consumers[i], NULL, outcache, NULL);
    }
    for (int i = 0; i < 3; ++i) {
        pthread_join(consumers[i], NULL);
    }

    return 0;
}
用互斥锁和信号量实现高速缓存
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
#include <semaphore.h>
using namespace std;
// Next message id to hand out; incremented by incache for every message.
int mesgid = 1;
// One cached message: a numeric id plus a fixed-size text payload.
// The global instance stmesg declared here is not referenced by outcache or
// incache, which both declare their own local of the same name.
struct st_message {
int mesgid;
char message[1024];
} stmesg;
// The cache itself: incache appends at the back, outcache removes from the front.
vector<struct st_message> vcache;
// Posted by incache once per added message; initialised to 0 in main.
sem_t sem;
// Protects vcache; initialised in main.
pthread_mutex_t mutex;
// Consumer thread: endlessly takes the oldest message out of vcache and
// prints the thread id together with the message id.
//
// incache posts the semaphore once per message it adds, so this thread
// consumes exactly one semaphore unit per message it removes. Compared with
// the earlier version this fixes two problems:
//   * vcache.size() was read without holding the mutex (a data race with
//     incache and the other consumers);
//   * sem_wait was only called when the cache looked empty, so the semaphore
//     count kept growing beyond the number of queued messages.
void *outcache(void *arg) {
    struct st_message stmesg;

    while (true) {
        // Block until a message has been announced; retry if the wait was
        // interrupted (for example by the signal that runs incache).
        if (sem_wait(&sem) != 0) continue;
        printf("%ld wait ok.\n", pthread_self());

        pthread_mutex_lock(&mutex);
        // Defensive check: never touch the front of an empty vector.
        if (vcache.empty())
        {
            pthread_mutex_unlock(&mutex);
            continue;
        }
        memcpy(&stmesg, &vcache[0], sizeof(struct st_message));
        vcache.erase(vcache.begin());
        pthread_mutex_unlock(&mutex);

        // Processing happens outside the lock.
        printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
        usleep(100);
    }
}
void incache(int sig) {
struct st_message stmesg;
memset(&stmesg, 0, sizeof(struct st_message));
pthread_mutex_lock(&mutex);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex);
sem_post(&sem);
sem_post(&sem);
sem_post(&sem);
sem_post(&sem);
sem_post(&sem);
sem_post(&sem);
}
// Installs incache as the SIGTERM (15) handler, initialises the semaphore
// (process-private, initial value 0) and the mutex, then starts three
// consumer threads running outcache and waits for them to finish.
int main() {
    signal(SIGTERM, incache);

    sem_init(&sem, 0, 0);
    pthread_mutex_init(&mutex, NULL);

    pthread_t consumers[3];
    for (int i = 0; i < 3; ++i) {
        pthread_create(&consumers[i], NULL, outcache, NULL);
    }
    for (int i = 0; i < 3; ++i) {
        pthread_join(consumers[i], NULL);
    }

    return 0;
}