I revised my code. Now the output is:
user@user-desktop:~/Desktop/gossip$ ./a.out
mutex_nodes initialized..path_mutex initialized..nodes[0].f_lock initialized..nodes[1].f_lock initialized..nodes[2].f_lock initialized..nodes[3].f_lock initialized..nodes[4].f_lock initialized..nodes[5].f_lock initialized..nodes[6].f_lock initialized..nodes[7].f_lock initialized..thread started..Thread 0 Activated by -1
Message Delivered to 0 from -1 : returning true..loop 0
loop 1
thread started..Thread 1 Activated by 0
loop 2
thread started..Thread 2 Activated by 0
loop 3
thread started..Thread 3 Activated by 0
loop 4
loop 5
loop 6
loop 7
Here, node 0 finds its neighbours, scans them and creates a thread for each of them. Threads 1, 2 and 3
start executing, but each of them has to wait until the mutex is unlocked and handed over to it. The "returning false.." printed below
means that each of nodes 1, 2 and 3 first checks whether its sender, node 0, has already disseminated the message to all of its neighbours
(that is, whether 0 has already delivered the message to 1, 2 and 3). Here we get false, because the other threads are still
blocked behind the mutex, waiting until it gets unlocked.
So they could not have set their msg_received flag to true any earlier.
Message Delivered to 1 from 0 : returning false..
Message Delivered to 2 from 0 : returning false..
Message Delivered to 3 from 0 : returning true..loop 0
loop 1
loop 2
loop 3
loop 4
thread started..Thread 4 Activated by 3
loop 5
loop 6
loop 7
Message Delivered to 4 from 3 : returning true..loop 0
loop 1
loop 2
loop 3
loop 4
loop 5
thread started..Thread 5 Activated by 4
loop 6
loop 7
thread started..Thread 7 Activated by 4
Message Delivered to 5 from 4 : returning false..
Message Delivered to 7 from 4 : returning true..loop 0
loop 1
loop 2
loop 3
loop 4
loop 5
loop 6
thread started..Thread 6 Activated by 7
loop 7
Message Delivered to 6 from 7 : returning true..loop 0
loop 1
loop 2
loop 3
loop 4
loop 5
loop 6
loop 7
1 0 Purple 1 2 3
1 1 Purple
1 2 Purple
1 3 Purple 0 1 2 4
1 4 Purple 1 3 5 7
1 5 Purple
1 6 Purple 5 7
1 7 Purple 4 5 6
Exiting Main()..
The code is as follows:
#include<iostream>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<list>
#include<pthread.h>
#include<cstring>
#include<stdarg.h>
#include<unistd.h>
using namespace std;
// Counter incremented once per loop iteration in funcThd(); nothing in the code shown here reads it.
// NOTE(review): the name may clash with ::round from <math.h> if a standard header pulls that in -- confirm.
static int round;
/*
 * checkResults(where, val): terminate the program when a status code is non-zero.
 *
 *   where - C string naming the call that produced the status (printed verbatim)
 *   val   - status code; 0 means success, anything else is reported and fatal
 *
 * On failure the message "failed with <val> at <where>" is written through
 * print() and the process exits with status 1.
 *
 * The body is wrapped in do { ... } while (0) so the macro behaves as a single
 * statement (safe inside an unbraced if/else), val is evaluated exactly once,
 * and the last line carries no continuation backslash, so the directive that
 * follows the macro is not spliced into its body.
 */
#define checkResults(where, val) \
    do { \
        const int check_results_rc_ = (val); \
        if (check_results_rc_) { \
            print("failed with %d at %s", check_results_rc_, (where)); \
            exit(1); \
        } \
    } while (0)
