分类目录归档:网络编程

redis的epoll网络模型实例——redis源码分析之网络

linux网络编程之epoll学习
http://www.cxphp.com/?p=224

关于redis中使用的网络框架结构,相信已有很多文章介绍和分析过,但很少有文章把这一结构单独抽取出来,并给出可以运行的实现。
本文在上面链接中的例子基础上修改而成,实现了一个类似redis的epoll事件循环,使读、写事件能够完整地流转起来。
本epoll模型是根据redis中使用的epoll网络事件模型抽取而来。
socket.h

#ifndef _SOCKET_H_
#define _SOCKET_H_

#include 
#include 
#include 
#include 
#include 

/* Direction flags: used as bits in FiredEvent.mask and in deleteEvent()'s mask argument. */
#define EV_READ 1
#define EV_WRITE 2
/* Number of slots in each of the EventLoop tables (fe, events, fired). */
#define EV_SIZE (1024 * 10)

/* Callback signature used for read and write handlers: (descriptor, opaque pointer). */
typedef void procEvent(int fd, void *argv);

/* Handlers attached to one descriptor; the loop keeps one of these per fd. */
typedef struct fileEvent
{
	procEvent *readProc;	/* called by the loop when the fd fires with EV_READ */
	procEvent *writeProc;	/* called by the loop when the fd fires with EV_WRITE */
	void *argv;		/* passed unchanged as the second argument of both handlers */
}FileEvent;

/* One ready descriptor recorded by dispatchEvent() before handlers are invoked. */
typedef struct firedEvent
{
	int fd;		/* descriptor that became ready */
	int mask;	/* combination of EV_READ / EV_WRITE */
}FiredEvent;

/* State of one epoll-based event loop. */
typedef struct _event
{
	FileEvent fe[EV_SIZE];			/* handlers, indexed by file descriptor */
	struct epoll_event events[EV_SIZE];	/* per-descriptor epoll interest, indexed by fd by the enable/disable functions */
	FiredEvent fired[EV_SIZE];		/* ready descriptors collected during one dispatchEvent() iteration */
	int epfd;				/* epoll instance descriptor obtained in initEvent() */
	unsigned char running;			/* dispatchEvent() loops while non-zero; stopEvent() clears it */
}EventLoop;

/* Puts fd into non-blocking mode; returns 0, or -1 on failure. */
int setnonblocking(int fd);
/* Creates a non-blocking TCP socket bound to host:port and listening; returns the fd, or -1. */
int createSocket(char *host, unsigned short port);
/* Adds fd to the loop's epoll set with no events enabled; returns 0, or -1. */
int createEvent(EventLoop *el, int fd);
/* Installs proc as the read handler of fd and enables EPOLLIN; returns 0, or -1. */
int enableEventIn(EventLoop *el, int fd, procEvent *proc);
/* Disables EPOLLIN for fd; returns 0, or -1. */
int disableEventIn(EventLoop *el, int fd);
/* Installs proc as the write handler of fd and enables EPOLLOUT; returns 0, or -1. */
int enableEventOut(EventLoop *el, int fd, procEvent *proc);
/* Disables EPOLLOUT for fd; returns 0, or -1. */
int disableEventOut(EventLoop *el, int fd);
/* Clears the handlers selected by mask (EV_READ / EV_WRITE) and updates the epoll registration; returns 0, or -1. */
int deleteEvent(EventLoop *el, int fd, short mask);
/* Runs the loop until the running flag is cleared; returns 0. */
int dispatchEvent(EventLoop *el);
/* Clears the running flag so dispatchEvent() leaves its loop. */
void stopEvent(EventLoop *el);
/* Allocates a loop and its epoll instance; returns NULL if the epoll instance cannot be created. */
EventLoop *initEvent();

#endif

socket.c

#include 
#include "socket.h"

/*
 * Switch a descriptor to non-blocking mode.
 *
 * fd: descriptor to modify.
 *
 * Returns 0 on success, -1 if either fcntl() call fails.
 */
int setnonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	/* A failed F_GETFL returns -1; OR-ing that into the new flag word
	 * would hand F_SETFL an all-ones value, so stop here instead. */
	if(flags == -1)
		return -1;

	if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
		return -1;

	return 0;
}


