#define NUM_THREADS 512
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
void
die(const char *func, int err)
{
 fprintf(stderr,"%s: %s\n",func, strerror(err));
 abort();
}
 
void
bark(const char *func, int err)
{
 fprintf(stderr,"%s: %s\n",func, strerror(err));
}
/* 
 Описание поцедуры ведущего потока , которая возвращает
дескрипторов пассивного сокета, привязанного к адресу
сервера.
*/
int
getServerSocket(unsigned short int port)
{
 int listenSocket;
 struct sockaddr_in listenSockaddr;
 
 if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0)
 die("socket()",errno);
 memset(&listenSockaddr, 0, sizeof(listenSockaddr));
 listenSockaddr.sin_family = PF_INET;
 listenSockaddr.sin_port = htons(port);
 listenSockaddr.sin_addr.s_addr = INADDR_ANY;
 
 if(bind(listenSocket,(struct sockaddr*)&listenSockaddr,
 sizeof(listenSockaddr)) < 0)
 die("bind()",errno);
 
 if(listen(listenSocket,5)<0)
 die("listen()",errno);
 
 return listenSocket;
}
/*
 Описание процедуры выполняемой всеми ведомыми потоками
*/
void *
serv_request(void *data)
{
struct connection_cb
{
 int dataSocket;
 char data[256];
 int dataSent;
 int dataToSend;
 int isReading;
 struct connection_cb *next;
};
 
struct connection_cb *connections = NULL;
int listenSocket = (int)data;
	if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0)
		die("fcntl()",errno);
	
	while(1)
	{
		fd_set readFdSet;
		fd_set writeFdSet;
		struct connection_cb *currentConn, **currentConnPtr, *tempConn;
		int maxFdNum;
		FD_ZERO(&readFdSet);
		FD_ZERO(&writeFdSet);
 
 /*
 Добавление дескриптора к множеству readFdSet
 */
		FD_SET(listenSocket,&readFdSet);
		maxFdNum = listenSocket;
		
 for(currentConn = connections;currentConn!=NULL;currentConn = 
 currentConn->next)
	{
 	if(currentConn->isReading)
	 	FD_SET(currentConn->dataSocket,&readFdSet);
		else
			FD_SET(currentConn->dataSocket,&writeFdSet);
	maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn-
 >dataSocket : maxFdNum; 
	}
 /*
 Получение множества дескрипторов сокетов для обработки
 */		
	if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0)
	{
		if(errno == EINTR)
			continue;
		die("select()",errno);
	}
		
	currentConnPtr=&connections;
	
	while(*currentConnPtr!=NULL)
	{
 /*
 Проверка принадлежности дескриптора 
 (*currentConnPtr)->dataSocket к множеству readFdSet
 */
	if((*currentConnPtr)->isReading && 
 FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet))
	{
 int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data,
 sizeof((*currentConnPtr)->data),0);
	if(result < 0)
	{
	if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK)
	{
		bark("recv()",errno);
 	close((*currentConnPtr)->dataSocket);
		tempConn = *currentConnPtr;
 	*currentConnPtr = (*currentConnPtr)->next;
		free(tempConn);
		continue;
	}
		}
	else 
 if(result==0)
		{
		close((*currentConnPtr)->dataSocket);
 	tempConn = *currentConnPtr;
		*currentConnPtr = (*currentConnPtr)->next;
		free(tempConn);
		continue;
		}
	else
	{
		(*currentConnPtr)->dataToSend = result;
		(*currentConnPtr)->dataSent = 0;
		(*currentConnPtr)->isReading = 0;
 printf("Recieving as Slave Thread id = '%d' \n",pthread_self());
	}
		}
	else 
 /*
 Проверка принадлежности дескриптора 
 (*currentConnPtr)->dataSocket к множеству writedFdSet
 */
 if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet))
	{
	int result = send((*currentConnPtr)->dataSocket, 
 (*currentConnPtr)->data+(*currentConnPtr)->dataSent,
 (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0);
			
	if(result < 0)
	{
	 if(errno!=EINTR && errno!=EAGAIN)
		{
			bark("write()",errno);
			close((*currentConnPtr)->dataSocket);
			tempConn = *currentConnPtr;
	 	*currentConnPtr = (*currentConnPtr)->next;
			free(tempConn);
			continue;
		}
			}
 	else
		{
			(*currentConnPtr)->dataSent +=result;
		if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend)
			(*currentConnPtr)->isReading = 1;
				}
			}
			currentConnPtr = &((*currentConnPtr)->next);
 printf("Sending as Slave Thread id = '%d' \n",pthread_self());
		}
 /*
 Проверка принадлежности дескриптора listenSocket
 к множеству readFdSet,т.е. необходимости обработать
 вызов connect( ) от нового клиента.
 */
 if(FD_ISSET(listenSocket,&readFdSet))
		{
			
 while(1)
	{
 /*
 Вызовы pthread_mutex_lock, pthread_mutex_unlock
 Не нужны в среде Linux
 */
 pthread_mutex_lock(&request_mutex);
	 int result = accept(listenSocket,(struct sockaddr*)NULL,NULL);
 pthread_mutex_unlock(&request_mutex);		
	if(result < 0)
	{
 	if(errno==EAGAIN // errno == EWOULDBLOCK)
			break;
		 die("accept()",errno);
			}
	else
	{
		*currentConnPtr = malloc(sizeof(struct connection_cb));
		if(*currentConnPtr==NULL)
		die("malloc()",0);
 
 	if(fcntl(result,F_SETFL,O_NONBLOCK)<0)
	 	die("fcntl()",errno);
		(*currentConnPtr)->dataSocket = result;
 (*currentConnPtr)->isReading = 1;
		(*currentConnPtr)->next = 0;
	 currentConnPtr = &((*currentConnPtr)->next);
 printf("Accepting as Master Thread id = '%d' \n",pthread_self());
		 	}
			}	
		}
	}
}
int
main(int argc,char *argv[])
{
int k;
int descSock;
char *service="1500";
switch(argc) {
case 1:
 break;
case 2:
 service = argv[1];
 break;
default:
 printf ("Usage: ./ServerBNTH [port]\n");
 exit(1);
}
 
size_t stacksize;
pthread_t p_thread[NUM_THREADS];
/*
 Установка размера стека для ведомых потоков
*/
pthread_attr_t attr;
 pthread_attr_init(&attr);
 stacksize = 500000;
 pthread_attr_setstacksize (&attr, stacksize);
 pthread_attr_getstacksize (&attr, &stacksize);
/*
 Получение значения дескриптора пассивного сокета
*/ 
 descSock = getServerSocket(atoi(service));
/* 
 Запуск ведомых потоков
*/
for(k=0; k pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock);
 printf("Thread %d started \n",k);
}
 
