Hi all, I have a query about multithreading. What I would like to do is, at the start of my main update() function, start a couple of threads in parallel, once they are all complete carry on with my main update function.
void update() {
thread1->update(); // fluid solver
thread2->update(); // particle system
thread3->update(); // camera1 optical flow
thread4->update(); // camera2 optical flow
// all of the above threads have their own data and work independantly
etc.
.
.. wait till all the above threads are done.
.. once they are done, feed the data from each of them into other ones
.. e.g. fluid velocity info is added to particle systems reedy for next frame's update
.. e.g. camera optical flow data is fed into fluid solver, ready for next frame's update
render all
}
I am having trouble getting this working the way I want. I don't want my threads to loop continuously independent of my main update loop, I want them all to start at the beginning of my update loop, and then once they are all finished, feed the data around, then render, then repeat.
I tried creating a thread every update loop which runs only once (no while() in the function) - it works and behaves fine for a few minutes, then the threads just stop being created - no crash or hang, just no updates from threads. I also tried using an infinte while() but with a flag that determined whether the code ran that frame or not, and only set that flag at the beginning of the main update loop (code for this below) - that one is very stable, but behaviour was weird, I think the thread updates weren't being called when I wanted them to.
I've read a lot of documentation on multithreading in general and all the pthread commands. I understand the fundamentals of protecting data and all that.. but I'm just lacking a lot of experience and strategy .. so any suggestions welcome!
This is my thread class I'm using which has the infinite loop in the threaded function, but only runs the update function if the flag is set. I initialize the flag at the beginning of each main update with thread1->runOnce(); thread2->runOnce(); etc.... but it doesn't run smoothly.
#include "ofxThread.h"
class MSAThread : public ofxThread {
bool bHasRunThisFrame;
bool bAutoLoop;
int interval;
public:
// create the thread, if autoLoop is true, start running straight away and loop,
// otherwise wait for the runOnce command
void start(int frameRate = 30, bool initAutoLoop = false){
interval = 1000/frameRate;
bAutoLoop = initAutoLoop;
if(!bAutoLoop) bHasRunThisFrame = true; // don't run straight away
startThread(true, false); // blocking, verbose
}
// run the update function once in another thread
void runOnce() {
if(lock()) {
bAutoLoop = false;
bHasRunThisFrame = false;
unlock();
}
}
// run the update function in another thread
void threadedFunction() {
while(isThreadRunning()) {
if( lock() ){
if(bAutoLoop || !bHasRunThisFrame) {
update();
bHasRunThisFrame = true;
unlock();
ofSleepMillis(interval);
} else {
unlock();
}
}
}
}
// main thread waits for the update function to finish
void waitForFinish() { if(lock()) { unlock(); } }
// this is the function that gets overvidden
virtual void update() = 0;
};
and it's based on this OpenFrameworks class:
class ofxThread{
public:
ofxThread();
virtual ~ofxThread();
bool isThreadRunning();
void startThread(bool _blocking = true, bool _verbose = true);
bool lock();
bool unlock();
void stopThread();
protected:
//-------------------------------------------------
//you need to overide this with the function you want to thread
virtual void threadedFunction(){
if(verbose)printf("ofxThread: overide threadedFunction with your own\n");
}
//-------------------------------------------------
#ifdef TARGET_WIN32
static unsigned int __stdcall thread(void * objPtr){
ofxThread* me = (ofxThread*)objPtr;
me->threadedFunction();
return 0;
}
#else
static void * thread(void * objPtr){
ofxThread* me = (ofxThread*)objPtr;
me->threadedFunction();
return 0;
}
#endif
#ifdef TARGET_WIN32
HANDLE myThread;
CRITICAL_SECTION critSec; //same as a mutex
#else
pthread_t myThread;
pthread_mutex_t myMutex;
#endif
bool threadRunning;
bool blocking;
bool verbose;
};
//-------------------------------------------------
ofxThread::ofxThread(){
threadRunning = false;
}
//-------------------------------------------------
ofxThread::~ofxThread(){
stopThread();
}
//-------------------------------------------------
bool ofxThread::isThreadRunning(){
//should be thread safe - no writing of vars here
return threadRunning;
}
//-------------------------------------------------
void ofxThread::startThread(bool _blocking, bool _verbose){
//have to put this here because the thread can be running
//before the call to create it returns
threadRunning = true;
#ifdef TARGET_WIN32
InitializeCriticalSection(&critSec);
myThread = (HANDLE)_beginthreadex(NULL, 0, this->thread, (void *)this, 0, NULL);
#else
pthread_mutex_init(&myMutex, NULL);
pthread_create(&myThread, NULL, thread, (void *)this);
#endif
blocking = _blocking;
verbose = _verbose;
}
//-------------------------------------------------
//returns false if it can't lock
bool ofxThread::lock(){
if(!threadRunning){
if(verbose)printf("ofxThread: need to call startThread first\n");
return false;
}
#ifdef TARGET_WIN32
if(blocking)EnterCriticalSection(&critSec);
else {
if(!TryEnterCriticalSection(&critSec)){
if(verbose)printf("ofxThread: mutext is busy \n");
return false;
}
}
if(verbose)printf("ofxThread: we are in -- mutext is now locked \n");
#else
if(blocking){
if(verbose)printf("ofxThread: waiting till mutext is unlocked\n");
pthread_mutex_lock(&myMutex);
if(verbose)printf("ofxThread: we are in -- mutext is now locked \n");
}else{
int value = pthread_mutex_trylock(&myMutex);
if(value == 0){
if(verbose)printf("ofxThread: we are in -- mutext is now locked \n");
}
else{
if(verbose)printf("ofxThread: mutext is busy - already locked\n");
return false;
}
}
#endif
return true;
}
//-------------------------------------------------
bool ofxThread::unlock(){
if(!threadRunning){
if(verbose)printf("ofxThread: need to call startThread first\n");
return false;
}
#ifdef TARGET_WIN32
LeaveCriticalSection(&critSec);
#else
pthread_mutex_unlock(&myMutex);
#endif
if(verbose)printf("ofxThread: we are out -- mutext is now unlocked \n");
return true;
}
//-------------------------------------------------
void ofxThread::stopThread(){
if(threadRunning){
#ifdef TARGET_WIN32
CloseHandle(myThread);
#else
pthread_mutex_destroy(&myMutex);
pthread_detach(myThread);
#endif
if(verbose)printf("ofxThread: thread stopped\n");
threadRunning = false;
}else{
if(verbose)printf("ofxThread: thread already stopped\n");
}
}