/*
 * Create a non-blocking IPv4 TCP listening socket.
 *
 * host: dotted-decimal IPv4 address to bind to (for example "0.0.0.0").
 * port: TCP port in host byte order.
 *
 * Returns the listening descriptor, or -1 on any failure. On failure no
 * descriptor is left open.
 */
int createSocket(char *host, unsigned short port)
{
	/* Zero-initialise so the sin_zero padding is not stack garbage. */
	struct sockaddr_in addr = {0};
	int reuse_addr = 1;
	int fd;

	if(host == NULL)
		return -1;

	addr.sin_family = AF_INET;
	addr.sin_port	= htons(port);
	addr.sin_addr.s_addr = inet_addr(host);

	/* inet_addr() signals a malformed address with INADDR_NONE. That value
	 * is also the broadcast address, which is not a useful address for a
	 * TCP listener, so it is rejected in both cases. */
	if(addr.sin_addr.s_addr == INADDR_NONE)
		return -1;

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if(fd == -1)
		return -1;

	if(setnonblocking(fd) == -1)
		goto fail;

	/* Allow quick restarts while old connections are still in TIME_WAIT. */
	if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr)) == -1)
		goto fail;

	if(bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1)
		goto fail;

	if(listen(fd, 512) == -1)
		goto fail;

	return fd;

fail:
	close(fd);
	return -1;
}

/*
 * Allocate an event loop and create its epoll instance.
 *
 * Returns the new loop, or NULL if memory allocation or epoll_create()
 * fails. The caller owns the returned loop.
 */
EventLoop *initEvent()
{
	/* calloc() zero-fills the whole structure, so every handler pointer,
	 * every stored event mask and the running flag start cleared instead
	 * of holding indeterminate values. */
	EventLoop *el = calloc(1, sizeof(*el));
	if(el == NULL)
		return NULL;

	/* The size argument is only a hint but must be greater than zero. */
	el->epfd = epoll_create(1024);
	if(el->epfd == -1)
	{
		free(el);
		return NULL;
	}

	return el;
}


/*
 * Register a descriptor with the loop's epoll instance, with no events
 * enabled yet.
 *
 * el: event loop.
 * fd: descriptor to register; must be in the range [0, EV_SIZE).
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 */
int createEvent(EventLoop *el, int fd)
{
	struct epoll_event ee = {0};

	/* The loop's tables are indexed by fd; refuse anything that would
	 * index outside them. */
	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	ee.events = 0;
	ee.data.fd = fd;

	if(epoll_ctl(el->epfd, EPOLL_CTL_ADD, fd, &ee) == -1)
		return -1;

	/* Descriptor numbers are reused after close(); drop whatever mask and
	 * handlers an earlier owner of this slot left behind. */
	el->events[fd] = ee;
	el->fe[fd].readProc = NULL;
	el->fe[fd].writeProc = NULL;
	el->fe[fd].argv = NULL;

	return 0;
}

/*
 * Enable read notifications for a registered descriptor.
 *
 * el:   event loop.
 * fd:   descriptor previously added with createEvent(); must be in [0, EV_SIZE).
 * proc: handler to run when fd becomes readable.
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 * On failure the loop's stored mask and handler are left unchanged.
 */
int enableEventIn(EventLoop *el, int fd, procEvent *proc)
{
	struct epoll_event want = {0};

	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	want.events = el->events[fd].events | EPOLLIN;
	want.data.fd = fd;
	if(epoll_ctl(el->epfd, EPOLL_CTL_MOD, fd, &want) == -1)
		return -1;

	/* Commit only after the kernel accepted the change, so the stored
	 * mask never disagrees with the real registration. */
	el->events[fd] = want;
	el->fe[fd].readProc = proc;

	return 0;
}

/*
 * Disable read notifications for a registered descriptor. The read handler
 * itself stays installed.
 *
 * el: event loop.
 * fd: descriptor previously added with createEvent(); must be in [0, EV_SIZE).
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 * On failure the loop's stored mask is left unchanged.
 */
