博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ucontext实现的用户级多线程框架3(实现echo服务器)
阅读量:6453 次
发布时间:2019-06-23

本文共 12347 字,大约阅读时间需要 41 分钟。

前面一篇文章实现了一个抢占式的用户级多线程框架,现在用那个框架编写一个echo服务。因为只是个实验,所以代码写得比较杂乱,还有很多可能出错的情况也没有处理,这些在今后的进一步研究中都会慢慢修改。下面是代码:

 

uthread.h

/*
 * brief: user-level thread framework implemented on top of ucontext
 * author: kenny huang
 * date: 2009/10/13
 * email: huangweilook@21cn.com
 */
#ifndef _UTHREAD_H
#define _UTHREAD_H

// NOTE(review): the system include targets were stripped from the published
// listing; the set below is reconstructed from the names this header uses
// (ucontext_t, pthread_t, SIGUSR1, usleep, time_t, std::list, std::pair).
#include <list>
#include <utility>
#include <pthread.h>
#include <signal.h>
#include <time.h>
#include <ucontext.h>
#include <unistd.h>
#include "socketwrapper.h"

#define MAX_UTHREAD 128

/* SIGUSR1 handler; forwards to Scheduler::int_sig(). */
void int_signal_handler(int sig);

/* Current state of a user-level thread. */
enum thread_status
{
    ACTIVED = 0, // runnable
    BLOCKED,     // blocked
    SLEEP,       // sleeping voluntarily
    DIE,         // finished
};

/* Signature of a user-level thread entry function. */
typedef int (*uthread_func)(void*);

class Scheduler;
class u_thread;

/* Handle identifying a user-level thread. */
typedef struct
{
    int index;              // index into Scheduler::threads
    u_thread *p_uthread;
    ucontext_t *p_context;
} uthread_id;

/*
 * A user-level thread: one ucontext plus the stack it runs on.
 * Only Scheduler (a friend) may create and destroy instances.
 */
class u_thread
{
    friend class Scheduler;
private:
    /*
     * ssize  - size in bytes of the stack to allocate
     * index  - slot recorded in the thread's uthread_id
     * parent - handle whose context is remembered as the parent context
     */
    u_thread(unsigned int ssize,int index,uthread_id parent)
        : parent_context(parent.p_context),
          stack(new char[ssize]),
          ssize(ssize),
          _status(BLOCKED)
    {
        // getcontext() must initialise the structure *before* the stack
        // fields are filled in; makecontext() later relies on both.
        getcontext(&ucontext);
        ucontext.uc_stack.ss_sp = stack;
        ucontext.uc_stack.ss_size = ssize;
        uid.index = index;
        uid.p_uthread = this;
        uid.p_context = &ucontext;
    }

    ~u_thread()
    {
        delete []stack;
    }

    // The object owns `stack` and hands out pointers to its own `ucontext`,
    // so copying would double-free and dangle. Declared, never defined.
    u_thread(const u_thread&);
    u_thread& operator=(const u_thread&);

    /* Entry trampoline handed to makecontext(); defined in uthread.cpp. */
    static void star_routine(int uthread,int func,int arg);

public:
    ucontext_t* GetParentContext()
    {
        return parent_context;
    }

    ucontext_t *GetContext()
    {
        return &ucontext;
    }

    void SetStatus(thread_status _status)
    {
        this->_status = _status;
    }

    thread_status GetStatus()
    {
        return _status;
    }

    uthread_id GetUid()
    {
        return uid;
    }

private:
    ucontext_t ucontext;
    ucontext_t *parent_context; // context of the creator
    char *stack;                // stack used by this coroutine
    unsigned int ssize;         // stack size in bytes
    thread_status _status;
    uthread_id uid;
};

/* Public API: thin wrappers around the Scheduler's private operations. */
void BeginRun();
bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
int cRecv(int sock,char *buf,int len);
int cSend(int sock,char *buf,int len);
int cListen(int sock);
void cSleep(int t);

class beat;

/*
 * Task scheduler. All state is static; the free functions declared above
 * are friends and are the intended entry points.
 */
