00001 /* 00002 FSF: Flow Scheduling Framework library 00003 00004 Version 1.0 Copyright (C) 2010 Alexandre R.J. Francois 00005 00006 Previous versions Copyright (C) 2001-2005 University of Southern California 00007 00008 Author: Alexandre R.J. Francois - alexandrefrancois@yahoo.com 00009 www.alexandrefrancois.org 00010 00011 Modular Flow Scheduling Middleware 00012 mfsm.sourceForge.net 00013 00014 This library is free software; you can redistribute it and/or 00015 modify it under the terms of the GNU Lesser General Public 00016 License as published by the Free Software Foundation; either 00017 version 2.1 of the License, or (at your option) any later version. 00018 00019 This library is distributed in the hope that it will be useful, 00020 but WITHOUT ANY WARRANTY; without even the implied warranty of 00021 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00022 Lesser General Public License for more details. 00023 00024 You should have received a copy of the GNU Lesser General Public 00025 License along with this library; if not, write to the Free Software 00026 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00027 */ 00028 00029 #ifndef FSF_CELL_H 00030 #define FSF_CELL_H 00031 00032 #include "Fsf.h" 00033 #include "FsfFilter.h" 00034 #include "FsfRepository.h" 00035 00036 00037 namespace fsf{ 00038 00068 00069 00072 class CCell { 00073 private: 00074 bool m_bActive; 00075 bool m_bReset; 00076 00077 00078 CMutex m_csOutputName; 00079 std::string m_strOutputName; 00080 bool m_bPassThrough; 00081 00082 00083 // Active path 00084 CCondition m_condActiveThreads; 00085 long m_nNbActiveThreads; 00086 // Filter 00087 fsf::CMutex m_csActiveFilter; 00088 CActiveFilterBase *m_pActiveFilter; 00089 // Connections 00090 CCell *m_pUpstreamCell; 00091 fsf::CMutex m_csDownstream; 00092 std::list<CCell*> m_listDownstream; 00093 00094 // Passive path 00095 CCondition m_condPassiveThreads; 00096 long m_nNbPassiveThreads; 00097 // Filter 00098 fsf::CMutex m_csPassiveFilter; 00099 CPassiveFilterBase *m_pPassiveFilter; 00100 // Handle: results of the filtering 00101 fsf::CMutex m_csPassiveHandle; 00102 CPassiveHandle *m_pPassiveHandle; 00103 // Connection 00104 CRepository *m_pRepository; 00105 00106 00107 protected: 00110 00111 00112 void lockOutputName() { m_csOutputName.lock(); } 00114 void unlockOutputName() { m_csOutputName.unlock(); } 00116 const std::string &getOutputName() const { return m_strOutputName; } 00117 00118 00120 void setActive(bool bActive) { m_bActive=bActive; } 00121 00123 void lockDownstream() { m_csDownstream.lock(); } 00125 void unlockDownstream() { m_csDownstream.unlock(); } 00127 std::list<CCell*> &getListDownstream() { return m_listDownstream; } 00128 00130 void lockActiveFilter() { m_csActiveFilter.lock(); } 00132 void unlockActiveFilter() { m_csActiveFilter.unlock(); } 00134 CActiveFilterBase *getActiveFilter() { return m_pActiveFilter; } 00135 00137 void lockPassiveFilter() { m_csPassiveFilter.lock(); } 00139 void unlockPassiveFilter() { m_csPassiveFilter.unlock(); } 00140 00142 void lockPassiveHandle() { m_csPassiveHandle.lock(); } 00144 void unlockPassiveHandle() { m_csPassiveHandle.unlock(); } 00146 CPassiveHandle *getPassiveHandle() { return m_pPassiveHandle; } 00148 void deletePassiveHandle(){ 00149 m_csPassiveHandle.lock(); 00150 delete m_pPassiveHandle; 00151 m_pPassiveHandle=NULL; 00152 m_csPassiveHandle.unlock(); 00153 } 00154 00156 00157 00158 public: 00159 00161 00162 CCell(); 00163 virtual ~CCell(); 00164 00165 00167 00168 00170 void setRepository(CRepository *pRepository) { m_pRepository=pRepository; } 00172 void setActiveFilter(CActiveFilterBase *pFilter); 00174 void setPassiveFilter(CPassiveFilterBase *pFilter); 00176 void setOutputName(const std::string &strOutputName){ 00177 lockOutputName(); 00178 m_strOutputName.assign(strOutputName); 00179 unlockOutputName(); 00180 } 00181 00182 00184 bool connectDownstreamCell(CCell *pCell); 00186 bool disconnectDownstreamCell(CCell *pCell=NULL); 00187 00188 // name Flow control 00189 virtual bool switchCell(bool bOn); 00190 bool on(); 00191 bool off(); 00192 virtual void setReset(bool b=true) { m_bReset=b; } 00193 00195 CActiveFilterBase *cloneActiveFilter(); 00197 CPassiveFilterBase *clonePassiveFilter(); 00200 void getOutputName(std::string &strOutputName){ 00201 lockOutputName(); 00202 strOutputName.assign(m_strOutputName); 00203 unlockOutputName(); 00204 } 00205 00207 00208 00210 00211 00213 virtual void getTypeID(std::string &str) const { str.assign("FSF_CELL"); } 00214 00216 CRepository *getRepository() const { return m_pRepository; } 00217 // Stream connections 00219 CCell *getUpstreamCell() const { return m_pUpstreamCell; } 00220 00222 bool isActive() const { return m_bActive; } 00223 00225 bool doReset() const { return m_bReset; } 00226 //*} 00227 00228 00231 00232 00233 bool connectRepository(fsf::CRepository *pRepository); 00235 bool disconnectRepository(); 00237 bool connectUpstreamCell(fsf::CCell *pUpstreamCell); 00239 bool disconnectStream(); 00241 00243 00244 00245 virtual void process(CPassiveHandle *pPassiveHandle, CActiveHandle *pActiveHandle, CActivePulse *pActivePulse) {} 00247 00248 00250 00251 00252 00254 void lockActiveThreads() { m_condActiveThreads.lock(); } 00256 void unlockActiveThreads() { m_condActiveThreads.unlock(); } 00258 void waitActiveThreads() { m_condActiveThreads.wait(); } 00260 void broadcastActiveThreads() { m_condActiveThreads.broadcast(); } 00261 00262 00264 void lockPassiveThreads() { m_condPassiveThreads.lock(); } 00266 void unlockPassiveThreads() { m_condPassiveThreads.unlock(); } 00268 void waitPassiveThreads() { m_condPassiveThreads.wait(); } 00270 void broadcastPassiveThreads() { m_condPassiveThreads.broadcast(); } 00271 00272 00273 00276 inline void startActiveThread(CActivePulse *pPulse=NULL); 00277 00279 virtual void activeThread(CActivePulse *pPulse=NULL); 00280 00283 inline void startPassiveThread(CPassivePulse *pPulse=NULL); 00284 00286 virtual bool passiveThread(CPassivePulse *pPulse=NULL); 00287 00290 inline static void* activeThreadProc(void* pParam); 00292 //. Routes the pulse if necessary and calls <code>passiveThread</code> 00293 inline static void* passiveThreadProc(void* pParam); 00294 00296 00297 }; 00298 00299 inline void* CCell::activeThreadProc(void* pParam){ 00300 void **pThreadParam=(void**)pParam; 00301 CCell *pCell=(CCell*)pThreadParam[0]; 00302 00303 CActivePulse *pPulse=(CActivePulse*)pThreadParam[1]; 00304 pCell->activeThread((CActivePulse*)pPulse); 00305 00306 // cleanup 00307 delete [] pThreadParam; 00308 00309 return NULL; 00310 } 00311 00312 inline void CCell::startActiveThread(CActivePulse *pPulse){ 00313 if(m_bActive){ 00314 // start active thread 00315 void **pParam=new void*[2]; 00316 pParam[0]=this; 00317 pParam[1]=pPulse; 00318 00319 pthread_t thread; 00320 pthread_create(&thread,NULL,&activeThreadProc,pParam); 00321 pthread_detach(thread); 00322 } 00323 else{ 00324 // complete pulse routing 00325 if(m_bPassThrough && !m_listDownstream.empty()){ 00326 std::list<CCell*>::iterator it; 00327 lockDownstream(); 00328 pPulse->incNbPaths(m_listDownstream.size()-1); 00329 for(it=m_listDownstream.begin();it!=m_listDownstream.end();++it){ 00330 (*it)->startActiveThread(pPulse); 00331 } 00332 unlockDownstream(); 00333 } 00334 else{ 00335 // release the Pulse and delete if no longer needed 00336 if(pPulse->decNbPaths()==0) 00337 delete pPulse; 00338 } 00339 } 00340 } 00341 00342 inline void* CCell::passiveThreadProc(void* pParam){ 00343 // restore calling instance pointer 00344 void **pThreadParam=reinterpret_cast<void**>(pParam); 00345 CCell *pCell=static_cast<CCell*>(pThreadParam[0]); 00346 CPassivePulse *pPulse=static_cast<CPassivePulse*>(pThreadParam[1]); 00347 00348 // call thread function 00349 pCell->passiveThread(pPulse); 00350 00351 // cleanup 00352 delete [] pThreadParam; 00353 00354 return NULL; 00355 } 00356 00357 inline void CCell::startPassiveThread(CPassivePulse *pPulse){ 00358 void **pParam=new void*[2]; 00359 pParam[0]=this; 00360 pParam[1]=pPulse; 00361 00362 pthread_t thread; 00363 pthread_create(&thread,NULL,&passiveThreadProc,pParam); 00364 pthread_detach(thread); 00365 } 00366 00367 } // namespace fsf 00368 00369 #endif // FSF_CELL_H 00370