int disableEventIn(EventLoop *el, int fd)
{
	struct epoll_event want = {0};

	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	want.events = el->events[fd].events & ~EPOLLIN;
	want.data.fd = fd;
	if(epoll_ctl(el->epfd, EPOLL_CTL_MOD, fd, &want) == -1)
		return -1;

	/* Commit only after the kernel accepted the change. */
	el->events[fd] = want;

	return 0;
}

/*
 * Enable write notifications for a registered descriptor.
 *
 * el:   event loop.
 * fd:   descriptor previously added with createEvent(); must be in [0, EV_SIZE).
 * proc: handler to run when fd becomes writable.
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 * On failure the loop's stored mask and handler are left unchanged.
 */
int enableEventOut(EventLoop *el, int fd, procEvent *proc)
{
	struct epoll_event want = {0};

	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	want.events = el->events[fd].events | EPOLLOUT;
	want.data.fd = fd;
	if(epoll_ctl(el->epfd, EPOLL_CTL_MOD, fd, &want) == -1)
		return -1;

	/* Commit only after the kernel accepted the change, so the stored
	 * mask never disagrees with the real registration. */
	el->events[fd] = want;
	el->fe[fd].writeProc = proc;

	return 0;
}

/*
 * Disable write notifications for a registered descriptor. The write
 * handler itself stays installed.
 *
 * el: event loop.
 * fd: descriptor previously added with createEvent(); must be in [0, EV_SIZE).
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 * On failure the loop's stored mask is left unchanged.
 */
int disableEventOut(EventLoop *el, int fd)
{
	struct epoll_event want = {0};

	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	want.events = el->events[fd].events & ~EPOLLOUT;
	want.data.fd = fd;
	if(epoll_ctl(el->epfd, EPOLL_CTL_MOD, fd, &want) == -1)
		return -1;

	/* Commit only after the kernel accepted the change. */
	el->events[fd] = want;

	return 0;
}


/*
 * Remove the read and/or write interest of a descriptor, together with the
 * matching handler(s).
 *
 * el:   event loop.
 * fd:   descriptor previously added with createEvent(); must be in [0, EV_SIZE).
 * mask: combination of EV_READ and EV_WRITE selecting what to remove.
 *
 * The descriptor stays in the epoll set (EPOLL_CTL_MOD is used), so it can
 * be re-armed later with enableEventIn()/enableEventOut().
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl() fails.
 * On failure nothing in the loop is changed.
 */
int deleteEvent(EventLoop *el, int fd, short mask)
{
	struct epoll_event ee = {0};

	if(fd < 0 || fd >= EV_SIZE)
		return -1;

	/* Start from the interest currently recorded for fd. Starting from 0
	 * would make the bit clearing below a no-op and drop both directions
	 * regardless of mask. */
	ee.events = el->events[fd].events;
	ee.data.fd = fd;

	if(mask & EV_READ)
		ee.events &= ~EPOLLIN;

	if(mask & EV_WRITE)
		ee.events &= ~EPOLLOUT;

	if(epoll_ctl(el->epfd, EPOLL_CTL_MOD, fd, &ee) == -1)
		return -1;

	/* Kernel registration changed: bring the loop's own state in line. */
	el->events[fd] = ee;

	if(mask & EV_READ)
		el->fe[fd].readProc = NULL;

	if(mask & EV_WRITE)
		el->fe[fd].writeProc = NULL;

	return 0;
}

/*
 * Run the event loop: wait for ready descriptors and invoke their handlers
 * until the running flag is cleared (see stopEvent()).
 *
 * el: event loop created by initEvent().
 *
 * Returns 0 once the loop has been stopped.
 */