class Scheduler
{
    friend class beat;
    friend class u_thread;
    friend void BeginRun();
    friend bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
    friend int cRecv(int sock,char *buf,int len);
    friend int cSend(int sock,char *buf,int len);
    friend int cListen(int sock);
    friend void cSleep(int t);
public:
    static void scheduler_init();
    static void int_sig();

private:
    // sleep for t (compared against time(), i.e. seconds)
    static void sleep(int t);
    static void check_Network();
    static void schedule();
    static bool spawn(uthread_func func,void *arg,unsigned int stacksize);
    static int recv(int sock,char *buf,int len);
    static int send(int sock,char *buf,int len);
    static int listen(int sock);

private:
    // Template arguments reconstructed from how uthread.cpp uses the lists.
    static std::list<u_thread*> activeList;                      // runnable uthreads
    static std::list<std::pair<u_thread*,time_t> > sleepList;    // sleeping uthreads and their wake-up time

    static volatile bool block_signal;
    static char stack[4096];
    static ucontext_t ucontext;     // the Scheduler's own context
    static uthread_id uid_current;  // handle of the context currently running
    static uthread_id uid_self;
    static u_thread *threads[MAX_UTHREAD];
    static int total_count;
    static int epollfd;
    const static int maxsize = 10;
};

/*
 * Heartbeat emitter. It must run in its own thread; at a fixed interval it
 * sends SIGUSR1 to the thread that runs the coroutines.
 */
class beat
{
public:
    // thread_scheduler defaults to the constructing thread so it is never
    // read uninitialised if setThread() is not called.
    beat(unsigned int interval)
        : interval(interval), thread_scheduler(pthread_self())
    {}

    void setThread(pthread_t id)
    {
        thread_scheduler = id;
    }

    /* Never returns: sleep one interval, then deliver one SIGUSR1. */
    void loop()
    {
        // Poll period while the scheduler has signals blocked; avoids
        // burning a whole core in a tight spin.
        const unsigned int blocked_poll_us = 100;
        while(true)
        {
            ::usleep(1000 * interval);
            // Hold the tick until the scheduler accepts interrupts again.
            while(Scheduler::block_signal)
                ::usleep(blocked_poll_us);
            pthread_kill(thread_scheduler,SIGUSR1);
        }
    }

private:
    unsigned int interval;      // interval between interrupts (milliseconds)
    pthread_t thread_scheduler;
};

/* Installs the SIGUSR1 handler, initialises the scheduler, starts the heartbeat. */
bool initcoroutine(unsigned int interval);

#endif

uthread.cpp

#include "uthread.h"

// NOTE(review): the system include targets were stripped from the published
// listing; the set below is reconstructed from the names this file uses.
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <ucontext.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <list>
#include <utility>
#include "thread.h"

ucontext_t Scheduler::ucontext;
char Scheduler::stack[4096];
uthread_id Scheduler::uid_current;
uthread_id Scheduler::uid_self;
u_thread *Scheduler::threads[MAX_UTHREAD];
int Scheduler::total_count = 0;
int Scheduler::epollfd = -1;    // -1 = not created; 0 is a valid descriptor
volatile bool Scheduler::block_signal = true;
std::list<u_thread*> Scheduler::activeList;
std::list<std::pair<u_thread*,time_t> > Scheduler::sleepList;

/* Registration record attached to an epoll entry: who waits on which socket. */
struct sock_struct
{
    int sockfd;
    u_thread *uThread;
};

namespace
{
    /* Entry function and argument of each uthread, indexed by thread slot. */
    struct entry_point
    {
        uthread_func func;
        void *arg;
    };

    // makecontext() can only forward int arguments, which cannot carry a
    // pointer on LP64 targets. The slot index is forwarded instead and the
    // real entry point is looked up here.
    entry_point entry_points[MAX_UTHREAD];

    /*
     * Marks the waiting uthread runnable, removes its socket from the epoll
     * set and frees the registration record.
     * Returns the result of epoll_ctl(EPOLL_CTL_DEL).
     */
    int release_waiter(int epfd,sock_struct *waiter)
    {
        waiter->uThread->SetStatus(ACTIVED);
        epoll_event ev;
        ev.events = 0;
        ev.data.fd = waiter->sockfd;
        const int rc = TEMP_FAILURE_RETRY(epoll_ctl(epfd,EPOLL_CTL_DEL,waiter->sockfd,&ev));
        delete waiter;
        return rc;
    }
}

void int_signal_handler(int sig)
{
    (void)sig;
    Scheduler::int_sig();
}

/*
 * Trampoline every uthread starts in.
 * uthread - slot index of the thread in Scheduler::threads
 * func, arg - unused; kept so the signature matches the declaration
 */
