bounded buffer implementation

Hi Experts,

I have a programming assignment that asks us to implement a pipegrep function. it basically has 5 stages and each stage has a thread and buffers are used between stages.

am currently implementing stage 1 . In stage 1 am suppose to read directory and store the filenames in buffer1 shared with stage2 which applies filter like filesize etc,eg: it puts the files having filesize > 4096 into buffer2.I have implemented but am facing problems , am getting desired out put sometime but most of the times it doesnt work as required.

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/types.h>
#include<string.h>
#include<sys/wait.h>
#include<sys/stat.h>
#include<dirent.h>
#include<semaphore.h>
#include<pthread.h>

char *buffer1[10];
char *buffer2[10];

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

sem_t empty,full;
void *stage1();
void *stage2();

int main(){
	pthread_t thrd1,thrd2;
	
	sem_init(&empty, 0, 10);
	sem_init(&full,0,0);
	int tid1 = pthread_create(&thrd1,NULL,&stage1,(void *)0);
	if(tid1 == -1)
	perror("cant create thread \n");
	int tid2 = pthread_create(&thrd2,NULL,&stage2,(void *)0);
    if(tid2 == -1)
    perror("cant create thread \n");
    
     pthread_join(thrd1, NULL);
     pthread_join(thrd2, NULL);
//getchar();
pthread_exit(NULL);

}

void *stage1(){
	//printf("Inside thread 1 \n");
DIR *directory;
struct dirent *dir;
int in;

directory = opendir(".");

in=0;

while((dir=readdir(directory))!=NULL){
	if (strcmp(dir->d_name, ".") == 0|| strcmp(dir->d_name, "..")==0)
               continue;  
   sem_wait(&empty);
   //printf("buffer is not empty \n");
   pthread_mutex_lock(&mutex);
   //printf("thread 1 got access \n");
    /* if got access enter filename into buffer */ 
   buffer1[in]=dir->d_name;
   //printf(" ^%s^ \n",buffer1[in]);
   in=(in+1)%10;
   sem_post(&full);//indicate consumer filename is available
   pthread_mutex_unlock(&mutex);
   }
   
   closedir(directory);

pthread_exit(NULL);;
}


void *stage2(){
	//printf("Inside thread2 also \n");
int out =0 ;
int in = 0;
struct stat sb;
int fsize = 4096;
int s=0;	
	while(s != -1){
	//printf("File = %s ,File size = %d \n",buffer,(int)sb.st_size);
			sem_wait(&full);
			//printf("buffer is full goin ahead\n");
			pthread_mutex_lock(&mutex);
			//printf("thread2 got access \n");
	        s=stat(buffer1[out],&sb);
	        	
    if(sb.st_size > fsize){
				
    buffer2[in] = buffer1[out];
    printf(" file = %s and size = %d \n",buffer2[in],(int)sb.st_size);
    out = (out+1)%10;
    in = (in+1)%10;
   
    sem_post(&empty);
    pthread_mutex_unlock(&mutex);     
}
else {
	out = (out+1)%10;
	pthread_mutex_unlock(&mutex);
}
}


pthread_exit(NULL);
}

O/p:

ameya@ameya-Dell-System-Inspiron-N4110:~/os_concepts/Advanced_shell$ ./listfiles
 file = advanced_shell.c~ and size = 11141 
 file = advanced_shell and size = 16693 
 file = listfiles and size = 7919 
 file = advanced_shell.c and size = 10840 

program stuck here, i have closed the directory as well 

Please check and provide u r feedback.

Regards,
Ameya

Issue might be with the mutex and semaphores I guess. Why don't you try conditional var in mutex? You can signal when a directory is read into the buffer which can then be processed by the stage2.

--ahamed

If you answer the following question for me, I am pretty sure you will find at least the semaphore issue yourself:

What happens to the "full" semaphore when you find a file, and the file size is greater than 4096?

Also, you need to check what the pointers you're putting into your buffers actually point to....

The full semaphore increments (sem_post) when a file is found after which the stage 2 tries to lock the mutex and stats the file for checking the size.

Hi amejoish,
the main issue in your code is

while(s != -1)

Here you are waiting for the next file accd to the previous stat output. Its not a correct way as after the last file stage1 thread will stop and the stage2 thread will be in wait state forever. Change the condition, then your code will work fine.

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/types.h>
#include<string.h>
#include<sys/wait.h>
#include<sys/stat.h>
#include<dirent.h>
#include<semaphore.h>
#include<pthread.h>

char *buffer1[10];
char *buffer2[10];

int in =0;
int out =0 ;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