int dispatchEvent(EventLoop *el)
{
	/* epoll_wait() output goes into a private buffer. el->events is indexed
	 * by descriptor by the enable/disable functions and holds each fd's
	 * current interest mask; letting epoll_wait() write into it would
	 * overwrite the state of descriptors 0..nfds-1. */
	enum { READY_BATCH = 512 };
	struct epoll_event ready[READY_BATCH];
	int nfds, i;

	el->running = 1;
	while(el->running)
	{
		/* On error (for example an interrupting signal) nfds is -1: both
		 * loops below are skipped and the running flag is re-checked. */
		nfds = epoll_wait(el->epfd, ready, READY_BATCH, 100);

		/* Pass 1: snapshot what fired before any handler can change
		 * registrations. */
		for(i = 0; i < nfds; ++i)
		{
			int mask = 0;

			if(ready[i].events & EPOLLIN)
				mask |= EV_READ;

			if(ready[i].events & EPOLLOUT)
				mask |= EV_WRITE;

			el->fired[i].fd = ready[i].data.fd;
			el->fired[i].mask = mask;
		}

		/* Pass 2: run the handlers. */
		for(i = 0; i < nfds; ++i)
		{
			int mask = el->fired[i].mask;
			int fd = el->fired[i].fd;
			FileEvent *fe;

			if(fd < 0 || fd >= EV_SIZE)
				continue;

			fe = &el->fe[fd];

			if((mask & EV_READ) && fe->readProc != NULL)
				fe->readProc(fd, fe->argv);

			/* The handler pointer is read only now, so a read handler
			 * that removed the write handler is respected. */
			if((mask & EV_WRITE) && fe->writeProc != NULL)
				fe->writeProc(fd, fe->argv);
		}

		if(nfds == 0)
		{
			/* NOTE(review): this one-second sleep after an idle timeout
			 * delays the handling of new events; kept as in the original. */
			printf("nfds = %d\n", nfds);
			usleep(1000 * 1000);
		}
	}

	return 0;
}

/*
 * Ask the loop to stop: clears the running flag that dispatchEvent()
 * tests at the top of each iteration.
 */
void stopEvent(EventLoop *el)
{
	el->running = 0;
}

main.c

#include 
#include 
#include 
#include 
#include 
#include "socket.h"

/* Size in bytes of the stack buffer onRead() declares for incoming data. */
#define MAX_BUFFER_SIZE 4096

/* The event loop shared by every callback in this file; assigned in main(). */
EventLoop *base;

/* Forward declaration: onWrite() refers to onRead() and is defined first. */
void onRead(int fd, void* argv);

void onWrite(int fd, void* argv)
{
	char *pbuf = "I've received...\n";
	send(fd, pbuf, strlen(pbuf), 0);
	disableEventOut(base, fd);
	enableEventIn(base, fd, onRead);
}

/*
 * Read handler: read one chunk from the client, print it, then switch the
 * descriptor to waiting for writability so onWrite() can answer.
 *
 * fd:   client socket that became readable.
 * argv: unused.
 *
 * The connection is closed when the peer has shut down (read() returns 0)
 * or when read() fails with an error other than EAGAIN/EWOULDBLOCK/EINTR.
 */