// Number of characters of each node's message that main() prints.
#define MSG_SIZE 7
// Number of nodes in the graph; sizes nodes[], callThd[] and both dimensions of path[][].
// NOTE(review): identifiers starting with an underscore and an uppercase letter are reserved for the implementation.
#define _NODES 8
// printf-style logging helper; the definition in this file writes to stderr.
int print(const char *, ...);
typedef struct{ // parameter block passed to funcThd() through pthread_create()
int id; // index of the node the message is being delivered to
int rcvd_from; // index of the node that sent the message; main() uses -1 for the source node
string msg; // message text being disseminated
}thread_param_t;
struct node // per-node state; nodes[] holds one entry for each of the _NODES graph nodes
{
pthread_mutex_t f_lock; // per-node mutex, initialised in main() and held by funcThd() while it works on this node
thread_param_t packet; // funcThd() records the node's own index and the received message text here
vector<int> edges; // indices of neighbouring nodes, appended by funcThd()
bool msg_received; // set to true once the message has been delivered to this node
} nodes[_NODES];
pthread_t callThd[_NODES]; // thread IDs filled in by pthread_create()
pthread_mutex_t list_lock; // guards the nodes[] array; also the mutex handed to pthread_cond_wait() on next_round_ready
pthread_mutex_t path_mutex; // initialised in main() but never locked in the code shown: path[][] is only ever read
pthread_cond_t next_round_ready = PTHREAD_COND_INITIALIZER; // statically initialised condition variable: a receiving thread waits on it until the
// sender's message has reached all of the sender's neighbours, before looking for its own neighbours
bool msg_sent_to_all(int , int ); // reports whether every neighbour of the given (sender) node has received the message
//---------------
// Graph of the network as an _NODES x _NODES matrix of 0/1 entries; a 1 in row a, column b
// marks nodes a and b as neighbours. The table below is symmetric with a zero diagonal.
int path[_NODES][_NODES] = {
{ 0, 1, 1, 1, 0, 0, 0, 0 },
{ 1, 0, 0, 1, 1, 0, 0, 0 },
{ 1, 0, 0, 1, 0, 1, 0, 0 },
{ 1, 1, 1, 0, 1, 0, 0, 0 },
{ 0, 1, 0, 1, 0, 1, 0, 1 },
{ 0, 0, 1, 0, 1, 0, 1, 1 },
{ 0, 0, 0, 0, 0, 1, 0, 1 },
{ 0, 0, 0, 0, 1, 1, 1, 0 }
}; // the graph is sparse: most entries are 0
//---------------
void *funcThd(void *arg) //Thread function, the argument is the structure defined above (thread_param..)
{
print("thread started..");
pthread_attr_t attr;
struct timespec t_req, t_rem;
string message;
int index, rc, received_from, neighbours_size;
t_req.tv_sec = 1;
t_req.tv_nsec = 500;
rc = pthread_attr_init(&attr);
checkResults("pthread_attr_init()\n", rc);
rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // setting detached as our thread attributes, this way none of the calling threads
//expect the caller thread to wait for them-- so, no need for pthread_join(..)
checkResults("pthread_attr_setdetachstate()\n", rc);
thread_param_t *p = (thread_param_t *) arg;
index = p -> id;
message = p -> msg;
received_from = p -> rcvd_from;
print("Thread %d Activated by %d\n", index, received_from);
//pthread_mutex_lock(&path_mutex); we don need to lock the path as we always read it
pthread_mutex_lock(&list_lock); //lock the list of nodes
pthread_mutex_lock(&nodes[index].f_lock); //lock the particular node in which the message has been sent to, this lock exists in every node structure
nodes[index].msg_received = true; //node got the message
nodes[index].packet.id = index; //index of the node
nodes[index].packet.msg = message; //message content
neighbours_size = nodes[received_from].edges.size(); //number of neighbours staying adjacent to the node
print("Message Delivered to %d from %d : ", index, received_from);
//for optimality, for each round the calling threads are not gonna start finding their neighbouring nodes BEFORE the message is sent to all
//the neighbours of the calling threads, for example is message is sent from 0 to 1, 1 is not gonna send the message to its neighbours 1, 3, 4
// before 0 sends the message to all its neighbours first 1, 3, 5 -- so, we wouldn't have redundant threads
while((msg_sent_to_all(received_from, neighbours_size) == false))
pthread_cond_wait(&next_round_ready, &list_lock);
//the condition variable is locked by the node list mutex, it will then automaticly get unlocked whenever the condition satisfies
// that is when the message is sent to all the neighbours
for(int i = 0; i < _NODES; i++)
{
print("loop %d\n", i);
if(path[index])
{
nodes[index].edges.push_back(i); //finding neighbours from path matrix and push them back to the nodes edge property
if(nodes.msg_received == false) // if the msg is not sent to the node, then try sending it
{
p -> id = i;
p -> msg = message;
p -> rcvd_from = index;
//print("Sending Message to %d from %d \n", i, index);
pthread_create(&callThd, &attr, funcThd, (void *) p); // calling the thread
nanosleep(&t_req, &t_rem);
}
}
round++;
}//end of for
pthread_mutex_unlock(&nodes[index].f_lock); //unlocking, node structure lock
pthread_mutex_unlock(&list_lock); //unlocking node list lock
//pthread_mutex_unlock(&path_mutex);
pthread_exit(NULL);
}
//-----------------
bool msg_sent_to_all(int index, int neighbours_size)
{
int temp;
//pthread_mutex_lock(&list_lock);
//pthread_mutex_lock(&nodes[index].f_lock);
for(int i = 0; i < neighbours_size; i++)
{
temp = nodes[index].edges;
if(nodes[temp].msg_received == false)
{
print("returning false..\n");
return false;
}
}
//pthread_mutex_unlock(&list_lock);
//pthread_mutex_unlock(&nodes[index].f_lock);
print("returning true..");
return true;
}
//-----------------
/*
 * Program entry point: initialises the mutexes, starts the dissemination at
 * node 0 with the message "Purple", waits a fixed 10 seconds for the detached
 * threads to do their work, then prints one line per node:
 *   <msg_received> <packet.id> <message> <neighbour indices...>
 *
 * Command-line arguments are not used. Returns EXIT_SUCCESS; exits with
 * status 1 through checkResults() if a pthread call fails.
 */