sem_t empty,full;
void *stage1();
void *stage2();

int main(){
	pthread_t thrd1,thrd2;
	
	sem_init(&empty, 0, 10);
	sem_init(&full,0,0);
	int tid1 = pthread_create(&thrd1,NULL,&stage1,(void *)0);
	if(tid1 == -1)
	perror("cant create thread \n");
	int tid2 = pthread_create(&thrd2,NULL,&stage2,(void *)0);
    if(tid2 == -1)
    perror("cant create thread \n");
    
     pthread_join(thrd1, NULL);
     pthread_join(thrd2, NULL);
//getchar();
pthread_exit(NULL);

}

void *stage1(){
	//printf("Inside thread 1 \n");
DIR *directory;
struct dirent *dir;
int in = 0;

directory = opendir(".");



while((dir=readdir(directory))!=NULL){
	if (strcmp(dir->d_name, ".") == 0|| strcmp(dir->d_name, "..")==0)
               continue;  
   sem_wait(&empty);
   printf("buffer is not empty \n");
   pthread_mutex_lock(&mutex);
   //printf("thread 1 got access \n");
    /* if got access enter filename into buffer */ 
   buffer1[in]=dir->d_name;
   printf(" %s in buffer 1 now \n",buffer1[in]);
   in=(in+1)%10;
   pthread_mutex_unlock(&mutex);
   sem_post(&full);//indicate consumer filename is available
      }
   
   closedir(directory);

pthread_exit(NULL);
}


void *stage2(){
	//printf("Inside thread2 also \n");
int in2 = 0;

struct stat sb;
int fsize = 4096;
	
	while(1){
	//printf("File = %s ,File size = %d \n",buffer,(int)sb.st_size);
			sem_wait(&full);
			printf("buffer is full goin ahead\n");
			pthread_mutex_lock(&mutex);
			//printf("thread2 got access \n");
	        if(stat(buffer1[out],&sb)==-1){
				pthread_mutex_unlock(&mutex);
			    break;
		   }	
    else if((int)sb.st_size > fsize){
				
    buffer2[in2] = buffer1[out];
    buffer1[out] = NULL;
    printf(" file = %s and size = %d \n",buffer2[in2],(int)sb.st_size);
    in = (in+1)%10;
    out = (out+1)%10;
    
   pthread_mutex_unlock(&mutex);
   printf("thread 2 unlocked mutex \n");
   sem_post(&empty);
     
}else{
	out = (out+1)%10;
	pthread_mutex_unlock(&mutex);

}
}


pthread_exit(NULL);
}

i changed the condition but i think i am missing something.

Regards,
Ameya

Is this a classroom assignment?

yes it is an assignment !

Then post in the homework forum using the homework forum rules as you agreed to do when you registered. That's the only way we can help you.

Actually its a project not a homework assignment

Hi amejoish,
nice try, now in stage1 fn send some data in buffer1 after closing the directory (like "No more files") and in stage2 fn break the loop whenever u get this line. Be careful that all ur locks are removed correctly before coming out of loop.
All the best...

HI Sivasankar,

Additionally i added sleep funtion , am delayin both the threads for random no of time and its working exactly the way i want it.

Now am implementing next 2 stages
stage3: read lines from file and store in buffer
stage4: find a substr in the line and then store that line in buffer2

even in this case after introducing sleep function , the output is proper. But whats weird is am facing race conditn even after using mutexes.

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/types.h>
#include<string.h>
#include<sys/wait.h>
#include<sys/stat.h>
#include<dirent.h>
#include<semaphore.h>
#include<pthread.h>
#include<fcntl.h>

char **buffer3,**buffer4;

sem_t empty,full;
int in=0;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

void *stage3();
void *stage4();

int main(){
pthread_t thrd1,thrd2;

buffer3 = (char **)malloc(10);
buffer4 = (char **)malloc(10);

pthread_mutex_init(&mutex,NULL);
sem_init(&empty,0,10);
sem_init(&full,0,0);

int tid1 = pthread_create(&thrd1,NULL,&stage3,(void *)0);
	if(tid1 == -1)
	perror("cant create thread \n");
	int tid2 = pthread_create(&thrd2,NULL,&stage4,(void *)0);
    if(tid2 == -1)
    perror("cant create thread \n");
    
    pthread_join(thrd1, NULL);
     pthread_join(thrd2, NULL);
//getchar();

sem_destroy(&empty);
sem_destroy(&full);
exit(0);

}