void onRead(int fd, void* argv)
{
	char buf[MAX_BUFFER_SIZE];
	ssize_t nread;

	(void)argv;

	/* Leave one byte free so the terminator below always fits. */
	nread = read(fd, buf, sizeof(buf) - 1);

	/* Nothing to read right now: keep waiting. Without this check a -1
	 * return would be used as an index into buf. */
	if(nread == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
		return;

	if(nread <= 0)
	{
		/* EV_READ | EV_WRITE selects both directions; "&" evaluates to 0. */
		deleteEvent(base, fd, EV_READ | EV_WRITE);
		printf("close fd = %d\n", fd);
		close(fd);
		return;
	}

	buf[nread] = '\0';

	printf("read buf = [%s] from fd = %d\n", buf, fd);
	disableEventIn(base, fd);
	enableEventOut(base, fd, onWrite);
}

void onAccept(int fd, void* argv)
{
	struct sockaddr_in addr;
	int len = sizeof(addr);
	int cfd;

	while(1)
	{
		cfd = accept(fd, (struct sockaddr*)&addr, &len);
		if(cfd == -1)
		{
			if(errno == EINTR)
				continue;

			printf("accept failed\n");
			return;
		}

		break;
	}

	setnonblocking(cfd);
	printf("new client accept, cfd = %d\n", cfd);
	createEvent(base, cfd);
	enableEventIn(base, cfd, onRead);
	disableEventOut(base, cfd);
}

/*
 * Signal handler: report the signal and terminate the process.
 *
 * sig: number of the delivered signal (unused).
 *
 * Only async-signal-safe calls are used: write() instead of printf() and
 * _exit() instead of exit(). Unlike exit(), _exit() does not flush stdio
 * buffers. The signal is not re-sent to the process, because it is the one
 * currently being handled.
 */
void sigHandle(const int sig)
{
	static const char note[] = "sighandled\n";
	ssize_t ignored;

	(void)sig;

	ignored = write(STDOUT_FILENO, note, sizeof(note) - 1);
	(void)ignored;

	_exit(EXIT_SUCCESS);
}

/*
 * Echo-acknowledge server: listens on 0.0.0.0:7000 and runs the event loop
 * until it is stopped.
 *
 * Returns 0 after the loop ends; exits with EXIT_FAILURE if set-up fails.
 */
int main(int argc, char* argv[])
{
	int fd;

	(void)argc;
	(void)argv;

	/* SIGKILL cannot be caught, so no handler is installed for it. */
	signal(SIGINT, sigHandle);
	signal(SIGQUIT, sigHandle);
	signal(SIGTERM, sigHandle);
	signal(SIGHUP, sigHandle);

	/* perror() appends ": <error text>" itself, so the messages carry no
	 * trailing newline. */
	if((fd = createSocket("0.0.0.0", 7000)) == -1){
		perror("create socket failed");
		exit(EXIT_FAILURE);
	}

	base = initEvent();
	if(base == NULL){
		perror("initialize event failed");
		close(fd);
		exit(EXIT_FAILURE);
	}

	/* createEvent() only adds the descriptor to the epoll set; read
	 * interest and the accept handler are installed by enableEventIn(). */
	if(createEvent(base, fd) == -1 || enableEventIn(base, fd, onAccept) == -1){
		perror("register listening socket failed");
		close(base->epfd);
		free(base);
		close(fd);
		exit(EXIT_FAILURE);
	}

	dispatchEvent(base);

	close(fd);
	close(base->epfd);
	free(base);
	base = NULL;

	return 0;
}

编译

cc socket.c main.c  -g -O0

setsockopt和getsockopt

#include	"netfunc.h"
#include			/* for TCP_xxx defines */

/*
 * Receives the value of any option queried in this file; the member to read
 * depends on the option (see the formatter attached to each table entry).
 * Also defines the global object `val` that main() passes to getsockopt().
 */
union val {
  int				i_val;
  long				l_val;
  struct linger		linger_val;
  struct timeval	timeval_val;
} val;

/* Formatters: turn an option value (and the length getsockopt() reported)
 * into printable text. Each returns a pointer to a shared static buffer. */
static char	*sock_str_flag(union val *, int);
static char	*sock_str_int(union val *, int);
static char	*sock_str_linger(union val *, int);
static char	*sock_str_timeval(union val *, int);

/*
 * Table of the socket options main() queries.
 *
 * opt_str     - option name as printed
 * opt_level   - level argument for getsockopt()
 * opt_name    - option argument for getsockopt()
 * opt_val_str - formatter for the returned value; NULL marks an option that
 *               is not defined on this platform (printed as "(undefined)")
 *
 * The table ends with an entry whose opt_str is NULL.
 */
struct sock_opts {
  const char	   *opt_str;
  int		opt_level;
  int		opt_name;
  char   *(*opt_val_str)(union val *, int);
} sock_opts[] = {
	{ "SO_BROADCAST",		SOL_SOCKET,	SO_BROADCAST,	sock_str_flag },
	{ "SO_DEBUG",			SOL_SOCKET,	SO_DEBUG,		sock_str_flag },
	{ "SO_DONTROUTE",		SOL_SOCKET,	SO_DONTROUTE,	sock_str_flag },
	{ "SO_ERROR",			SOL_SOCKET,	SO_ERROR,		sock_str_int },
	{ "SO_KEEPALIVE",		SOL_SOCKET,	SO_KEEPALIVE,	sock_str_flag },
	{ "SO_LINGER",			SOL_SOCKET,	SO_LINGER,		sock_str_linger },
	{ "SO_OOBINLINE",		SOL_SOCKET,	SO_OOBINLINE,	sock_str_flag },
	{ "SO_RCVBUF",			SOL_SOCKET,	SO_RCVBUF,		sock_str_int },
	{ "SO_SNDBUF",			SOL_SOCKET,	SO_SNDBUF,		sock_str_int },
	{ "SO_RCVLOWAT",		SOL_SOCKET,	SO_RCVLOWAT,	sock_str_int },
	{ "SO_SNDLOWAT",		SOL_SOCKET,	SO_SNDLOWAT,	sock_str_int },
	{ "SO_RCVTIMEO",		SOL_SOCKET,	SO_RCVTIMEO,	sock_str_timeval },
	{ "SO_SNDTIMEO",		SOL_SOCKET,	SO_SNDTIMEO,	sock_str_timeval },
	{ "SO_REUSEADDR",		SOL_SOCKET,	SO_REUSEADDR,	sock_str_flag },
#ifdef	SO_REUSEPORT
	{ "SO_REUSEPORT",		SOL_SOCKET,	SO_REUSEPORT,	sock_str_flag },
#else
	{ "SO_REUSEPORT",		0,			0,				NULL },
#endif
	{ "SO_TYPE",			SOL_SOCKET,	SO_TYPE,		sock_str_int },
	//{ "SO_USELOOPBACK",		SOL_SOCKET,	SO_USELOOPBACK,	sock_str_flag },
	{ "IP_TOS",				IPPROTO_IP,	IP_TOS,			sock_str_int },
	{ "IP_TTL",				IPPROTO_IP,	IP_TTL,			sock_str_int },
#ifdef	IPV6_DONTFRAG
	{ "IPV6_DONTFRAG",		IPPROTO_IPV6,IPV6_DONTFRAG,	sock_str_flag },
#else
	{ "IPV6_DONTFRAG",		0,			0,				NULL },
#endif
#ifdef	IPV6_UNICAST_HOPS
	{ "IPV6_UNICAST_HOPS",	IPPROTO_IPV6,IPV6_UNICAST_HOPS,sock_str_int },
#else
	{ "IPV6_UNICAST_HOPS",	0,			0,				NULL },
#endif
#ifdef	IPV6_V6ONLY
	{ "IPV6_V6ONLY",		IPPROTO_IPV6,IPV6_V6ONLY,	sock_str_flag },
#else
	{ "IPV6_V6ONLY",		0,			0,				NULL },
#endif
	{ "TCP_MAXSEG",			IPPROTO_TCP,TCP_MAXSEG,		sock_str_int },
	{ "TCP_NODELAY",		IPPROTO_TCP,TCP_NODELAY,	sock_str_flag },
#ifdef	SCTP_AUTOCLOSE
	{ "SCTP_AUTOCLOSE",		IPPROTO_SCTP,SCTP_AUTOCLOSE,sock_str_int },
#else
	{ "SCTP_AUTOCLOSE",		0,			0,				NULL },
#endif
#ifdef	SCTP_MAXBURST
	{ "SCTP_MAXBURST",		IPPROTO_SCTP,SCTP_MAXBURST,	sock_str_int },
#else
	{ "SCTP_MAXBURST",		0,			0,				NULL },
#endif
#ifdef	SCTP_MAXSEG
	{ "SCTP_MAXSEG",		IPPROTO_SCTP,SCTP_MAXSEG,	sock_str_int },
#else
	{ "SCTP_MAXSEG",		0,			0,				NULL },
#endif
#ifdef	SCTP_NODELAY
	{ "SCTP_NODELAY",		IPPROTO_SCTP,SCTP_NODELAY,	sock_str_flag },
#else
	{ "SCTP_NODELAY",		0,			0,				NULL },
#endif
	{ NULL,					0,			0,				NULL }
};


/*
 * Print the default value of every option listed in sock_opts[].
 *
 * For each entry a fresh socket of the family matching the option level is
 * created through Socket() (declared outside this file, presumably in
 * netfunc.h - confirm its error behaviour there), queried with getsockopt()
 * and closed again.
 */
int
main(int argc, char **argv)
{
	int			fd;
	socklen_t		len;
	struct sock_opts	*ptr;

	(void)argc;
	(void)argv;

	for (ptr = sock_opts; ptr->opt_str != NULL; ptr++)
	{
		printf("%s: ", ptr->opt_str);
		if (ptr->opt_val_str == NULL)
			printf("(undefined)\n");
		else
		{
			switch(ptr->opt_level)
			{
				case SOL_SOCKET:
				case IPPROTO_IP:
				case IPPROTO_TCP:
					fd = Socket(AF_INET, SOCK_STREAM, 0);
					break;
#if defined(IPV6_DONTFRAG) || defined(IPV6_UNICAST_HOPS) || defined(IPV6_V6ONLY)
				/* The table holds IPPROTO_IPV6 entries exactly when one of
				 * these options exists; they need an IPv6 socket. Without
				 * this case the program stopped at the first IPv6 entry. */
				case IPPROTO_IPV6:
					fd = Socket(AF_INET6, SOCK_STREAM, 0);
					break;
#endif
#ifdef	IPPROTO_SCTP
				case IPPROTO_SCTP:
					fd = Socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP);
					break;
#endif
				default:
					printf("Can't create fd for level %d\n", ptr->opt_level);
					exit(1);
			}

			/* getsockopt() treats len as in/out: buffer size in, value size out. */
			len = sizeof(val);
			if (getsockopt(fd, ptr->opt_level, ptr->opt_name,&val, &len) == -1){
				printf("getsockopt error\n");
			}else{
				printf("default = %s\n", (*ptr->opt_val_str)(&val, len));
			}
			close(fd);
		}
	}
	exit(0);
}