pthread_attr_destroy(&attr);
for(k=0;k}
 Ниже приведен модифицированный код square_svc.c (для шаблона square.x),
позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0
Шаблон square.x:
struct square_in {
 long arg1;
};
struct square_out {
 long res1;
};
program SQUARE_PROG {
 version SQUARE_VERS {
 square_out SQUAREPROC(square_in) = 1;
 } = 2 ;
} = 0x31230000;
Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла :
 $ rpcgen -a -M square.x
Код процедур сервера:
 /*
 ServerSideProc.c
 */
 #include "square.h"
#include 
#include 
#include 
#include 
#include 
#include 
#include 
int request=0;
bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp)
{
 printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1);
 /* 
 Имитация работы процедуры , выполняемой потоками сервера
 */
 sleep(5);
 outp->res1=inp->arg1*inp->arg1;
 printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1);
 return(TRUE);
}
int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result)
{
 xdr_free(xdr_result,result);
 return(1);
}
Модифицированный файл square_svc.c:
/* square_svc.c
 * Please do not edit this file.
 * It was generated using rpcgen.
 */
#include "square.h"
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
pthread_t p_thread;
pthread_attr_t attr;
 
/*
 Процедура выполняемая потоком
 */
void *
serv_request(void *data)
{
struct thr_data
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *ptr_data;
{
 union {
 square_in squareproc_2_arg;
 } argument;
 union {
 square_out squareproc_2_res;
 } result;
 bool_t retval;
 xdrproc_t _xdr_argument, _xdr_result;
 bool_t (*local)(char *, void *, struct svc_req *);
/*
 Распаковка данных , переданных в процедуру при запуске потока.
*/
ptr_data = (struct thr_data *)data;
struct svc_req *rqstp = ptr_data->rqstp;
register SVCXPRT *transp = ptr_data->transp;
 
 switch (rqstp->rq_proc) {
 case NULLPROC:
 (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
 return;
 
 case SQUAREPROC:
 _xdr_argument = (xdrproc_t) xdr_square_in;
 _xdr_result = (xdrproc_t) xdr_square_out;
 local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
 break;
 
 default:
 svcerr_noproc (transp);
 return;
 }
 memset ((char *)&argument, 0, sizeof (argument));
 if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
 svcerr_decode (transp);
 return;
 }
 
 /* 
 Стандартный вызов функции сервера.
 Данные для вызова уже приведены к стандарту.
 */
 retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
 if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) 
 {
 svcerr_systemerr (transp);
 }
 if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
 fprintf (stderr, "%s", "unable to free arguments");
 exit (1);
 }
 if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
 fprintf (stderr, "%s", "unable to free results");
 return;
}
}
 
