Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

threadutil.cpp

Go to the documentation of this file.
00001 /* This file is part of the KDE project
00002    Copyright (C) 2002 Simon Hausmann <hausmann@kde.org>
00003 
00004    This library is free software; you can redistribute it and/or
00005    modify it under the terms of the GNU Library General Public
00006    License as published by the Free Software Foundation; either
00007    version 2 of the License, or (at your option) any later version.
00008 
00009    This library is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012    Library General Public License for more details.
00013 
00014    You should have received a copy of the GNU Library General Public License
00015    along with this library; see the file COPYING.LIB.  If not, write to
00016    the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
00017    Boston, MA 02111-1307, USA.
00018 */
00019 
00020 #include "threadutil.h"
00021 
00022 #include <opie2/odebug.h>
00023 #include <qsocketnotifier.h>
00024 
00025 #include <pthread.h>
00026 #include <assert.h>
00027 #include <unistd.h>
00028 #include <errno.h>
00029 
00030 using namespace ThreadUtil;
00031 
00032 struct Mutex::Data
00033 {
00034     Data()
00035     {
00036         pthread_mutex_init( &mutex, 0 );
00037     }
00038     ~Data()
00039     {
00040         pthread_mutex_destroy( &mutex );
00041     }
00042 
00043     pthread_mutex_t mutex;
00044 };
00045 
00046 Mutex::Mutex()
00047     : d( new Data )
00048 {
00049 }
00050 
00051 Mutex::~Mutex()
00052 {
00053     delete d;
00054 }
00055 
00056 void Mutex::lock()
00057 {
00058     pthread_mutex_lock( &d->mutex );
00059 }
00060 
00061 void Mutex::unlock()
00062 {
00063     pthread_mutex_unlock( &d->mutex );
00064 }
00065 
00066 bool Mutex::tryLock()
00067 {
00068     return pthread_mutex_trylock( &d->mutex ) == 0;
00069 }
00070 
00071 bool Mutex::isLocked()
00072 {
00073     if ( !tryLock() )
00074         return true;
00075 
00076     unlock();
00077     return false;
00078 }
00079 
00080 struct WaitCondition::Data
00081 {
00082     Data()
00083     {
00084         int result = pthread_cond_init( &waitCondition, 0 );
00085         assert( result == 0 );
00086     }
00087     ~Data()
00088     {
00089         pthread_cond_destroy( &waitCondition );
00090     }
00091 
00092     pthread_cond_t waitCondition;
00093 };
00094 
00095 WaitCondition::WaitCondition()
00096     : d( new Data )
00097 {
00098 }
00099 
00100 WaitCondition::~WaitCondition()
00101 {
00102     delete d;
00103 }
00104 
00105 bool WaitCondition::wait()
00106 {
00107     Mutex m;
00108     m.lock();
00109     return wait( m );
00110 }
00111 
00112 bool WaitCondition::wait( Mutex &mutex )
00113 {
00114     return pthread_cond_wait( &d->waitCondition, &mutex.d->mutex );
00115 }
00116 
00117 void WaitCondition::wakeOne()
00118 {
00119     pthread_cond_signal( &d->waitCondition );
00120 }
00121 
00122 void WaitCondition::wakeAll()
00123 {
00124     pthread_cond_broadcast( &d->waitCondition );
00125 }
00126 
00127 struct Thread::Data
00128 {
00129     Data() : isRunning( false )
00130     {}
00131 
00132     pthread_t self;
00133     Mutex guard;
00134     bool isRunning;
00135 
00136     WaitCondition finishCondition;
00137 
00138     Thread *thr;
00139 
00140     void run() { thr->run(); }
00141 };
00142 
00143 extern "C"
00144 {
00145 
00146 static void terminate_thread( void *arg )
00147 {
00148     Thread::Data *data = ( Thread::Data* )arg;
00149 
00150     assert( data );
00151 
00152     AutoLock locker( data->guard );
00153     data->isRunning = false;
00154     data->finishCondition.wakeAll();
00155 }
00156 
00157 static void *start_thread( void *arg )
00158 {
00159     Thread::Data *data = ( Thread::Data* )arg;
00160 
00161     pthread_cleanup_push( terminate_thread, data );
00162 
00163     data->isRunning = true;
00164     data->run();
00165 
00166     pthread_cleanup_pop( true );
00167 
00168     Thread::exit();
00169     return 0; // never reached
00170 }
00171 
00172 }
00173 
00174 Thread::Thread()
00175     : d( new Data )
00176 {
00177     d->thr = this;
00178 }
00179 
00180 Thread::~Thread()
00181 {
00182     assert( d->isRunning == false );
00183     delete d;
00184 }
00185 
00186 void Thread::start()
00187 {
00188     AutoLock lock( d->guard );
00189 
00190     if ( d->isRunning ) {
00191         odebug << "ThreadUtil::Thread::start() called for running thread." << oendl;
00192         return;
00193     }
00194 
00195     pthread_attr_t attributes;
00196     pthread_attr_init( &attributes );
00197     pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
00198     int err = pthread_create( &d->self, &attributes, start_thread, ( void* )d );
00199     if ( err != 0 ) {
00200         odebug << "ThreadUtil::Thread::start() : can't create thread: " << strerror( err ) << "" << oendl;
00201         pthread_attr_destroy( &attributes );
00202         return;
00203     }
00204     pthread_attr_destroy( &attributes );
00205 }
00206 
00207 void Thread::terminate()
00208 {
00209     AutoLock lock( d->guard );
00210     if ( !d->isRunning )
00211         return;
00212 
00213     pthread_cancel( d->self );
00214 }
00215 
00216 bool Thread::wait()
00217 {
00218     AutoLock lock( d->guard );
00219     if ( !d->isRunning )
00220         return true;
00221 
00222     return d->finishCondition.wait( d->guard );
00223 }
00224 
00225 bool Thread::isRunning() const
00226 {
00227     AutoLock lock( d->guard );
00228     return d->isRunning;
00229 }
00230 
00231 void Thread::exit()
00232 {
00233     pthread_exit( 0 );
00234 }
00235 
00236 OnewayNotifier::OnewayNotifier()
00237 {
00238     int fds[ 2 ];
00239     pipe( fds );
00240     m_readFd = fds[ 0 ];
00241     m_writeFd = fds[ 1 ];
00242 
00243     m_notifier = new QSocketNotifier( m_readFd, QSocketNotifier::Read );
00244     connect( m_notifier, SIGNAL( activated(int) ),
00245              this, SLOT( wakeUp() ) );
00246 }
00247 
00248 OnewayNotifier::~OnewayNotifier()
00249 {
00250     delete m_notifier;
00251 
00252     ::close( m_readFd );
00253     ::close( m_writeFd );
00254 }
00255 
00256 void OnewayNotifier::notify()
00257 {
00258     const char c = 42;
00259     ::write( m_writeFd, &c, 1 );
00260 }
00261 
00262 void OnewayNotifier::wakeUp()
00263 {
00264     char c = 0;
00265 
00266     if ( ::read( m_readFd, &c, 1 ) != 1 )
00267         return;
00268 
00269     emit awake();
00270 }
00271 
00272 ChannelMessage::ChannelMessage( int type, int data, const char* msg )
00273     : m_type( type ), m_data( data ), m_msg( msg ),
00274       m_isCall( false ), m_replied( false ), m_inEventHandler( false )
00275 {}
00276 
00277 ChannelMessage::~ChannelMessage()
00278 {
00279     if ( m_guard.isLocked() )
00280         m_guard.unlock();
00281 }
00282 
00283 void ChannelMessage::reply()
00284 {
00285     if ( !m_isCall )
00286     {
00287         odebug << "ChannelMessage::reply() - can't reply oneway message!" << oendl;
00288         return;
00289     }
00290 
00291     if ( m_inEventHandler )
00292     {
00293         m_replied = true;
00294         return;
00295     }
00296 
00297     m_condition.wakeOne();
00298     m_guard.unlock();
00299 }
00300 
00301 struct Channel::Private
00302 {
00303     Private()
00304     {
00305         ownerThread = pthread_self();
00306     }
00307 
00308     pthread_t ownerThread;
00309 };
00310 
00311 Channel::Channel( QObject *parent, const char *name )
00312     : QObject( parent, name ), d( new Private )
00313 {
00314     connect( &m_notifier, SIGNAL( awake() ),
00315              this, SLOT( deliver() ) );
00316 }
00317 
00318 Channel::~Channel()
00319 {
00320     delete d;
00321 }
00322 
00323 void Channel::send( ChannelMessage *message, SendType type )
00324 {
00325     if ( type == WaitForReply )
00326     {
00327         message->m_guard.lock();
00328         message->m_isCall = true;
00329     }
00330 
00331     m_pendingMessagesGuard.lock();
00332     m_pendingMessages << MsgEnvelope( type, message );
00333     m_pendingMessagesGuard.unlock();
00334 
00335     if ( d->ownerThread == pthread_self() ) {
00336         assert( type != WaitForReply );
00337 
00338         deliver();
00339     }
00340     else
00341         m_notifier.notify();
00342     //QThread::postEvent( this, new QCustomEvent( QEvent::User, envelope ) );
00343 
00344     if ( type == WaitForReply )
00345     {
00346         message->m_condition.wait( message->m_guard );
00347         message->m_guard.unlock();
00348     }
00349 }
00350 
00351 void Channel::deliver()
00352 {
00353     AutoLock lock( m_pendingMessagesGuard );
00354 
00355     while ( !m_pendingMessages.isEmpty() ) {
00356         MsgEnvelope envelope = m_pendingMessages.first();
00357 
00358         m_pendingMessages.remove( m_pendingMessages.begin() );
00359 
00360         m_pendingMessagesGuard.unlock();
00361         deliverOne( envelope );
00362         m_pendingMessagesGuard.lock();
00363     }
00364 }
00365 
00366 void Channel::deliverOne( const MsgEnvelope &envelope )
00367 {
00368     ChannelMessage *msg = envelope.msg;
00369 
00370     assert( msg );
00371 
00372     if ( envelope.type == WaitForReply )
00373     {
00374         msg->m_guard.lock();
00375         msg->m_inEventHandler = true;
00376     }
00377 
00378     receiveMessage( msg, envelope.type );
00379 
00380     if ( envelope.type == WaitForReply )
00381     {
00382         msg->m_inEventHandler = false;
00383         if ( msg->m_replied )
00384         {
00385             msg->m_condition.wakeOne();
00386             // this is a bit tricky. we unlock only when we reply.
00387             // reply() does an unlock as well.
00388             msg->m_guard.unlock();
00389         }
00390     }
00391 }
00392 
00393 /* vim: et sw=4 ts=4
00394  */

Generated on Sat Nov 5 16:17:33 2005 for OPIE by  doxygen 1.4.2