void u_thread::star_routine(int uthread,int func,int arg)
{
    (void)func;
    (void)arg;
    assert(uthread > 0 && uthread < MAX_UTHREAD);
    u_thread *self_uthread = Scheduler::threads[uthread];
    assert(self_uthread);

    self_uthread->SetStatus(ACTIVED);
    // Hand control straight back to the creator; execution continues below
    // the first time the scheduler picks this thread.
    swapcontext(self_uthread->GetContext(),self_uthread->GetParentContext());

    Scheduler::block_signal = false;
    const entry_point entry = entry_points[uthread];
    entry.func(entry.arg);

    // Returning resumes the scheduler through uc_link, so hold signals off
    // first, exactly like every other path that enters the scheduler.
    Scheduler::block_signal = true;
    self_uthread->SetStatus(DIE);
}

/* Clears the thread table and prepares the scheduler's own context and handle. */
void Scheduler::scheduler_init()
{
    for(int i = 0; i < MAX_UTHREAD; ++i)
        threads[i] = 0;

    getcontext(&ucontext);
    ucontext.uc_stack.ss_sp = stack;
    ucontext.uc_stack.ss_size = sizeof(stack);
    ucontext.uc_link = NULL;
    // the scheduler occupies slot 0
    makecontext(&ucontext,schedule,0);

    uid_self.index = 0;
    uid_self.p_uthread = 0;
    uid_self.p_context = &ucontext;
    uid_current = uid_self;
}

/*
 * Blocks the calling uthread until `sock` becomes readable.
 * Returns 1 once woken, -1 if called outside a uthread or if the socket
 * could not be added to the epoll set.
 */
int Scheduler::listen(int sock)
{
    u_thread *self_thread = uid_current.p_uthread;
    if(!self_thread)
        return -1;

    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;

    epoll_event ev;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLIN;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    {
        delete ss;  // nothing will ever hand this record back to us
        return -1;
    }

    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    return 1;
}

/*
 * Blocks the calling uthread until `sock` is readable, then read()s up to
 * `len` bytes into `buf`.
 * Returns the result of read(), or -1 on bad arguments / registration failure.
 */
int Scheduler::recv(int sock,char *buf,int len)
{
    if(!buf || !(len > 0))
        return -1;
    u_thread *self_thread = uid_current.p_uthread;
    if(!self_thread)
        return -1;

    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;

    epoll_event ev;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLIN;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    {
        delete ss;
        return -1;
    }

    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    printf("recv return\n");
    Scheduler::block_signal = false;
    ret = read(sock,buf,len);
    return ret;
}

/*
 * Blocks the calling uthread until `sock` is writable, then write()s up to
 * `len` bytes from `buf`.
 * Returns the result of write(), or -1 on bad arguments / registration failure.
 */
int Scheduler::send(int sock,char *buf,int len)
{
    if(!buf || !(len > 0))
        return -1;
    u_thread *self_thread = uid_current.p_uthread;
    if(!self_thread)
        return -1;

    sock_struct *ss = new sock_struct;
    ss->uThread = self_thread;
    ss->sockfd = sock;

    epoll_event ev;
    ev.data.ptr = (void*)ss;
    ev.events = EPOLLOUT;
    int ret;
    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));
    if(ret != 0)
    {
        delete ss;
        return -1;
    }

    self_thread->SetStatus(BLOCKED);
    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
    ret = write(sock,buf,len);
    return ret;
}

/* Polls epoll without waiting and wakes every uthread whose socket has an event. */
void Scheduler::check_Network()
{
    epoll_event events[maxsize];
    const int nfds = TEMP_FAILURE_RETRY(epoll_wait(epollfd,events,maxsize,0));
    for(int i = 0; i < nfds; ++i)
    {
        sock_struct *ss = (sock_struct*)events[i].data.ptr;
        if(!ss)
            continue;

        if(events[i].events & EPOLLIN)
        {
            // socket is readable
            printf("a sock can read\n");
            if(0 != release_waiter(epollfd,ss))
            {
                printf("error here\n");
                exit(EXIT_FAILURE);
            }
        }
        else if(events[i].events & EPOLLOUT)
        {
            // socket is writable
            printf("a sock can write\n");
            release_waiter(epollfd,ss);
        }
        else
        {
            // EPOLLERR/EPOLLHUP alone: wake the waiter anyway so its
            // read()/write() reports the failure instead of the thread
            // staying blocked forever.
            release_waiter(epollfd,ss);
        }
    }
}

