РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUX
В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обмен в данный момент времени. Код для процедуры serv_request , выполняемой ведомыми потоками может быть взят из различных источников ( см. [1],[2],[3]). Параметр , передаваемый в процедуру serv_request , явлется дескриптором пассивного сокета , создаваемый ведущим потоком с помощью вызова процедуры getServerSocket( ). Необходимо отметить, что приведенное решение основано на свойстве Linux эффективно распараллеливать вызов accept( ). В противном случае возникает необходимость в блокировке мьютекса перед вызовом accept( ), что влечет за собой последовательное выполнение потоками сервера критической части кода, т.е. вызова accept( ). В среде же Red Hat Linux 9 (например) эта предосторожность не нужна и только снижает производительность. Детальное описание Posix Threads API на русском языке может быть найдено в [2]. Приводится также модифицированный код заглушки sample_svc.c (не за- висящий от шаблона sample.x) , позволяюший скомпилировать многопото- ковый сервер RPC в среде LINUX. /*
* ServerNBTHR.c
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM_THREADS 512
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
void
die(const char *func, int err)
{
fprintf(stderr,"%s: %s\n",func, strerror(err));
abort();
}
void
bark(const char *func, int err)
{
fprintf(stderr,"%s: %s\n",func, strerror(err));
}
/*
Описание поцедуры ведущего потока , которая возвращает
дескрипторов пассивного сокета, привязанного к адресу
сервера.
*/
int
getServerSocket(unsigned short int port)
{
int listenSocket;
struct sockaddr_in listenSockaddr;
if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0)
die("socket()",errno);
memset(&listenSockaddr, 0, sizeof(listenSockaddr));
listenSockaddr.sin_family = PF_INET;
listenSockaddr.sin_port = htons(port);
listenSockaddr.sin_addr.s_addr = INADDR_ANY;
if(bind(listenSocket,(struct sockaddr*)&listenSockaddr,
sizeof(listenSockaddr)) < 0)
die("bind()",errno);
if(listen(listenSocket,5)<0)
die("listen()",errno);
return listenSocket;
}
/*
Описание процедуры выполняемой всеми ведомыми потоками
*/
void *
serv_request(void *data)
{
struct connection_cb
{
int dataSocket;
char data[256];
int dataSent;
int dataToSend;
int isReading;
struct connection_cb *next;
};
struct connection_cb *connections = NULL;
int listenSocket = (int)data;
if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0)
die("fcntl()",errno);
while(1)
{
fd_set readFdSet;
fd_set writeFdSet;
struct connection_cb *currentConn, **currentConnPtr, *tempConn;
int maxFdNum;
FD_ZERO(&readFdSet);
FD_ZERO(&writeFdSet);
/*
Добавление дескриптора к множеству readFdSet
*/
FD_SET(listenSocket,&readFdSet);
maxFdNum = listenSocket;
for(currentConn = connections;currentConn!=NULL;currentConn =
currentConn->next)
{
if(currentConn->isReading)
FD_SET(currentConn->dataSocket,&readFdSet);
else
FD_SET(currentConn->dataSocket,&writeFdSet);
maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn-
>dataSocket : maxFdNum;
}
/*
Получение множества дескрипторов сокетов для обработки
*/
if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0)
{
if(errno == EINTR)
continue;
die("select()",errno);
}
currentConnPtr=&connections;
while(*currentConnPtr!=NULL)
{
/*
Проверка принадлежности дескриптора
(*currentConnPtr)->dataSocket к множеству readFdSet
*/
if((*currentConnPtr)->isReading &&
FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet))
{
int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data,
sizeof((*currentConnPtr)->data),0);
if(result < 0)
{
if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK)
{
bark("recv()",errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
}
else
if(result==0)
{
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
else
{
(*currentConnPtr)->dataToSend = result;
(*currentConnPtr)->dataSent = 0;
(*currentConnPtr)->isReading = 0;
printf("Recieving as Slave Thread id = '%d' \n",pthread_self());
}
}
else
/*
Проверка принадлежности дескриптора
(*currentConnPtr)->dataSocket к множеству writedFdSet
*/
if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet))
{
int result = send((*currentConnPtr)->dataSocket,
(*currentConnPtr)->data+(*currentConnPtr)->dataSent,
(*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0);
if(result < 0)
{
if(errno!=EINTR && errno!=EAGAIN)
{
bark("write()",errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
}
else
{
(*currentConnPtr)->dataSent +=result;
if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend)
(*currentConnPtr)->isReading = 1;
}
}
currentConnPtr = &((*currentConnPtr)->next);
printf("Sending as Slave Thread id = '%d' \n",pthread_self());
}
/*
Проверка принадлежности дескриптора listenSocket
к множеству readFdSet,т.е. необходимости обработать
вызов connect( ) от нового клиента.
*/
if(FD_ISSET(listenSocket,&readFdSet))
{
while(1)
{
/*
Вызовы pthread_mutex_lock, pthread_mutex_unlock
Не нужны в среде Linux
*/
pthread_mutex_lock(&request_mutex);
int result = accept(listenSocket,(struct sockaddr*)NULL,NULL);
pthread_mutex_unlock(&request_mutex);
if(result < 0)
{
if(errno==EAGAIN // errno == EWOULDBLOCK)
break;
die("accept()",errno);
}
else
{
*currentConnPtr = malloc(sizeof(struct connection_cb));
if(*currentConnPtr==NULL)
die("malloc()",0);
if(fcntl(result,F_SETFL,O_NONBLOCK)<0)
die("fcntl()",errno);
(*currentConnPtr)->dataSocket = result;
(*currentConnPtr)->isReading = 1;
(*currentConnPtr)->next = 0;
currentConnPtr = &((*currentConnPtr)->next);
printf("Accepting as Master Thread id = '%d' \n",pthread_self());
}
}
}
}
}
int
main(int argc,char *argv[])
{
int k;
int descSock;
char *service="1500";
switch(argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
printf ("Usage: ./ServerBNTH [port]\n");
exit(1);
}
size_t stacksize;
pthread_t p_thread[NUM_THREADS];
/*
Установка размера стека для ведомых потоков
*/
pthread_attr_t attr;
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize (&attr, stacksize);
pthread_attr_getstacksize (&attr, &stacksize);
/*
Получение значения дескриптора пассивного сокета
*/
descSock = getServerSocket(atoi(service));
/*
Запуск ведомых потоков
*/
for(k=0; k pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock);
printf("Thread %d started \n",k);
}
pthread_attr_destroy(&attr);
for(k=0;k}
Ниже приведен модифицированный код square_svc.c (для шаблона square.x),
позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0
Шаблон square.x:
struct square_in {
long arg1;
};
struct square_out {
long res1;
};
program SQUARE_PROG {
version SQUARE_VERS {
square_out SQUAREPROC(square_in) = 1;
} = 2 ;
} = 0x31230000;
Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла :
$ rpcgen -a -M square.x
Код процедур сервера:
/*
ServerSideProc.c
*/
#include "square.h"
#include
#include
#include
#include
#include
#include
#include
int request=0;
bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp)
{
printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1);
/*
Имитация работы процедуры , выполняемой потоками сервера
*/
sleep(5);
outp->res1=inp->arg1*inp->arg1;
printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1);
return(TRUE);
}
int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result)
{
xdr_free(xdr_result,result);
return(1);
}
Модифицированный файл square_svc.c:
/* square_svc.c
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "square.h"
#include
#include
#include
#include
#include
#include
#include
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
pthread_t p_thread;
pthread_attr_t attr;
/*
Процедура выполняемая потоком
*/
void *
serv_request(void *data)
{
struct thr_data
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *ptr_data;
{
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *, struct svc_req *);
/*
Распаковка данных , переданных в процедуру при запуске потока.
*/
ptr_data = (struct thr_data *)data;
struct svc_req *rqstp = ptr_data->rqstp;
register SVCXPRT *transp = ptr_data->transp;
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0, sizeof (argument));
if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
/*
Стандартный вызов функции сервера.
Данные для вызова уже приведены к стандарту.
*/
retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result))
{
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
fprintf (stderr, "%s", "unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
fprintf (stderr, "%s", "unable to free results");
return;
}
}
/*
Принципиально измененный код square_prog_2 , стартующей теперь
новый поток для каждого инициированного клиентом вызова
процедуры на удаленном сервере
*/
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
struct data_str
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *data_ptr =(struct data_str*)malloc(sizeof(struct data_str);
{
/*
Упаковка данных в структуру для передачи ссылки на нее,
как параметра запускаемому потоку
*/
data_ptr->rqstp = rqstp;
data_ptr->transp = transp;
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr);
}
}
int
main (int argc, char **argv)
{
register SVCXPRT *transp;
pmap_unset (SQUARE_PROG, SQUARE_VERS);
transp = svcudp_create(RPC_ANYSOCK);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create udp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp).");
exit(1);
}
transp = svctcp_create(RPC_ANYSOCK, 0, 0);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create tcp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp).");
exit(1);
}
svc_run ();
fprintf (stderr, "%s", "svc_run returned");
exit (1);
/* NOTREACHED */
}
Компиляция ServerSQUARE:
$ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl
Код клиента:
/*
* ClientSideProc.c
*/
#include /* for memset */
#include "square.h"
#include
#include
#include
#include
#include
#include
#include
int
main (int argc,char **argv)
{
CLIENT *cl;
square_in in;
square_out out;
if (argc != 3 )
{ printf ("Usage : client \n");
exit(1);
}
cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp");
if (cl == NULL) {
clnt_perror (cl, "call failed");
exit (1);
}
in.arg1 = atol(argv[2]);
if (squareproc_2(&in,&out,cl) != RPC_SUCCESS)
{
printf ("%s\n" , clnt_perror (cl,argv[1] ));
exit(1);
}
printf("result: %ld\n",out.res1);
exit(0);
}
Компиляция ClientSQUARE:
$ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl
Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"):
[root@dell4500 SQWMT]# cat square.bsh
./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 &
Вывод на машине клиента:
[root@dell4500 SQWMT]# ./square.bsh
[root@dell4500 SQWMT]# result: 196
result: 225
result: 256
result: 289
result: 121
result: 144
result: 441
result: 169
result: 100
result: 324
result: 361
result: 400
Вывод на машине сервера:
[root@dell4500 SQWMT]# ./ServerSQUARE
Thread id = '1082453184' started, arg = 14
Thread id = '1090841664' started, arg = 15
Thread id = '1099230144' started, arg = 16
Thread id = '1116941120' started, arg = 17
Thread id = '1125329600' started, arg = 11
Thread id = '1133718080' started, arg = 12
Thread id = '1142106560' started, arg = 21
Thread id = '1150495040' started, arg = 13
Thread id = '1158883520' started, arg = 10
Thread id = '1167272000' started, arg = 18
Thread id = '1175660480' started, arg = 19
Thread id = '1184048960' started, arg = 20
Thread id = '1082453184' is done 196
Thread id = '1090841664' is done 225
Thread id = '1099230144' is done 256
Thread id = '1116941120' is done 289
Thread id = '1125329600' is done 121
Thread id = '1133718080' is done 144
Thread id = '1142106560' is done 441
Thread id = '1150495040' is done 169
Thread id = '1158883520' is done 100
Thread id = '1167272000' is done 324
Thread id = '1175660480' is done 361
Thread id = '1184048960' is done 400
Страница сайта http://silicontaiga.ru
Оригинал находится по адресу http://silicontaiga.ru/home.asp?artId=6506
|