/* Shared result buffer of the sock_str_*() formatters; every call overwrites it. */
static char	strres[128];

/*
 * Format a boolean option as "on" or "off".
 *
 * ptr: option value as filled in by getsockopt().
 * len: value length reported by getsockopt().
 *
 * Returns the shared static buffer. If len is not the size of an int, the
 * buffer holds a size-mismatch message instead.
 */
static char	*
sock_str_flag(union val *ptr, int len)
{
	if (len == sizeof(int)) {
		const char *state = ptr->i_val ? "on" : "off";

		snprintf(strres, sizeof(strres), "%s", state);
		return strres;
	}

	snprintf(strres, sizeof(strres), "size (%d) not sizeof(int)", len);
	return strres;
}

/*
 * Format an integer option in decimal.
 *
 * ptr: option value as filled in by getsockopt().
 * len: value length reported by getsockopt().
 *
 * Returns the shared static buffer. If len is not the size of an int, the
 * buffer holds a size-mismatch message instead.
 */
static char	*
sock_str_int(union val *ptr, int len)
{
	const int size_ok = (len == sizeof(int));

	if (size_ok) {
		snprintf(strres, sizeof(strres), "%d", ptr->i_val);
	} else {
		snprintf(strres, sizeof(strres), "size (%d) not sizeof(int)", len);
	}

	return strres;
}

