Russian version
English version
ОБ АЛЬЯНСЕ | НАШИ УСЛУГИ | КАТАЛОГ РЕШЕНИЙ | ИНФОРМАЦИОННЫЙ ЦЕНТР | СТАНЬТЕ СПОНСОРАМИ SILICON TAIGA | ISDEF | КНИГИ И CD | ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ | УПРАВЛЕНИЕ КАЧЕСТВОМ | РОССИЙСКИЕ ТЕХНОЛОГИИ | НАНОТЕХНОЛОГИИ | ЮРИДИЧЕСКАЯ ПОДДЕРЖКА | АНАЛИТИКА | КАРТА САЙТА | КОНТАКТЫ
 
Программное обеспечение
 
Для зарегистрированных пользователей
 
РАССЫЛКИ НОВОСТЕЙ
IT-Новости
Новости компаний
Российские технологии
Новости ВПК
Нанотехнологии
 
Поиск по статьям
 
RSS-лента
Подписаться
ОС и Офисное ПО

РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUX

В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обмен в данный момент времени. Код для процедуры serv_request , выполняемой ведомыми потоками может быть взят из различных источников ( см. [1],[2],[3]). Параметр , передаваемый в процедуру serv_request , явлется дескриптором пассивного сокета , создаваемый ведущим потоком с помощью вызова процедуры getServerSocket( ).
Необходимо отметить, что приведенное решение основано на свойстве Linux эффективно распараллеливать вызов accept( ). В противном случае возникает необходимость в блокировке мьютекса перед вызовом accept( ),
что влечет за собой последовательное выполнение потоками сервера критической части кода, т.е. вызова accept( ). В среде же Red Hat Linux 9 (например) эта предосторожность не нужна и только снижает производительность. Детальное описание Posix Threads API на русском языке может быть найдено в [2].
Приводится также модифицированный код заглушки sample_svc.c (не за- висящий от шаблона sample.x) , позволяюший скомпилировать многопото- ковый сервер RPC в среде LINUX.

/*
* ServerNBTHR.c
*/
#include 

#include #include #include #include #include #include

#include #include #include #include #define NUM_THREADS 512

pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;

void die(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); abort(); }

void bark(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); } /* Описание поцедуры ведущего потока , которая возвращает дескрипторов пассивного сокета, привязанного к адресу сервера. */ int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0) die("socket()",errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if(bind(listenSocket,(struct sockaddr*)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()",errno); if(listen(listenSocket,5)<0) die("listen()",errno); return listenSocket; }

/* Описание процедуры выполняемой всеми ведомыми потоками */

void * serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL;

int listenSocket = (int)data; if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); while(1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum;

FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); /* Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket,&readFdSet); maxFdNum = listenSocket; for(currentConn = connections;currentConn!=NULL;currentConn = currentConn->next) {

if(currentConn->isReading) FD_SET(currentConn->dataSocket,&readFdSet); else FD_SET(currentConn->dataSocket,&writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn- >dataSocket : maxFdNum; } /* Получение множества дескрипторов сокетов для обработки */ if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0) {

if(errno == EINTR) continue; die("select()",errno); } currentConnPtr=&connections; while(*currentConnPtr!=NULL) {

/* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству readFdSet */

if((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet)) {

int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data),0);

if(result < 0) { if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK) { bark("recv()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; }

} else if(result==0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n",pthread_self()); }

} else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data+(*currentConnPtr)->dataSent, (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0); if(result < 0) {

if(errno!=EINTR && errno!=EAGAIN) { bark("write()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent +=result;

if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; }

}

currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n",pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if(FD_ISSET(listenSocket,&readFdSet)) { while(1) {

/* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket,(struct sockaddr*)NULL,NULL); pthread_mutex_unlock(&request_mutex); if(result < 0) {

if(errno==EAGAIN // errno == EWOULDBLOCK) break; die("accept()",errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if(*currentConnPtr==NULL) die("malloc()",0); if(fcntl(result,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno);

(*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n",pthread_self()); } } }

} } int main(int argc,char *argv[]) { int k; int descSock; char *service="1500"; switch(argc) { case 1: break; case 2: service = argv[1]; break; default: printf ("Usage: ./ServerBNTH [port]\n"); exit(1); } size_t stacksize; pthread_t p_thread[NUM_THREADS];

/* Установка размера стека для ведомых потоков */

pthread_attr_t attr; pthread_attr_init(&attr); stacksize = 500000; pthread_attr_setstacksize (&attr, stacksize); pthread_attr_getstacksize (&attr, &stacksize);

/* Получение значения дескриптора пассивного сокета */

descSock = getServerSocket(atoi(service));

/* Запуск ведомых потоков */

for(k=0; k pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock); printf("Thread %d started \n",k); } pthread_attr_destroy(&attr);

for(k=0;k}

Ниже приведен модифицированный код square_svc.c (для шаблона square.x), позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0

Шаблон square.x:

struct square_in { long arg1; }; struct square_out { long res1; }; program SQUARE_PROG { version SQUARE_VERS {

square_out SQUAREPROC(square_in) = 1; } = 2 ; } = 0x31230000;

Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла :

$ rpcgen -a -M square.x

Код процедур сервера:

/* ServerSideProc.c */ #include "square.h" #include #include #include #include

#include #include #include

int request=0;

bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp) { printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1); /* Имитация работы процедуры , выполняемой потоками сервера */ sleep(5); outp->res1=inp->arg1*inp->arg1; printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1); return(TRUE);

} int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result) { xdr_free(xdr_result,result); return(1); }

Модифицированный файл square_svc.c:

/* square_svc.c * Please do not edit this file. * It was generated using rpcgen. */

#include "square.h" #include #include #include

#include #include #include #include

