
Programming System 06

Last modified: 22/04/2004 05:33:43 CEST
Author: Michèle Garoche

Contents - Process scheduling - Fork and Wait - Pthread and Exec - Exec - Pipes - Message queues - Memory management - Paging and segmentation - Synchronization (1) - Synchronization (2) - File System Management - Socket Programming (1) - Revision

Programming System

Message queues

Tutorial class by Philippe Décogné

30/11/2002 (dd/mm/yy)

Message queues

This part does not apply to XDarwin, because message queues are handled quite differently on Mac. The code on this page has therefore not been tested.

Message queues are one of the IPC (interprocess communication) mechanisms, along with pipes, signals, semaphores, shared memory, and sockets. They are managed outside the file management system: the system maintains a dedicated table for each of these tools. UNIX mailboxes are based on this mechanism.

Characteristics of message queues

Constitutive parts of a message

Message structure examples

struct message { long type; char real_message[40]; };

struct message_buf { long type; float index; char message_name[15]; };

struct mini_message { long type; };
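
When such a structure is passed to msgsnd or msgrcv, the size argument counts only the bytes that follow the leading long. The following is a minimal sketch of how that size could be computed for the three structures above. The macro PAYLOAD_SIZE and the three small functions are names invented for this illustration; they are not part of the original course.

```c
#include <stddef.h>

/* The three example structures from the text. */
struct message      { long type; char real_message[40]; };
struct message_buf  { long type; float index; char message_name[15]; };
struct mini_message { long type; };

/* Hypothetical helper: number of bytes after the leading long type field.
   For structures with mixed members this includes any padding added by
   the compiler. */
#define PAYLOAD_SIZE(s) (sizeof(s) - sizeof(long))

/* Return the payload size of each example structure. */
size_t message_payload(void)      { return PAYLOAD_SIZE(struct message); }
size_t message_buf_payload(void)  { return PAYLOAD_SIZE(struct message_buf); }
size_t mini_message_payload(void) { return PAYLOAD_SIZE(struct mini_message); }
```

For struct message the result is 40, the size of real_message. For struct mini_message it is 0, which is still a valid message: it carries only a type.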

Primitives to be used

Msgget: int msgget(key_t key, int msgflg)

Returns the identifier of a message queue associated with key or creates a new message queue associated with key.

key is a unique positive integer.

msgflg is built by OR-ing the constants IPC_CREAT and IPC_EXCL with the permission bits of the message queue.

Returns the identifier of the message queue associated with key as follows:

If key does not match an existing message queue and IPC_CREAT is set in msgflg, a new message queue associated with key is created with the permission bits defined in msgflg.

If key does not match any existing message queue and IPC_CREAT is not set, an error occurs.

If key matches an existing message queue and IPC_EXCL is set, an error occurs.

If key matches an existing message queue and IPC_EXCL is not set, returns the identifier of the existing message queue.

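In any case, if key is IPC_PRIVATE, a new message queue is created. IPC_PRIVATE is a special key value rather than a flag; the resulting identifier is typically shared between related processes, for example a parent and the children it forks.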

Returns -1 if it fails.

The permission bits set in msgflg are the same as those used for files. They are given in octal:

Type       U (user)   G (group)   O (other)
read       4          4           4
write      2          2           2
execute    1          1           1
total      7          7           7

This call is required before any use of a message queue.

msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0660);

Headers to include: sys/types.h, sys/ipc.h, sys/msg.h

Man page: msgget(3) - subroutines.
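
The four cases listed above can be combined into a "create it or look it up" helper. This is a minimal sketch only: the function name open_queue and its created output parameter are assumptions made for this illustration.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: return the identifier of the queue associated
   with key, creating it with the given permission bits if it does not
   exist yet. *created is set to 1 when this call created the queue,
   0 when the queue already existed. Returns -1 on failure. */
int open_queue(key_t key, int perms, int *created)
{
    /* First try an exclusive creation. */
    int id = msgget(key, IPC_CREAT | IPC_EXCL | perms);
    if (id != -1) {
        *created = 1;
        return id;
    }
    /* EEXIST means the key already matches a queue: just look it up. */
    if (errno == EEXIST) {
        *created = 0;
        return msgget(key, 0);
    }
    return -1;
}
```

A caller could write id = open_queue(17, 0660, &created) and use created to decide which process is responsible for removing the queue later.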

Msgsnd: int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)

Sends a message to the message queue msqid. msgp is a pointer to a structure containing the message.

The structure is made of two members:

long mtype;
char mtext[msgsz];

where mtype is a positive integer that the receiver can use to select the message,
and mtext is a byte array of size msgsz, which must be less than or equal to the system limit MSGMAX.

msgflg can be IPC_NOWAIT or 0 and determines the action to be taken in case of system limits overflow (max bytes in a message queue or max number of open message queues).

If IPC_NOWAIT is set, the function returns immediately with -1; otherwise the call blocks until:

the limit is no longer exceeded
the message queue is removed
the calling process receives a signal

Headers to include: sys/types.h, sys/ipc.h, sys/msg.h

Man page: msgsnd(3) - subroutines.
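
As an illustration of the IPC_NOWAIT behaviour, here is a minimal sketch of a non-blocking send. The structure text_message, the constant TEXT_MAX, and the function try_send are hypothetical names chosen for this example.

```c
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define TEXT_MAX 64   /* hypothetical payload capacity for this sketch */

struct text_message {
    long mtype;             /* must be strictly positive */
    char mtext[TEXT_MAX];
};

/* Hypothetical helper: send text with the given type without blocking.
   Returns 0 on success, 1 if the queue is full (EAGAIN), -1 otherwise. */
int try_send(int msqid, long type, const char *text)
{
    struct text_message m;
    size_t len = strlen(text) + 1;      /* include the terminating null byte */

    if (type <= 0 || len > TEXT_MAX)
        return -1;
    m.mtype = type;
    memcpy(m.mtext, text, len);

    if (msgsnd(msqid, &m, len, IPC_NOWAIT) == 0)
        return 0;
    return (errno == EAGAIN) ? 1 : -1;
}
```

A return value of 1 tells the caller that the queue is currently full, so it can retry later instead of being suspended inside msgsnd.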

Msgrcv: int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)

Receives a message from the message queue identified by msqid and places it in the structure pointed to by msgp.

The structure has two members:

long mtype;
char mtext[msgsz];

where mtype is a positive integer that can be used to select the message,
and mtext is a byte array of size msgsz, which must be less than or equal to the system limit MSGMAX.

The message type msgtyp determines which message is extracted from the message queue:

if msgtyp is positive, the oldest message of type msgtyp is extracted
if msgtyp is zero, the oldest message is extracted, whatever its type
if msgtyp is negative, the oldest message of the lowest type that is less than or equal to the absolute value of msgtyp is extracted

Examples:

[Figure Progsys 06-01: a message queue in which message 1 is the oldest and message 6 is the newest]

If msgtyp = 4, message 2, which is the oldest message of type 4, is extracted.

If msgtyp = 0, message 1, which is the oldest message sent (of type 2), is extracted.

If msgtyp = -3, message 3 is extracted: among the messages of type less than or equal to 3, it has the lowest type (type 1).

Therefore, giving urgent messages a low type and reading with a well-chosen negative msgtyp allows you to extract an urgent message first.
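
The selection rule can be checked without a real queue. The sketch below simulates it on an array of message types ordered from oldest to newest. The function select_message is a hypothetical name, and in the usage example only the types of messages 1 to 3 (2, 4, and 1) come from the text; the types of messages 4 to 6 are invented.

```c
#include <stddef.h>

/* Simulate the selection rule of msgrcv on an array of message types
   ordered from oldest (index 0) to newest. Returns the index of the
   message that would be extracted, or -1 if no message qualifies.
   This is an illustration only; it does not touch a real queue. */
int select_message(const long *types, size_t count, long msgtyp)
{
    int best = -1;
    size_t i;

    for (i = 0; i < count; i++) {
        if (msgtyp == 0)
            return (int)i;                      /* oldest message, any type */
        if (msgtyp > 0) {
            if (types[i] == msgtyp)
                return (int)i;                  /* oldest message of that type */
        } else if (types[i] <= -msgtyp) {
            /* lowest type not above |msgtyp|; ties keep the oldest */
            if (best == -1 || types[i] < types[best])
                best = (int)i;
        }
    }
    return best;
}
```

With the types {2, 4, 1, 4, 3, 2}, a msgtyp of 4 selects index 1 (message 2), a msgtyp of 0 selects index 0 (message 1), and a msgtyp of -3 selects index 2 (message 3).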

msgflg is either IPC_NOWAIT or 0 and determines the action to be taken if no message of requested type exists in the message queue.

If IPC_NOWAIT is set, then the call returns immediately with -1 as a return value, otherwise the calling process is blocked until:

it receives a message of requested type
the message queue is removed, return code -1
the calling process receives a signal, return code -1

Headers to include: sys/types.h, sys/ipc.h, sys/msg.h

Man page: msgrcv(3) - subroutines.
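
A process that must not be suspended can poll the queue with IPC_NOWAIT. The following is a minimal sketch under that assumption; text_message, TEXT_MAX, and try_receive are hypothetical names, and the return codes -2 and -1 are a convention of this example only.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define TEXT_MAX 64   /* hypothetical payload capacity for this sketch */

struct text_message {
    long mtype;
    char mtext[TEXT_MAX];
};

/* Hypothetical helper: poll the queue for a message selected by msgtyp.
   Returns the number of payload bytes received, -2 if no matching
   message is present (ENOMSG), or -1 on any other error. */
long try_receive(int msqid, long msgtyp, struct text_message *out)
{
    ssize_t n = msgrcv(msqid, out, TEXT_MAX, msgtyp, IPC_NOWAIT);
    if (n >= 0)
        return (long)n;
    return (errno == ENOMSG) ? -2 : -1;
}
```

Distinguishing ENOMSG from other errors lets the caller tell "nothing to read yet" apart from a removed queue or an invalid identifier.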

Msgctl: int msgctl(int msqid, int cmd, struct msqid_ds *buf)

Controls the message queue specified by msqid.

Each message queue has an associated data structure, which msgctl can read or alter depending on the requested action.

The data structure is defined in sys/msg.h and contains, amongst others, the following members:

message queue permission bits
first message in the queue
last message in the queue
number of bytes in use in the queue
number of messages in the queue
maximum number of bytes in the queue
pid of last msgsnd()
pid of last msgrcv()
time of last msgsnd()

The action to be performed on the message queue is specified in cmd and may be one of the following:

IPC_STAT: gather information about the message queue and put it in buf.

IPC_SET: change the permission bits and the maximum number of bytes allowed in the queue to the values given in buf. The permission bits may only be changed by root or by a process whose effective user id matches one of those recorded in the data structure associated with the message queue. Raising the maximum number of bytes allowed in the message queue may only be performed by root and is bounded by the system limit (the system silently replaces any value greater than the system limit with this limit).

IPC_RMID: remove the message queue and delete all data associated with it. Only root or a process whose effective user id matches one of those recorded in the data structure associated with the message queue may perform this action.

Returns 0 in case of success, otherwise -1.

Headers to include: sys/types.h, sys/ipc.h, sys/msg.h

Man page: msgctl(3) - subroutines.
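
As a sketch of how IPC_STAT and IPC_RMID might be used, the helpers below read a few members of struct msqid_ds and remove a queue. The function names queue_length, print_queue_info, and remove_queue are assumptions made for this illustration.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: return the number of messages currently in the
   queue, or -1 if the queue cannot be queried. */
long queue_length(int msqid)
{
    struct msqid_ds ds;

    if (msgctl(msqid, IPC_STAT, &ds) == -1)
        return -1;
    return (long)ds.msg_qnum;
}

/* Hypothetical helper: print a few fields of the data structure. */
int print_queue_info(int msqid)
{
    struct msqid_ds ds;

    if (msgctl(msqid, IPC_STAT, &ds) == -1)
        return -1;
    printf("permissions: %o\n", (unsigned)(ds.msg_perm.mode & 0777));
    printf("messages:    %lu\n", (unsigned long)ds.msg_qnum);
    printf("max bytes:   %lu\n", (unsigned long)ds.msg_qbytes);
    printf("last sender: %ld\n", (long)ds.msg_lspid);
    return 0;
}

/* Remove the queue; returns 0 on success, -1 otherwise. */
int remove_queue(int msqid)
{
    return msgctl(msqid, IPC_RMID, NULL);
}
```

Once remove_queue has succeeded, the identifier is no longer valid and queue_length returns -1 for it.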

Exercise 1

Let A and B be two processes which communicate via a message queue with key 17.

Process A creates the message queue, then writes the message "ceci est un message" (this is a message) for the process B.

Process B reads the message and displays it, then deletes the message queue.

Write the corresponding programs.

Solution

Analysis

The data structure for the message will contain a type (a long) and the message text. As only one message is sent, any positive type will do, say 12. The message text will be a character string of length 20 (the length of the message plus the terminating null byte).

The message queue should be created with read/write permission for the owner and the group, that is to say 0660.

This code has not been tested on Mac, as message queues are handled quite differently on this system. Moreover, no errors are handled; in other words, this is only rough work.

Program msgessai1.c

/* msgessai1.c

works with msgessai2.c

Create a message queue to send the following message "ceci est un message" to msgessai2.c.
*/

/* Includes */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

/* Defines */
#define cle 17

/* Global variables */
int msqid;
struct msgbuf_EXD
{
   long mtype;
   char mtext[20];
} msgp;

int main()
{
   /* Create the message queue */
   msqid = msgget(cle, IPC_CREAT|IPC_EXCL|0660);
   
   /* Fill in the structure */
   msgp.mtype = 12;
   strcpy(msgp.mtext, "ceci est un message");
   
   /* Send the message: strlen() excludes the terminating null byte,
      so 19 bytes are sent, the size msgessai2.c asks for */
   msgsnd(msqid, &msgp, strlen(msgp.mtext), 0);
   
   return 0;
}

Program msgessai2.c

/* msgessai2.c

works with msgessai1.c

Retrieve the message msgessai1.c sent to the message queue, display it and kill the message queue
*/

/* Includes */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>

/* Defines */
#define cle 17

/* Global variables */
int msqid;
struct msgbuf_EXD
{
   long mtype;
   char mtext[20];
} msgp;

int main()
{
   /* Retrieve the message queue id */
   msqid = msgget(cle, 0);
      
   /* Read the message */
   msgrcv(msqid, &msgp, 19, 12, 0);
   
   /* Kill the queue */
   msgctl(msqid, IPC_RMID, NULL);

   /* Display the message */
   printf("message %s\n", msgp.mtext);

   return 0;
}

Exercise 2

Consider the following software architecture:

[Figure Progsys 06-02: each client sends n, a random number, to the server; the server sends back n random numbers]

A server is waiting for client requests.

A request asks for n random numbers in the range 1 to NMAX. The server draws these numbers at random.

The client draws n at random and sends it to the server.

Write the corresponding programs.

See rand(3) and srand(3) about pseudo-random number generation.

Solution

Analysis

Message queue identification: define the key, for example 17. A fixed key like this is not very wise. It would be better to derive the key from the process id, so that the key is unique and no other process can intercept the messages.
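
A common alternative to a hard-coded key, and a different technique from the process id suggested above, is to derive the key with ftok(3) from a file that both the client and the server can access. The following is only a sketch: the function name make_key is hypothetical, and the choice of file and project character is left to the programs that share the queue.

```c
#include <sys/types.h>
#include <sys/ipc.h>

/* Hypothetical helper: derive a key from an existing file and a
   one-character project identifier. Client and server must agree on
   both values. Returns (key_t)-1 if the file cannot be accessed. */
key_t make_key(const char *path, char project)
{
    return ftok(path, project);
}
```

Both programs would then call msgget(make_key(path, 'Q'), ...) with the same path, instead of sharing the constant 17.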

Define the data structure of the message (requests and answers):

Request structure:

As each client sends the same message, we define a unique structure for all requests.

The server will extract the messages in order of arrival, as there is no dependency between clients. Therefore, the type will be 1.

To give the server the ability to perform the request, it should know the random number n generated by the client. Therefore, we will put an int n in the message body.

The server has to send the requesting client the n random numbers generated. To make it possible, it will be sufficient to put the client process id in the message body.

The client will send its request and read the answer with the option set to 0 rather than IPC_NOWAIT, so that it is blocked until it receives the answer from the server.

Structure of answers:

We will create a message queue with read/write permission for everyone, i.e. 0666, so that each client is able to read the messages sent by the server.

The type of each answer will be the process id sent in the client's request, so that only the requesting client reads it.

The message will contain an array of n random numbers.

The server will send its answers with the option set to 0. It waits for requests in msgrcv, and msgsnd only blocks if the queue is full.
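
The routing by message type described in this analysis can be tried in a single process before writing the two programs. The sketch below is an assumption-laden illustration, not the course solution: the function names send_request, serve_one, and read_answer are hypothetical, NMAX is assumed to be 20, and the client id is passed as a plain integer instead of calling getpid().

```c
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define NMAX 20        /* assumed bound, as in the pipe version of the exercise */
#define REQUEST_TYPE 1 /* every request carries type 1 */

struct requete { long type; int nb_tir; int destinataire; };
struct reponse { long type; int tab[NMAX]; };

/* Client side: send a request for n numbers on behalf of client id. */
int send_request(int msqid, int n, int id)
{
    struct requete req = { REQUEST_TYPE, n, id };
    return msgsnd(msqid, &req, sizeof req - sizeof(long), 0);
}

/* Server side: handle exactly one request. The answer is typed with the
   client id, so only that client will select it. */
int serve_one(int msqid)
{
    struct requete req;
    struct reponse rep;
    int i;

    if (msgrcv(msqid, &req, sizeof req - sizeof(long), REQUEST_TYPE, 0) == -1)
        return -1;
    if (req.nb_tir < 0 || req.nb_tir > NMAX || req.destinataire <= 0)
        return -1;                       /* reject malformed requests */
    rep.type = req.destinataire;
    for (i = 0; i < req.nb_tir; i++)
        rep.tab[i] = 1 + rand() % NMAX;  /* value in the range 1 to NMAX */
    return msgsnd(msqid, &rep, sizeof(int) * (size_t)req.nb_tir, 0);
}

/* Client side: read the answer addressed to id; returns how many
   numbers were received, or -1 on error. */
int read_answer(int msqid, int id, struct reponse *rep)
{
    ssize_t n = msgrcv(msqid, rep, sizeof rep->tab, id, 0);
    return n == -1 ? -1 : (int)(n / (ssize_t)sizeof(int));
}
```

One design caveat of this scheme: requests use type 1, so a client whose id is also 1 would have its answers mistaken for requests. The sketch does not guard against that case.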

To avoid typing the same code twice, we will use a header file dataessai.h, which will be included in both the server and the client source code.

This code has not been tested on Mac, as message queues are handled quite differently on this system. Moreover, there is no error handling, the server runs in an infinite loop, and so on. In other words, it should be thoroughly revised.

Header dataessai.h

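/* dataessai.h: message layouts shared by the client and the server */
#ifndef DATAESSAI_H
#define DATAESSAI_H

#define NMAX 20   /* assumed upper bound; the original header gives no value */

struct requete
{
   long type;
   int nb_tir;
   int destinataire;
};

struct reponse
{
   long type;
   int tab[NMAX];
};

#endif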

Program msgclientessai.c

/* msgclientessai.c

works with msgserveressai.c and dataessai.h

Generate n and send a request to msgserveressai.c to obtain n random numbers.
*/

/* Includes */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>   /* for srand, rand */
#include <time.h>     /* for time */
#include <unistd.h>   /* for getpid */
#include "dataessai.h"


/* Defines */
#define cle 17

/* Global variables */
int msqid;
struct requete req;
struct reponse rep;

int main(void)
{
   /* Retrieve the id of message queue */
   msqid = msgget(cle, 0);
   
   /* Seed the pseudo-random numbers */
   srand(time(NULL));
   
   /* Build the question */
   req.destinataire = getpid();
   req.type = 1;
   req.nb_tir = 1 + rand() % NMAX;   /* n in the range 1 to NMAX */
   
   /* Write the question: the body holds nb_tir and destinataire */
   msgsnd(msqid, &req, 2 * sizeof(int), 0);
   
   /* Read the answer, selected by our own process id */
   msgrcv(msqid, &rep, sizeof(int) * req.nb_tir, getpid(), 0);
   
   return 0;
}

Program msgserveressai.c

/* msgserveressai.c

works with msgclientessai.c and dataessai.h

Generate n random numbers and send them to the client which has previously sent n to it.
*/

/* Includes */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>   /* for srand, rand */
#include <time.h>     /* for time */
#include "dataessai.h"


/* Defines */
#define cle 17

/* Global variables */
int msqid;
struct requete req;
struct reponse rep;

int main(void)
{
   int i;

   /* Create the message queue */
   msqid = msgget(cle, IPC_CREAT|IPC_EXCL|0666);
   
   /* Seed the pseudo-random numbers */
   srand(time(NULL));
   
   /* Infinite loop */
   while (1)
   {
      /* Wait for the client question */
      msgrcv(msqid, &req, 2 * sizeof(int), 1, 0);
   
      /* Build the answer */
      rep.type = req.destinataire;
      
      /* Build the int array to be inserted in the body of the answer */
      for (i = 0; i < req.nb_tir; i++)
      {
         rep.tab[i] = 1 + rand() % NMAX;   /* value in the range 1 to NMAX */
      }
      
      /* Write the answer */
      msgsnd(msqid, &rep, sizeof(int) * req.nb_tir, 0);
   }
   
   return 0;
}

Exercise 3

Same as exercise 2, but this time we use pipes to send messages.

Give the changes in architecture.

Write the code for two clients.

Solution

Analysis

We need to fork two children (the clients); the parent will be the server.

For each client communicating with the server, we need two pipes: one for reading, another for writing.

Therefore, we create two pipes in the parent process:

[Figure Progsys 06-03]

Then we fork the first child. The pipes' descriptors are duplicated, so we close the unused ends of the pipes in both the child and the parent process.

[Figure Progsys 06-04]

Thereafter we create another two pipes in the parent process.

[Figure Progsys 06-05]

Finally we fork the second child. The pipes' descriptors are duplicated again, so we close the unused ends of the pipes in both the second child process and the parent process.

[Figure Progsys 06-06]

The server also needs a way to wait for questions from both clients without being blocked on a single pipe (read is a blocking call). Therefore we will use select with a set of read descriptors containing pip1[0] and pip21[0].
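
The use of select on two read ends can be isolated in a small helper. This is a minimal sketch; the function name wait_readable and its bit-mask return value are assumptions made for this illustration.

```c
#include <stddef.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: block until fd_a or fd_b is readable.
   Returns a bit mask: 1 if fd_a is ready, 2 if fd_b is ready,
   3 if both are, or -1 if select fails. */
int wait_readable(int fd_a, int fd_b)
{
    fd_set readers;
    int max_fd = (fd_a > fd_b) ? fd_a : fd_b;
    int ready = 0;

    /* The set must be rebuilt before every call: select overwrites it. */
    FD_ZERO(&readers);
    FD_SET(fd_a, &readers);
    FD_SET(fd_b, &readers);

    if (select(max_fd + 1, &readers, NULL, NULL, NULL) == -1)
        return -1;
    if (FD_ISSET(fd_a, &readers))
        ready |= 1;
    if (FD_ISSET(fd_b, &readers))
        ready |= 2;
    return ready;
}
```

In the server, a call such as wait_readable(pip1[0], pip21[0]) would tell which client has a question pending before any read is attempted.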

Under some circumstances the pipe limits may be reached, in which case the results are not displayed correctly.
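
One plausible way such problems can appear, stated here as an assumption rather than as the cause the author had in mind, is that read on a pipe may return fewer bytes than requested. A reader can guard against this by looping until the whole answer has arrived. The function name read_full is hypothetical.

```c
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: read exactly len bytes from fd unless end of
   file or an error occurs first. Returns the number of bytes read,
   or -1 on error. */
ssize_t read_full(int fd, void *buf, size_t len)
{
    char *p = buf;
    size_t done = 0;

    while (done < len) {
        ssize_t n = read(fd, p + done, len - done);
        if (n == 0)
            break;                 /* writer closed the pipe */
        if (n == -1) {
            if (errno == EINTR)
                continue;          /* interrupted by a signal: retry */
            return -1;
        }
        done += (size_t)n;
    }
    return (ssize_t)done;
}
```

A client could call read_full(pip2[0], reponse, question * sizeof(int)) and compare the result with the expected size before printing.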

Program tirage.c

/* tirage.c */
/*
A client sends a random integer n to a server.
The server replies with n random integers.
There are two clients. Clients and server communicate via pipes, and the server uses select to let the kernel wake it up when the pipes' read ends are ready.
*/

/* Includes */
#include <sys/types.h>	/* for pid_t */
#include <sys/select.h>	/* for select, fd_set, FD_ZERO, FD_SET, FD_ISSET */
#include <unistd.h>	/* for pipe, getpid, fork, close, read, write */
#include <stdlib.h>	/* for srandom, random, exit */
#include <string.h> /* for bzero, which FD_ZERO uses on some systems */
#include <stdio.h>	/* for printf */
/* Defines */
#define NMAX 20	/* used for random number generation */
#define MGMAX(a,b) (((a) > (b)) ? (a) : (b)) /* simple max function */

/* Global variables */
int question;	/* number sent by client */
int reponse[NMAX]; /* array of numbers sent by server */

int main()
{
	int pip1[2], pip2[2], pip21[2], pip22[2]; /* file descriptors for pipes */
	int ind;	/* counter for loops */
	int maxpip; /* number of descriptors to test for select */
	pid_t pid1, pid2;	/* process ids of the child processes */
	fd_set lire; /* descriptor set for select */
	
	/* Creates the pipes */
	if ( pipe(pip1) == -1)
	{
		printf("Error while creating pip1\n");
		exit(EXIT_FAILURE);
	}
	if ( pipe(pip2) == -1)
	{
		printf("Error while creating pip2\n");
		exit(EXIT_FAILURE);
	}
	
	/* Seed the random number generator with the parent's process id */
	srandom(getpid());
	
	/* Fork the first child */
	if ((pid1 = fork())== -1)
	{
		printf("Error while forking the first child\n");
		exit(EXIT_FAILURE);
	}
	
	/* We are in the parent process */
	if (pid1 != 0)
	{
		/* Close the unused ends of pipes */
		if ( close(pip1[1]) == -1 )
		{
			printf("Error while closing pip1[1]\n");
		}
		if ( close(pip2[0]) == -1 )
		{
			printf("Error while closing pip2[0]\n");
		}
		
		/* Create new pipes for children */
		if ( pipe(pip21) == -1 )
		{
			printf("Error while creating pip21\n");
			exit(EXIT_FAILURE);
		}
		if ( pipe(pip22) == -1 )
		{
			printf("Error while creating pip22\n");
			exit(EXIT_FAILURE);
		}
		
		/* Fork the second child */
		if ( (pid2 = fork()) == -1)
		{
			printf("Error while creating the second child process\n");
			exit(EXIT_FAILURE);
		}
		
		/* We are in the parent process */
		if (pid2 != 0)
		{
			/* Close unused ends of pipes */
			if ( close(pip21[1]) == -1)
			{
				printf("Error while closing pip21[1]\n");
			}
			if ( close(pip22[0]) == -1 )
			{
				printf("Error while closing pip22[0]\n");
			}
			
			/* Reset the descriptors set for select */
			FD_ZERO(&lire);
			
			/* Infinite loop to read questions from children and sending answers */
			for(;;)
			{				
				/* Set the bits in descriptors set to catch either first or second child process 
				sending values to parent process */
				FD_SET(pip1[0], &lire);
				FD_SET(pip21[0], &lire);
				
				/* Compute the maximum range of descriptors to test */
				maxpip = MGMAX(pip1[0], pip21[0]) + 1;
				
				/* Wake up parent when any descriptor in descriptor set is ready */
				select (maxpip, &lire, NULL, NULL, NULL);
				
				/* Read from pip1[0] if ready and write answer to pip2[1] */
				if (FD_ISSET(pip1[0],&lire))
				{
					read(pip1[0], &question, sizeof(int));
					
					/* Generate answer */
					for (ind = 0; ind < question; ind++)
					{
						reponse[ind] = random();
					}
					
					write(pip2[1], &reponse, question*sizeof(int));
				}
				
				/* Read from pip21[0] if ready and write answer to pip22[1] */
				if (FD_ISSET(pip21[0],&lire))
				{
					read(pip21[0], &question, sizeof(int));
					
					/* Generate answer */
					for (ind = 0; ind < question; ind++)
					{
						reponse[ind] = random();
					}
					
					write(pip22[1], &reponse, question*sizeof(int));
				}
			}
		}
		else
		/* We're in the second child process */
		{
			/* Close unused ends of pipes */
			if (close(pip21[0]) == -1)
			{
				printf("Error while closing pip21[0]\n");
			}
			if (close(pip22[1]) == -1)
			{
				printf("Error while closing pip22[1]\n");
			}
			if (close(pip1[0]) == -1)
			{
				printf("Error while closing pip1[0]\n");
			}
			if (close(pip2[1]) == -1)
			{
				printf("Error while closing pip2[1]\n");
			}
			/* Infinite loop for sending question and read answer */
			for (;;)
			{
				/* Generate question */
				question = 1 + random() % NMAX;
				
				/* Write question into pip21[1] */
				write(pip21[1], &question, sizeof(int));
				
				/* Read answer from pip22[0] */
				read(pip22[0], &reponse, question*sizeof(int));
				
				/* Print answer to stdout */
				for (ind = 0; ind < question; ind++)
				{
					if (ind == 0)
					{
						printf("Second child, your %d number(s):\n", question);
					}
					
					if ((ind % 4) == 0 && ind != 0)
					{
						printf("\n");
					}
					printf("%d\t", reponse[ind]);
				}
				printf("\n");
			}
		}
	}
	else
	/* We are in the first child process */
	{
		/* Close unused ends of pipes */
		if (close(pip1[0]) == -1)
		{
			printf("Error while closing pip1[0]\n");
		}
		if (close(pip2[1]) == -1)
		{
			printf("Error while closing pip2[1]\n");
		}
		
		/* Infinite loop for sending question and reading answer */
		for (;;)
		{
			/* Generate question */
			question = 1 + random() % NMAX;
			
			/* Send question to parent */
			write(pip1[1], &question, sizeof(int));
			
			/* Read answer from parent */
			read(pip2[0], &reponse, question*sizeof(int));
			
			/* Print answer to stdout */
			for (ind= 0; ind < question; ind++)
			{
				if (ind == 0)
				{
					printf("First child, your %d number(s):\n", question);
				}
				
				if ((ind % 4) == 0 && ind != 0)
				{
					printf("\n");
				}
				printf("%d\t", reponse[ind]);
			}
			
			printf("\n");
		}
	}
	
	return 0;
}