/*
 * Format an SO_LINGER value as its two fields.
 *
 * ptr: option value as filled in by getsockopt().
 * len: value length reported by getsockopt().
 *
 * Returns the shared static buffer. If len is not the size of struct linger,
 * the buffer holds a size-mismatch message instead.
 */
static char	*
sock_str_linger(union val *ptr, int len)
{
	if (len == sizeof(struct linger)) {
		snprintf(strres, sizeof(strres), "l_onoff = %d, l_linger = %d",
				 ptr->linger_val.l_onoff, ptr->linger_val.l_linger);
		return strres;
	}

	snprintf(strres, sizeof(strres),
			 "size (%d) not sizeof(struct linger)", len);
	return strres;
}

/*
 * Format a timeout option (struct timeval) as seconds and microseconds.
 *
 * ptr: option value as filled in by getsockopt().
 * len: value length reported by getsockopt().
 *
 * Returns the shared static buffer. If len is not the size of struct
 * timeval, the buffer holds a size-mismatch message instead.
 */
static char	*
sock_str_timeval(union val *ptr, int len)
{
	struct timeval	*tvptr = &ptr->timeval_val;

	if (len != sizeof(struct timeval))
		snprintf(strres, sizeof(strres),
				 "size (%d) not sizeof(struct timeval)", len);
	else
		/* tv_sec and tv_usec are not int (both are long on LP64 systems),
		 * so convert explicitly and print with %ld; passing them to %d is
		 * undefined behaviour. */
		snprintf(strres, sizeof(strres), "%ld sec, %ld usec",
				 (long)tvptr->tv_sec, (long)tvptr->tv_usec);
	return(strres);
}