/*
 * Scheduler main loop: runs runnable uthreads, polls the network, wakes
 * sleepers; returns when no uthread is left or epoll cannot be created.
 */
void Scheduler::schedule()
{
    epollfd = TEMP_FAILURE_RETRY(epoll_create(maxsize));
    if(epollfd < 0)
    {
        printf("epoll init error\n");
        return;
    }

    while(total_count > 0)
    {
        // first run the uthreads on the active list
        std::list<u_thread*>::iterator it = activeList.begin();
        std::list<u_thread*>::iterator end = activeList.end();
        for( ; it != end; ++it)
        {
            if(*it && (*it)->GetStatus() == ACTIVED)
            {
                uid_current = (*it)->GetUid();
                swapcontext(&ucontext,uid_current.p_context);
                uid_current = uid_self;
                int index = (*it)->GetUid().index;
                if((*it)->GetStatus() == DIE)
                {
                    printf("%d die\n",index);
                    delete threads[index];
                    threads[index] = 0;
                    --total_count;
                    activeList.erase(it);
                    break;  // `it` is invalid now; restart on the next pass
                }
                else if((*it)->GetStatus() == SLEEP)
                {
                    printf("%d sleep\n",index);
                    activeList.erase(it);
                    break;
                }
            }
        }

        // check the network for sockets that became ready
        check_Network();

        // wake every sleeper whose deadline has passed
        const time_t now = time(NULL);
        std::list<std::pair<u_thread*,time_t> >::iterator its = sleepList.begin();
        while(its != sleepList.end())
        {
            if(now >= its->second)
            {
                u_thread *uthread = its->first;
                uthread->SetStatus(ACTIVED);
                activeList.push_back(uthread);
                its = sleepList.erase(its);
            }
            else
            {
                ++its;
            }
        }
    }

    ::close(epollfd);
    epollfd = -1;
    printf("scheduler end\n");
}

/*
 * Creates a uthread that will run func(arg) on a stack of `stacksize` bytes.
 * Returns false when func is null or no thread slot is free.
 */
bool Scheduler::spawn(uthread_func func,void *arg,unsigned int stacksize)
{
    printf("uthread_create\n");
    if(!func || total_count >= MAX_UTHREAD)
        return false;

    for(int i = 1; i < MAX_UTHREAD; ++i)
    {
        if(threads[i] != 0)
            continue;

        // While control briefly runs on the new thread's stack, uid_current
        // still names the creator; a heartbeat tick must not preempt here.
        const bool was_blocked = block_signal;
        block_signal = true;

        threads[i] = new u_thread(stacksize,i,uid_current);
        ++total_count;
        entry_points[i].func = func;
        entry_points[i].arg = arg;

        ucontext_t *cur_context = threads[i]->GetContext();
        activeList.push_back(threads[i]);
        cur_context->uc_link = &ucontext;
        makecontext(cur_context,(void (*)())u_thread::star_routine,3,i,0,0);
        swapcontext(uid_current.p_context,cur_context);

        block_signal = was_blocked;
        printf("return from parent\n");
        return true;
    }
    return false;
}