int main(int argc, char **argv)
{
    (void) argc;
    (void) argv;

    thread_param_t param;
    pthread_attr_t attr;
    int rc = 0;

    // Initialise the mutexes.
    if (!pthread_mutex_init(&list_lock, NULL))
        print("mutex_nodes initialized..");
    if (!pthread_mutex_init(&path_mutex, NULL))
        print("path_mutex initialized..");
    for (int index = 0; index < _NODES; index++) {
        if (!pthread_mutex_init(&nodes[index].f_lock, NULL))
            print("nodes[%d].f_lock initialized..", index);
    }

    // Threads are created detached, so they are never joined.
    rc = pthread_attr_init(&attr);
    checkResults("pthread_attr_init()\n", rc);
    rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    checkResults("pthread_attr_setdetachstate()\n", rc);

    // Source node: node 0, with no sender (-1). param lives on this stack
    // frame, which stays alive for the whole run because main() sleeps below.
    param.id = 0;
    param.rcvd_from = -1;
    param.msg = "Purple";
    nodes[0].msg_received = true;
    rc = pthread_create(&callThd[0], &attr, funcThd, (void *) &param); // starts the first round
    checkResults("pthread_create()\n", rc);
    pthread_attr_destroy(&attr);

    sleep(10);

    // The condition variable and mutexes are deliberately NOT destroyed here:
    // detached threads may still be blocked on them, and destroying a
    // synchronisation object that is in use is undefined behaviour. The
    // process is about to exit anyway.

    // Hold list_lock while reading nodes[] so the dump cannot race with a
    // thread that is still updating it.
    pthread_mutex_lock(&list_lock);
    for (int i = 0; i < _NODES; i++) {
        // %.*s prints at most MSG_SIZE characters and never reads past the
        // end of a short or empty message.
        print("%d %d %.*s", nodes[i].msg_received, nodes[i].packet.id,
              MSG_SIZE, nodes[i].packet.msg.c_str());
        for (size_t j = 0; j < nodes[i].edges.size(); j++)
            print(" %d", nodes[i].edges[j]);
        print("\n");
    }
    pthread_mutex_unlock(&list_lock);

    print("Exiting Main()..\n");
    return EXIT_SUCCESS;
}
//-------------------
/*
 * printf-style logging helper: formats the arguments according to ft and
 * writes the result to stderr. No newline is appended.
 *
 * Returns what vfprintf() returns: the number of characters written, or a
 * negative value on an output error. (The function is declared int, so it
 * must return a value on every path.)
 */
int print(const char *ft, ...)
{
    va_list args;
    va_start(args, ft); /* args now refers to everything passed after ft */
    const int written = vfprintf(stderr, ft, args);
    va_end(args);
    return written;
}
//-------------------