semaforos - procesos en c
Cómo compartir semáforos entre procesos que usan memoria compartida (4)
Escribí una aplicación de muestra donde un padre (productor) engendra N hilos hijo (consumidor) y utiliza una matriz global para comunicar datos entre ellos. La matriz global es una memoria circular y está protegida cuando se escriben datos en la matriz.
Archivo de cabecera:
#define TRUE 1
#define FALSE 0
#define SUCCESS 0
#define FAILURE -1
/*
* Function Declaration.
*/
void Parent( void );
int CalcBitmap(unsigned int Num);
void InitSemaphore( void );
void DeInitSemaphore( void );
int ComputeComplexCalc( int op1, int op2);
/*
* Thread functions.
*/
void *Child( void *arg );
void *Report( void *arg1 );
/*
* Macro Definition.
*/
#define INPUT_FILE "./input.txt"
#define NUM_OF_CHILDREN 10
#define SIZE_CIRCULAR_BUF 256
#define BUF_SIZE 128
//Set the bits corresponding to all threads.
#define SET_ALL_BITMAP( a ) a = uiBitmap;
//Clears the bit corresponding to a thread after
//every read.
#define CLEAR_BIT( a, b ) a &= ~( 1 << b );
/*
* Have a dedicated semaphore for each buffer
* to synchronize the acess to the shared memory
* between the producer (parent) and the consumers.
*/
sem_t ReadCntrMutex;
sem_t semEmptyNode[SIZE_CIRCULAR_BUF];
/*
* Global Variables.
*/
unsigned int uiBitmap = 0;
//Counter to track the number of processed buffers.
unsigned int Stats;
unsigned int uiParentCnt = 0;
char inputfile[BUF_SIZE];
int EndOfFile = FALSE;
/*
* Data Structure Definition. ( Circular Buffer )
*/
struct Message
{
int id;
unsigned int BufNo1;
unsigned int BufNo2;
/* Counter to check if all threads read the buffer. */
unsigned int uiBufReadCntr;
};
struct Message ShdBuf[SIZE_CIRCULAR_BUF];
Código fuente:
/*
#
# ipc_communicator is a IPC binary where a parent
# create ten child threads, read the input parameters
# from a file, sends that parameters to all the children.
# Each child computes the operations on those input parameters
# and writes them on to their own file.
#
# Author: Ashok Vairavan Date: 8/8/2014
*/
/*
* Generic Header Files.
*/
#include "stdio.h"
#include "unistd.h"
#include "string.h"
#include "pthread.h"
#include "errno.h"
#include "malloc.h"
#include "fcntl.h"
#include "getopt.h"
#include "stdlib.h"
#include "math.h"
#include "semaphore.h"
/*
* Private Header Files.
*/
#include "ipc*.h"
/*
* Function to calculate the bitmap based on the
* number of threads. This bitmap is used to
* track which thread read the buffer and the
* pending read threads.
*/
int CalcBitmap(unsigned int Num)
{
unsigned int uiIndex;
for( uiIndex = 0; uiIndex < Num; uiIndex++ )
{
uiBitmap |= 1 << uiIndex;
}
return uiBitmap;
}
/*
* Function that performs complex computation on
* numbers.
*/
int ComputeComplexCalc( int op1, int op2)
{
return sqrt(op1) + log(op2);
}
/*
* Function to initialise the semaphores.
* semEmptyNode indicates if the buffer is available
* for write. semFilledNode indicates that the buffer
* is available for read.
*/
void InitSemaphore( void )
{
unsigned int uiIndex=0;
CalcBitmap(NUM_OF_CHILDREN);
sem_init( &ReadCntrMutex, 0, 1);
while( uiIndex<SIZE_CIRCULAR_BUF )
{
sem_init( &semEmptyNode[uiIndex], 0, 1 );
uiIndex++;
}
return;
}
/*
* Function to de-initilaise semaphore.
*/
void DeInitSemaphore( void )
{
unsigned int uiIndex=0;
sem_destroy( &ReadCntrMutex);
while( uiIndex<SIZE_CIRCULAR_BUF )
{
sem_destroy( &semEmptyNode[uiIndex] );
uiIndex++;
}
return;
}
/* Returns TRUE if the parent had reached end of file */
int isEOF( void )
{
return __sync_fetch_and_add(&EndOfFile, 0);
}
/*
* Thread functions that prints the number of buffers
* processed per second.
*/
void *Report( void *arg1 )
{
int CumStats = 0;
while( TRUE )
{
sleep( 1 );
CumStats += __sync_fetch_and_sub(&Stats, 0);
printf("Processed Per Second: %d Cumulative Stats: %d/n",
__sync_fetch_and_sub(&Stats, Stats), CumStats);
}
return NULL;
}
/*
* Function that reads the data from the input file
* fills the shared buffer and signals the children to
* read the data.
*/
void Parent( void )
{
unsigned int op1, op2;
unsigned int uiCnt = 0;
/* Read the input parameters and process it in
* while loop till the end of file.
*/
FILE *fp = fopen( inputfile, "r" );
while( fscanf( fp, "%d %d", &op1, &op2 ) == 2 )
{
/* Wait for the buffer to become empty */
sem_wait(&semEmptyNode[uiCnt]);
/* Access the shared buffer */
ShdBuf[uiCnt].BufNo1 = op1;
ShdBuf[uiCnt].BufNo2 = op2;
/* Bitmap to track which thread read the buffer */
sem_wait( &ReadCntrMutex );
SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
sem_post( &ReadCntrMutex );
__sync_fetch_and_add( &uiParentCnt, 1);
uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
}
/* If it is end of file, indicate it to other
* children using global variable.
*/
if( feof( fp ) )
{
__sync_add_and_fetch(&EndOfFile, TRUE);
fclose( fp );
return;
}
return;
}
/*
* Child thread function which takes threadID as an
* argument, creates a file using threadID, fetches the
* parameters from the shared memory, computes the calculation,
* writes the output to their own file and will release the
* semaphore when all the threads read the buffer.
*/
void *Child( void *ThreadPtr )
{
unsigned int uiCnt = 0, uiTotalCnt = 0;
unsigned int op1, op2;
char filename[BUF_SIZE];
int ThreadID;
ThreadID = *((int *) ThreadPtr);
free( ThreadPtr );
snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );
FILE *fp = fopen( filename, "w" );
if( fp == NULL )
{
perror( "fopen" );
return NULL;
}
while (TRUE)
{
/* Wait till the parent fills the buffer */
if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
{
/* Access the shared memory */
op1 = ShdBuf[uiCnt].BufNo1;
op2 = ShdBuf[uiCnt].BufNo2;
fprintf(fp, "%d %d = %d/n", op1, op2,
ComputeComplexCalc( op1, op2) );
sem_wait( &ReadCntrMutex );
ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
{
__sync_add_and_fetch(&Stats, 1);
/* Release the semaphore lock if
all readers read the data */
sem_post(&semEmptyNode[uiCnt]);
}
sem_post( &ReadCntrMutex );
uiTotalCnt++;
uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;
/* If the parent reaches the end of file and
if the child thread read all the data then break out */
if( isEOF() && ( uiTotalCnt == uiParentCnt ) )
{
//printf("Thid %d p %d c %d/n", ThreadID, uiParentCnt, uiCnt );
break;
}
}
else
{
/* Sleep for ten micro seconds before checking
the shared memory */
usleep(10);
}
}
fclose( fp );
return NULL;
}
void usage( void )
{
printf(" Usage:/n");
printf(" -f - the absolute path of the input file where the input parameters are read from./n/t/t Default input file: ./input.txt");
printf(" -? - Help Menu. /n" );
exit(1);
}
int main( int argc, char *argv[])
{
pthread_attr_t ThrAttr;
pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
unsigned int uiThdIndex = 0;
unsigned int *pThreadID;
int c;
strncpy( inputfile, INPUT_FILE, BUF_SIZE );
while( ( c = getopt( argc, argv, "f:" )) != -1 )
{
switch( c )
{
case ''f'':
strncpy( inputfile, optarg, BUF_SIZE );
break;
default:
usage();
}
}
if( access( inputfile, F_OK ) == -1 )
{
perror( "access" );
return FAILURE;
}
InitSemaphore();
pthread_attr_init( &ThrAttr );
while( uiThdIndex< NUM_OF_CHILDREN )
{
/* Allocate the memory from the heap and pass it as an argument
* to each thread to avoid race condition and invalid reference
* issues with the local variables.
*/
pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
if( pThreadID == NULL )
{
perror( "malloc" );
/* Cancel pthread operation */
return FAILURE;
}
*pThreadID = uiThdIndex;
if( pthread_create(
&ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
{
printf( "pthread %d creation failed. Error: %d/n", uiThdIndex, errno);
perror( "pthread_create" );
return FAILURE;
}
uiThdIndex++;
}
/* Report thread reports the statistics of IPC communication
* between parent and childs threads.
*/
if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
{
perror( "pthread_create" );
return FAILURE;
}
Parent();
uiThdIndex = 0;
while( uiThdIndex < NUM_OF_CHILDREN )
{
pthread_join( ThrChild[uiThdIndex], NULL );
uiThdIndex++;
}
/*
* Cancel the report thread function when all the
* children exits.
*/
pthread_cancel( ThrReport );
DeInitSemaphore();
return SUCCESS;
}
Tengo que sincronizar N procesos de cliente con un servidor. Estos procesos son bifurcados por una función principal en la cual declaro 3 semáforos. Decidí usar semáforos POSIX pero no sé cómo compartirlos entre estos procesos. Pensé que la memoria compartida debería funcionar correctamente, pero tengo algunas preguntas:
- ¿Cómo puedo asignar el espacio correcto de memoria en mi segmento?
- ¿Puedo usar
sizeof(sem_t)
en el camposize_t
deshmget
para asignar exactamente el espacio que necesito? - ¿Alguien tiene algunos ejemplos similares a esta situación?
Realmente dudo que el enfoque de memoria compartida alguna vez funcione.
AFAIK, el semáforo es en realidad solo un identificador, válido para el proceso en el que se abre. La información real del semáforo se encuentra en la memoria del kernel.
Entonces, usar el mismo valor de control en otro proceso no va a funcionar.
Sí, podemos usar el semáforo nombrado entre los dos procesos. Podemos abrir un archivo.
snprintf(sem_open_file, 128, "/sem_16");
ret_value = sem_open((const char *)sem_open_file, 0, 0, 0);
if (ret_value == SEM_FAILED) {
/*
* No such file or directory
*/
if (errno == ENOENT) {
ret_value = sem_open((const char *)sem_open_file,
O_CREAT | O_EXCL, 0777, 1);
}
}
Y otro proceso puede usar esto.
Es fácil compartir semáforos POSIX
nombre
Elija un nombre para su semáforo
#define SNAME "/mysem"
Use
sem_open
conO_CREAT
en el proceso que los creasem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */
Semáforos abiertos en los otros procesos
sem_t *sem = sem_open(SEM_NAME, 0); /* Open a preexisting semaphore. */
Si insistes en usar memoria compartida, ciertamente es posible.
int fd = shm_open("shmname", O_CREAT, O_RDWR);
ftruncate(fd, sizeof(sem_t));
sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
sem_init(sem, 1, 1);
No he probado lo anterior por lo que podría ser completamente loco.