void *stage3(){
	
	int r;
	char file[] = "sample.txt";
	char str[256];
	FILE *fd;
	fd = fopen(file,"r");
	
	while(1){
		
		r = random()%30000;
        usleep(r);
     
		sem_wait(&empty);
		pthread_mutex_lock(&mutex);
	
	    if(fgets(str,sizeof(str),fd)==NULL)
	    pthread_exit(NULL);
	    //printf("str = %s \n",str);
	         str[strlen(str)-1] = '\0';
	         buffer3[in] = str;
	         //printf("\n buffer3in[%d] = %s\n",in,buffer3[in]);
	         in = (in+1)%10;	           
	    pthread_mutex_unlock(&mutex);
	    sem_post(&full);		    	
	}
	printf("\n buffer3in[3] = %s\n",buffer3[3]);
	pthread_exit(NULL);
}

void *stage4(){
	//printf("In stage 4 \n");
	char *str2 = "asd";
	int out = 0;
	int in2 = 0;
	int r;
	
	while(1){
		r = random()%30000;
        usleep(r);
        //printf("buffer is not full \n");
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        //printf(" comapring $$ buffer3out[%d] %s $$  \n",out,buffer3[out]);    
        if(strstr(buffer3[out],str2)){
			buffer4[in2] = buffer3[out];
			printf(" \n buffer4 = %s \n",buffer4[in2]);
			//buffer3[out] = NULL;
			in2 = (in2+1)%10;
			

		}
		
		out = (out+1)%10;
		
		pthread_mutex_unlock(&mutex);
		sem_post(&empty);
	}
	
	pthread_exit(NULL);
	
}

Check the output:

ameya@ameya-Dell-System-Inspiron-N4110:~/os_concepts/Advanced_shell$ ./read

 buffer3in[0] = cat deer dog bando moron bahavar.
 comapring $$ buffer3out[0] cat deer dog bando moron bahavar. $$  

 buffer3in[1] = asdsdas
 comapring $$ buffer3out[1] asdsdas $$  

 buffer3in[2] = asdsdasbhafd

 buffer3in[3] = asdsgetg

 buffer3in[4] = adgrg
 comapring $$ buffer3out[2] adgrg $$  
 comapring $$ buffer3out[3] adgrg $$  

if u see the stage 3 has inserted buffer[3](in=3) = asdgetg but the out pointer used by stage4
buffer[3] (i.e out =3) value is adgrg which is weird. Increasing the delay time for thread 1 somehow fixes the problem.

If i dont use the sleep then output is entirely messed up.

ameya@ameya-Dell-System-Inspiron-N4110:~/os_concepts/Advanced_shell$ ./read

 buffer3in[0] = cat deer dog bando moron bahavar.

 buffer3in[1] = asdsdas

 buffer3in[2] = asdsdasbhafd

 buffer3in[3] = asdsgetg

 buffer3in[4] = adgrg
 comapring $$ buffer3out[0] adgrg $$  
 comapring $$ buffer3out[1] adgrg $$  
 comapring $$ buffer3out[2] adgrg $$  
 comapring $$ buffer3out[3] adgrg $$  
 comapring $$ buffer3out[4] adgrg $$  

 buffer3in[5] = grgdfdsfds

Can you please explain this concept or how come not delaying threads messes up the output

Check the pointer concepts once.

buffer3 = (char **)malloc(10);
buffer4 = (char **)malloc(10);

Here you are creating a pointer to pointer, which can hold the address of a char pointer, later you are copying the data into str

fgets(str,sizeof(str),fd)==NULL

and storing str address in respective location.

buffer3[in] = str;

So, buffer3[in] will hold the addr of str everytime and you are changing the data in it everytime.

Yes , you are right but i am modifying the 'in' value as well, so it should work correctly right?

no my friend,
it won't work as u expected. Try this

void *stage3(){
	
	int r;
	char file[] = "sample.txt";
	char *str;
	FILE *fd;
	fd = fopen(file,"r");
	
	while(1){
		
		r = random()%30000;
        usleep(r);
     
		sem_wait(&empty);
		pthread_mutex_lock(&mutex);
	
	    str=(char*)malloc(256*sizeof(char));
	    if(fgets(str,sizeof(str),fd)==NULL)
	    pthread_exit(NULL);
	    //printf("str = %s \n",str);
	         str[strlen(str)-1] = '\0';
	         buffer3[in] = str;
	         //printf("\n buffer3in[%d] = %s\n",in,buffer3[in]);
	         in = (in+1)%10;	           
	    pthread_mutex_unlock(&mutex);
	    sem_post(&full);		    	
	}
	printf("\n buffer3in[3] = %s\n",buffer3[3]);
	pthread_exit(NULL);
}

and don't forget to release the memory in stage4.