#ifndef SIG_PF #define SIG_PF void(*)(int) #endif

pthread_t p_thread; pthread_attr_t attr; /*

Процедура выполняемая потоком

*/

void * serv_request(void *data) { struct thr_data {

struct svc_req *rqstp; SVCXPRT *transp; } *ptr_data;

{

union { square_in squareproc_2_arg; } argument; union { square_out squareproc_2_res; } result; bool_t retval; xdrproc_t _xdr_argument, _xdr_result; bool_t (*local)(char *, void *, struct svc_req *);

/*

Распаковка данных , переданных в процедуру при запуске потока.

*/

ptr_data = (struct thr_data *)data; struct svc_req *rqstp = ptr_data->rqstp; register SVCXPRT *transp = ptr_data->transp;

switch (rqstp->rq_proc) {

case NULLPROC: (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL); return; case SQUAREPROC: _xdr_argument = (xdrproc_t) xdr_square_in; _xdr_result = (xdrproc_t) xdr_square_out; local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc; break; default: svcerr_noproc (transp); return; } memset ((char *)&argument, 0, sizeof (argument)); if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { svcerr_decode (transp); return; } /*

Стандартный вызов функции сервера. Данные для вызова уже приведены к стандарту.

*/

retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);

if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) {

svcerr_systemerr (transp); } if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { fprintf (stderr, "%s", "unable to free arguments"); exit (1); } if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result)) fprintf (stderr, "%s", "unable to free results"); return; } } /*

Принципиально измененный код square_prog_2 , стартующей теперь новый поток для каждого инициированного клиентом вызова процедуры на удаленном сервере

*/

static void square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)

{ struct data_str { struct svc_req *rqstp; SVCXPRT *transp; } *data_ptr =(struct data_str*)malloc(sizeof(struct data_str);

{

/* Упаковка данных в структуру для передачи ссылки на нее, как параметра запускаемому потоку */

data_ptr->rqstp = rqstp; data_ptr->transp = transp; pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr); } }

int main (int argc, char **argv) { register SVCXPRT *transp;

pmap_unset (SQUARE_PROG, SQUARE_VERS);

transp = svcudp_create(RPC_ANYSOCK); if (transp == NULL) { fprintf (stderr, "%s", "cannot create udp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); exit(1); }

transp = svctcp_create(RPC_ANYSOCK, 0, 0); if (transp == NULL) {

fprintf (stderr, "%s", "cannot create tcp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); exit(1); }

svc_run (); fprintf (stderr, "%s", "svc_run returned"); exit (1); /* NOTREACHED */ }

Компиляция ServerSQUARE:

$ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl

Код клиента:

/* * ClientSideProc.c */

#include /* for memset */ #include "square.h" #include #include #include #include #include

#include #include

int main (int argc,char **argv) { CLIENT *cl; square_in in; square_out out; if (argc != 3 ) { printf ("Usage : client \n"); exit(1); }

cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp"); if (cl == NULL) { clnt_perror (cl, "call failed"); exit (1); } in.arg1 = atol(argv[2]); if (squareproc_2(&in,&out,cl) != RPC_SUCCESS) { printf ("%s\n" , clnt_perror (cl,argv[1] )); exit(1);

} printf("result: %ld\n",out.res1); exit(0); }

Компиляция ClientSQUARE:

$ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl

Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"):

[root@dell4500 SQWMT]# cat square.bsh

./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 &

Вывод на машине клиента:

[root@dell4500 SQWMT]# ./square.bsh [root@dell4500 SQWMT]# result: 196 result: 225 result: 256 result: 289 result: 121 result: 144 result: 441 result: 169 result: 100 result: 324 result: 361 result: 400

Вывод на машине сервера:

[root@dell4500 SQWMT]# ./ServerSQUARE

Thread id = '1082453184' started, arg = 14 Thread id = '1090841664' started, arg = 15 Thread id = '1099230144' started, arg = 16 Thread id = '1116941120' started, arg = 17 Thread id = '1125329600' started, arg = 11 Thread id = '1133718080' started, arg = 12 Thread id = '1142106560' started, arg = 21 Thread id = '1150495040' started, arg = 13 Thread id = '1158883520' started, arg = 10 Thread id = '1167272000' started, arg = 18 Thread id = '1175660480' started, arg = 19 Thread id = '1184048960' started, arg = 20 Thread id = '1082453184' is done 196 Thread id = '1090841664' is done 225 Thread id = '1099230144' is done 256 Thread id = '1116941120' is done 289 Thread id = '1125329600' is done 121 Thread id = '1133718080' is done 144 Thread id = '1142106560' is done 441 Thread id = '1150495040' is done 169 Thread id = '1158883520' is done 100 Thread id = '1167272000' is done 324 Thread id = '1175660480' is done 361 Thread id = '1184048960' is done 400


  Рекомендовать страницу   Обсудить материал Написать редактору  
  Распечатать страницу
 
  Дата публикации: 24.06.2006  

ОБ АЛЬЯНСЕ | НАШИ УСЛУГИ | КАТАЛОГ РЕШЕНИЙ | ИНФОРМАЦИОННЫЙ ЦЕНТР | СТАНЬТЕ СПОНСОРАМИ SILICON TAIGA | ISDEF | КНИГИ И CD | ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ | УПРАВЛЕНИЕ КАЧЕСТВОМ | РОССИЙСКИЕ ТЕХНОЛОГИИ | НАНОТЕХНОЛОГИИ | ЮРИДИЧЕСКАЯ ПОДДЕРЖКА | АНАЛИТИКА | КАРТА САЙТА | КОНТАКТЫ

Дизайн и поддержка: Silicon Taiga   Обратиться по техническим вопросам  
Rambler's Top100 Rambler's Top100