/* Puts the calling uthread to sleep until time() has advanced by t. */
void Scheduler::sleep(int t)
{
    u_thread *self_thread = uid_current.p_uthread;
    if(!self_thread)
        return;

    time_t now = time(NULL);
    now += t;
    // insert into the sleep list
    sleepList.push_back(std::make_pair(self_thread,now));

    // save the current context and switch back to the scheduler
    self_thread->SetStatus(SLEEP);
    ucontext_t *cur_context = self_thread->GetContext();
    Scheduler::block_signal = true;
    swapcontext(cur_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
}

/* Heartbeat tick: preempts the running uthread in favour of the scheduler. */
void Scheduler::int_sig()
{
    // Ignore ticks that land while the scheduler itself is running or a
    // context switch is already in progress; there is no uthread to preempt.
    if(block_signal || uid_current.p_uthread == 0)
        return;

    Scheduler::block_signal = true;
    swapcontext(uid_current.p_context,&Scheduler::ucontext);
    Scheduler::block_signal = false;
}

/* Runnable adapter that drives a `beat` aimed at the constructing thread. */
class HeartBeat : public runnable
{
public:
    HeartBeat(unsigned int interval)
        : _beat(interval)
    {
        _beat.setThread(pthread_self());
    }

    bool run()
    {
        _beat.loop();
        return true;
    }

private:
    beat _beat; // held by value: no manual new/delete, no double free on copy
};

/*
 * Installs the SIGUSR1 handler, initialises the scheduler and starts the
 * heartbeat thread. `interval` is the heartbeat period handed to `beat`.
 * Returns false if sigaction() fails.
 */
bool initcoroutine(unsigned int interval)
{
    // set up the signal
    struct sigaction sigusr1;
    sigusr1.sa_flags = 0;
    sigusr1.sa_handler = int_signal_handler;
    sigemptyset(&sigusr1.sa_mask);
    int status = sigaction(SIGUSR1,&sigusr1,NULL);
    if(status == -1)
    {
        printf("error sigaction\n");
        return false;
    }

    // initialise the scheduler first
    Scheduler::scheduler_init();

    // start the heartbeat
    static HeartBeat hb(interval);
    static Thread c(&hb);
    c.start();
    return true;
}

void BeginRun()
{
    Scheduler::schedule();
}

bool cSpawn(uthread_func func,void *arg,unsigned int stacksize)
{
    return Scheduler::spawn(func,arg,stacksize);
}

int cRecv(int sock,char *buf,int len)
{
    return Scheduler::recv(sock,buf,len);
}

int cSend(int sock,char *buf,int len)
{
    return Scheduler::send(sock,buf,len);
}

int cListen(int sock)
{
    return Scheduler::listen(sock);
}

void cSleep(int t)
{
    return Scheduler::sleep(t);
}

echoserver.c

// kcoroutine.cpp : 定义控制台应用程序的入口点。 // #include "uthread.h" #include "thread.h" int port; int test(void *arg) {
char *name = (char*)arg; unsigned long c = 0; while(1) {
if(c % 10000 == 0) {
printf("%d/n",c); cSleep(1); } ++c; } } int echo(void *arg) {
int sock = *(int*)arg; while(1) {
char buf[1024]; int ret = cRecv(sock,buf,1024); if(ret > 0) {
printf("%s/n",buf); ret = cSend(sock,buf,ret); } } } int listener(void *arg) {
struct sockaddr_in servaddr; int listenfd; if(0 > (listenfd = Tcp_Listen("127.0.0.1",port,servaddr,5))) {
printf("listen error/n"); return 0; } while(1) {
if(cListen(listenfd) > 0) {
printf("a user comming/n"); struct sockaddr_in cliaddr; socklen_t len; int sock = Accept(listenfd,(struct sockaddr*)NULL,NULL); if(sock >= 0) {
cSpawn(echo,&sock,4096); } } } } int main(int argc, char **argv) {
port = atoi(argv[1]); if(!initcoroutine(20)) return 0; cSpawn(listener,0,4096); char name[10] = "test"; cSpawn(test,name,4096); printf("create finish/n"); //开始调度线程的运行 BeginRun(); return 0; }

运行后会看到控制台中不断地输出计数值,那是test线程在工作,

telnet几个客户端上去,就可以看到echo的效果了,总体来看效果还不错,

不过context的切换效率还没有测试过.

转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/02/2429644.html

你可能感兴趣的文章
第二周总结
查看>>
ASP.NET完整打包卸载更新攻略(By Installshield 2010)
查看>>
[120_移动开发Android]006_android开发之数据存储之sdcard访问
查看>>
[若有所悟]IT小兵总结IT人特点及挽留IT人才的九大策略
查看>>
概率图模型建模、学习、推理资料总结
查看>>
【转】知道这20个正则表达式,能让你少写1,000行代码
查看>>
自定义 启动和关闭 oracle 的命令
查看>>
用ASP.NET Core 2.0 建立规范的 REST API
查看>>
SQLite数据库、ListView控件的使用
查看>>
Storm程序的并发机制(重点掌握)
查看>>
Quartz
查看>>
正则表达式介绍
查看>>
初识Scala反射
查看>>
第三十九天
查看>>
Redis详解
查看>>
4Sum——LeetCode
查看>>
论程序员加班的害处
查看>>
codeblocks快捷键
查看>>
基于HTML5的WebGL设计汉诺塔3D游戏
查看>>
WPF资料链接
查看>>