00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "threadutil.h"
00021
00022 #include <opie2/odebug.h>
00023 #include <qsocketnotifier.h>
00024
00025 #include <pthread.h>
00026 #include <assert.h>
00027 #include <unistd.h>
00028 #include <errno.h>
00029
00030 using namespace ThreadUtil;
00031
00032 struct Mutex::Data
00033 {
00034 Data()
00035 {
00036 pthread_mutex_init( &mutex, 0 );
00037 }
00038 ~Data()
00039 {
00040 pthread_mutex_destroy( &mutex );
00041 }
00042
00043 pthread_mutex_t mutex;
00044 };
00045
00046 Mutex::Mutex()
00047 : d( new Data )
00048 {
00049 }
00050
00051 Mutex::~Mutex()
00052 {
00053 delete d;
00054 }
00055
00056 void Mutex::lock()
00057 {
00058 pthread_mutex_lock( &d->mutex );
00059 }
00060
00061 void Mutex::unlock()
00062 {
00063 pthread_mutex_unlock( &d->mutex );
00064 }
00065
00066 bool Mutex::tryLock()
00067 {
00068 return pthread_mutex_trylock( &d->mutex ) == 0;
00069 }
00070
00071 bool Mutex::isLocked()
00072 {
00073 if ( !tryLock() )
00074 return true;
00075
00076 unlock();
00077 return false;
00078 }
00079
00080 struct WaitCondition::Data
00081 {
00082 Data()
00083 {
00084 int result = pthread_cond_init( &waitCondition, 0 );
00085 assert( result == 0 );
00086 }
00087 ~Data()
00088 {
00089 pthread_cond_destroy( &waitCondition );
00090 }
00091
00092 pthread_cond_t waitCondition;
00093 };
00094
00095 WaitCondition::WaitCondition()
00096 : d( new Data )
00097 {
00098 }
00099
00100 WaitCondition::~WaitCondition()
00101 {
00102 delete d;
00103 }
00104
00105 bool WaitCondition::wait()
00106 {
00107 Mutex m;
00108 m.lock();
00109 return wait( m );
00110 }
00111
00112 bool WaitCondition::wait( Mutex &mutex )
00113 {
00114 return pthread_cond_wait( &d->waitCondition, &mutex.d->mutex );
00115 }
00116
00117 void WaitCondition::wakeOne()
00118 {
00119 pthread_cond_signal( &d->waitCondition );
00120 }
00121
00122 void WaitCondition::wakeAll()
00123 {
00124 pthread_cond_broadcast( &d->waitCondition );
00125 }
00126
00127 struct Thread::Data
00128 {
00129 Data() : isRunning( false )
00130 {}
00131
00132 pthread_t self;
00133 Mutex guard;
00134 bool isRunning;
00135
00136 WaitCondition finishCondition;
00137
00138 Thread *thr;
00139
00140 void run() { thr->run(); }
00141 };
00142
00143 extern "C"
00144 {
00145
00146 static void terminate_thread( void *arg )
00147 {
00148 Thread::Data *data = ( Thread::Data* )arg;
00149
00150 assert( data );
00151
00152 AutoLock locker( data->guard );
00153 data->isRunning = false;
00154 data->finishCondition.wakeAll();
00155 }
00156
00157 static void *start_thread( void *arg )
00158 {
00159 Thread::Data *data = ( Thread::Data* )arg;
00160
00161 pthread_cleanup_push( terminate_thread, data );
00162
00163 data->isRunning = true;
00164 data->run();
00165
00166 pthread_cleanup_pop( true );
00167
00168 Thread::exit();
00169 return 0;
00170 }
00171
00172 }
00173
00174 Thread::Thread()
00175 : d( new Data )
00176 {
00177 d->thr = this;
00178 }
00179
00180 Thread::~Thread()
00181 {
00182 assert( d->isRunning == false );
00183 delete d;
00184 }
00185
00186 void Thread::start()
00187 {
00188 AutoLock lock( d->guard );
00189
00190 if ( d->isRunning ) {
00191 odebug << "ThreadUtil::Thread::start() called for running thread." << oendl;
00192 return;
00193 }
00194
00195 pthread_attr_t attributes;
00196 pthread_attr_init( &attributes );
00197 pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
00198 int err = pthread_create( &d->self, &attributes, start_thread, ( void* )d );
00199 if ( err != 0 ) {
00200 odebug << "ThreadUtil::Thread::start() : can't create thread: " << strerror( err ) << "" << oendl;
00201 pthread_attr_destroy( &attributes );
00202 return;
00203 }
00204 pthread_attr_destroy( &attributes );
00205 }
00206
00207 void Thread::terminate()
00208 {
00209 AutoLock lock( d->guard );
00210 if ( !d->isRunning )
00211 return;
00212
00213 pthread_cancel( d->self );
00214 }
00215
00216 bool Thread::wait()
00217 {
00218 AutoLock lock( d->guard );
00219 if ( !d->isRunning )
00220 return true;
00221
00222 return d->finishCondition.wait( d->guard );
00223 }
00224
00225 bool Thread::isRunning() const
00226 {
00227 AutoLock lock( d->guard );
00228 return d->isRunning;
00229 }
00230
00231 void Thread::exit()
00232 {
00233 pthread_exit( 0 );
00234 }
00235
00236 OnewayNotifier::OnewayNotifier()
00237 {
00238 int fds[ 2 ];
00239 pipe( fds );
00240 m_readFd = fds[ 0 ];
00241 m_writeFd = fds[ 1 ];
00242
00243 m_notifier = new QSocketNotifier( m_readFd, QSocketNotifier::Read );
00244 connect( m_notifier, SIGNAL( activated(int) ),
00245 this, SLOT( wakeUp() ) );
00246 }
00247
00248 OnewayNotifier::~OnewayNotifier()
00249 {
00250 delete m_notifier;
00251
00252 ::close( m_readFd );
00253 ::close( m_writeFd );
00254 }
00255
00256 void OnewayNotifier::notify()
00257 {
00258 const char c = 42;
00259 ::write( m_writeFd, &c, 1 );
00260 }
00261
00262 void OnewayNotifier::wakeUp()
00263 {
00264 char c = 0;
00265
00266 if ( ::read( m_readFd, &c, 1 ) != 1 )
00267 return;
00268
00269 emit awake();
00270 }
00271
00272 ChannelMessage::ChannelMessage( int type, int data, const char* msg )
00273 : m_type( type ), m_data( data ), m_msg( msg ),
00274 m_isCall( false ), m_replied( false ), m_inEventHandler( false )
00275 {}
00276
00277 ChannelMessage::~ChannelMessage()
00278 {
00279 if ( m_guard.isLocked() )
00280 m_guard.unlock();
00281 }
00282
00283 void ChannelMessage::reply()
00284 {
00285 if ( !m_isCall )
00286 {
00287 odebug << "ChannelMessage::reply() - can't reply oneway message!" << oendl;
00288 return;
00289 }
00290
00291 if ( m_inEventHandler )
00292 {
00293 m_replied = true;
00294 return;
00295 }
00296
00297 m_condition.wakeOne();
00298 m_guard.unlock();
00299 }
00300
00301 struct Channel::Private
00302 {
00303 Private()
00304 {
00305 ownerThread = pthread_self();
00306 }
00307
00308 pthread_t ownerThread;
00309 };
00310
00311 Channel::Channel( QObject *parent, const char *name )
00312 : QObject( parent, name ), d( new Private )
00313 {
00314 connect( &m_notifier, SIGNAL( awake() ),
00315 this, SLOT( deliver() ) );
00316 }
00317
00318 Channel::~Channel()
00319 {
00320 delete d;
00321 }
00322
00323 void Channel::send( ChannelMessage *message, SendType type )
00324 {
00325 if ( type == WaitForReply )
00326 {
00327 message->m_guard.lock();
00328 message->m_isCall = true;
00329 }
00330
00331 m_pendingMessagesGuard.lock();
00332 m_pendingMessages << MsgEnvelope( type, message );
00333 m_pendingMessagesGuard.unlock();
00334
00335 if ( d->ownerThread == pthread_self() ) {
00336 assert( type != WaitForReply );
00337
00338 deliver();
00339 }
00340 else
00341 m_notifier.notify();
00342
00343
00344 if ( type == WaitForReply )
00345 {
00346 message->m_condition.wait( message->m_guard );
00347 message->m_guard.unlock();
00348 }
00349 }
00350
00351 void Channel::deliver()
00352 {
00353 AutoLock lock( m_pendingMessagesGuard );
00354
00355 while ( !m_pendingMessages.isEmpty() ) {
00356 MsgEnvelope envelope = m_pendingMessages.first();
00357
00358 m_pendingMessages.remove( m_pendingMessages.begin() );
00359
00360 m_pendingMessagesGuard.unlock();
00361 deliverOne( envelope );
00362 m_pendingMessagesGuard.lock();
00363 }
00364 }
00365
00366 void Channel::deliverOne( const MsgEnvelope &envelope )
00367 {
00368 ChannelMessage *msg = envelope.msg;
00369
00370 assert( msg );
00371
00372 if ( envelope.type == WaitForReply )
00373 {
00374 msg->m_guard.lock();
00375 msg->m_inEventHandler = true;
00376 }
00377
00378 receiveMessage( msg, envelope.type );
00379
00380 if ( envelope.type == WaitForReply )
00381 {
00382 msg->m_inEventHandler = false;
00383 if ( msg->m_replied )
00384 {
00385 msg->m_condition.wakeOne();
00386
00387
00388 msg->m_guard.unlock();
00389 }
00390 }
00391 }
00392
00393
00394