/* 
Принципиально измененный код square_prog_2 , стартующей теперь
новый поток для каждого инициированного клиентом вызова
процедуры на удаленном сервере
*/
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
struct data_str 
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *data_ptr =(struct data_str*)malloc(sizeof(struct data_str);
{
/* 
 Упаковка данных в структуру для передачи ссылки на нее,
 как параметра запускаемому потоку
*/
data_ptr->rqstp = rqstp;
data_ptr->transp = transp;
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); 
pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr);
 }
}
int
main (int argc, char **argv)
{
	register SVCXPRT *transp;
	pmap_unset (SQUARE_PROG, SQUARE_VERS);
	transp = svcudp_create(RPC_ANYSOCK);
	if (transp == NULL) {
		fprintf (stderr, "%s", "cannot create udp service.");
		exit(1);
	}
	if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) {
		fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp).");
		exit(1);
	}
	transp = svctcp_create(RPC_ANYSOCK, 0, 0);
	if (transp == NULL) {
		fprintf (stderr, "%s", "cannot create tcp service.");
		exit(1);
	}
	if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) {
		fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp).");
		exit(1);
	}
	svc_run ();
	fprintf (stderr, "%s", "svc_run returned");
	exit (1);
	/* NOTREACHED */
}
Компиляция ServerSQUARE:
$ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl
Код клиента:
/* 
* ClientSideProc.c
*/
#include  /* for memset */
#include "square.h"
#include 
#include 
#include 
#include 
#include 
#include 
#include 
int 
main (int argc,char **argv)
{
CLIENT *cl;
square_in in;
square_out out;
if (argc != 3 )
{ printf ("Usage : client  \n");
 exit(1);
}
cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp");
 if (cl == NULL) {
 clnt_perror (cl, "call failed");
 exit (1);
 }
in.arg1 = atol(argv[2]);
if (squareproc_2(&in,&out,cl) != RPC_SUCCESS)
{
 printf ("%s\n" , clnt_perror (cl,argv[1] ));
 exit(1);
}
printf("result: %ld\n",out.res1);
exit(0);
}
Компиляция ClientSQUARE:
$ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl
Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"):
[root@dell4500 SQWMT]# cat square.bsh
./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 &
Вывод на машине клиента:
[root@dell4500 SQWMT]# ./square.bsh
[root@dell4500 SQWMT]# result: 196
result: 225
result: 256
result: 289
result: 121
result: 144
result: 441
result: 169
result: 100
result: 324
result: 361
result: 400
Вывод на машине сервера:
[root@dell4500 SQWMT]# ./ServerSQUARE
Thread id = '1082453184' started, arg = 14
Thread id = '1090841664' started, arg = 15
Thread id = '1099230144' started, arg = 16
Thread id = '1116941120' started, arg = 17
Thread id = '1125329600' started, arg = 11
Thread id = '1133718080' started, arg = 12
Thread id = '1142106560' started, arg = 21
Thread id = '1150495040' started, arg = 13
Thread id = '1158883520' started, arg = 10
Thread id = '1167272000' started, arg = 18
Thread id = '1175660480' started, arg = 19
Thread id = '1184048960' started, arg = 20
Thread id = '1082453184' is done 196
Thread id = '1090841664' is done 225
Thread id = '1099230144' is done 256
Thread id = '1116941120' is done 289
Thread id = '1125329600' is done 121
Thread id = '1133718080' is done 144
Thread id = '1142106560' is done 441
Thread id = '1150495040' is done 169
Thread id = '1158883520' is done 100
Thread id = '1167272000' is done 324
Thread id = '1175660480' is done 361
Thread id = '1184048960' is done 400