Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

qioarg.C

Go to the documentation of this file.
#include <config.h>
#include <iostream>
#include <unistd.h>
#include <util/gjp.h>
#include <sys/time.h>
#include <iomanip>
#include <cstdio>
#include <cstring>
#include <comms/glb.h>
00009 using namespace std;
00010 
00011 #include <util/qioarg.h>
00012 
00013 #if TARGET == QCDOC
00014 #include <util/gsum64ext.h>
00015 #endif
00016 
00017 CPS_START_NAMESPACE
00018 
00019 
00021 // QioArg members////////////////////////////////////////
00023 void QioArg::init(const char * file, const int concur_io_number, const Float chk_prec,
00024                   const FP_FORMAT file_format, const INT_FORMAT file_int_format,
00025                   const int recon_row_3) {
00026   for(int dir=0;dir<5;dir++) {
00027     nodes[dir] = GJP.Nodes(dir);
00028     node_sites[dir] = GJP.NodeSites(dir);
00029     coor[dir] = GJP.NodeCoor(dir);
00030   }
00031 
00032 // Make it all periodic as NERSC header specifies gauge boundary condition, 
00033 // 04/03/05 CJ
00034   for(int dir=0;dir<4;dir++) 
00035     bc[dir] = BND_CND_PRD;
00036 //    bc[dir] = GJP.Bc(dir);
00037 
00038   StartConfLoadAddr = GJP.StartConfLoadAddr();
00039 
00040   // user set params
00041   ConcurIONumber = concur_io_number;
00042   strcpy(FileName, file);
00043   CheckPrecision = chk_prec;
00044   FileFpFormat = file_format;
00045   FileIntFormat = file_int_format;
00046   ReconRow3 = recon_row_3;
00047 }
00048 
00049 
00051 // QioControl members////////////////////////////////////////////////
00053 QioControl::QioControl() 
00054   : num_concur_io(0), do_log(0), cname("QioControl"), io_good(false)
00055 {
00056   //  cout << "I am on a " << GJP.Xnodes() << "x"<< GJP.Ynodes() << "x"<< GJP.Znodes() 
00057   //       << "x"<< GJP.Tnodes() <<"x"<<GJP.Snodes() << " machine" << endl;
00058   //  cout << "My pos is (" << GJP.XnodeCoor() << ","<< GJP.YnodeCoor() << ","<< GJP.ZnodeCoor() << ","
00059   //       << GJP.TnodeCoor() << "," << GJP.SnodeCoor() << ")" << endl;
00060 
00061   unique_id = GJP.XnodeCoor() + GJP.Xnodes() * (GJP.YnodeCoor() + GJP.Ynodes() * (GJP.ZnodeCoor() + GJP.Znodes() * (GJP.TnodeCoor() + GJP.Tnodes() * GJP.SnodeCoor() ) ) );
00062   //  cout << "My UniqueID() = " << unique_id << endl;
00063 
00064   number_nodes = GJP.Xnodes() * GJP.Ynodes() * GJP.Znodes() * GJP.Tnodes() * GJP.Snodes();
00065   //  cout << "Total number of nodes = " << number_nodes << endl;
00066 
00067 }
00068 
00069 QioControl::~QioControl() {
00070 
00071 }
00072 
00073 #ifndef USE_QMP
00074 // The following are NEW functions added to QioControl class 
00075 // to enable message passing between parallel processors, based on QMP calls
00076 // (some are pretty useful...)
00077 
00078 int QioControl::synchronize(const int errorStatus)  const {
00079   const char * fname = "synchronize()";
00080   int error = errorStatus;
00081  
00082   if(NumNodes()>1) {
00083     error = globalSumInt(error);
00084     if(error > 0) {
00085       VRB.Flow(cname,fname,"Totally %d nodes reported error!\n",error);
00086     }
00087   }
00088   return error;
00089 }
00090 
00091 void QioControl::broadcastInt(int * data, int fromID)  const {
00092   if(NumNodes() > 1) {
00093     if(unique_id != fromID) {
00094       *data = 0;
00095     }
00096 
00097     *data = globalSumInt(*data);
00098   }
00099 }
00100 
00101 void QioControl::broadcastFloat(Float * data, int fromID) const {
00102   if(NumNodes() > 1) {
00103     if(unique_id != fromID) {
00104       * data = 0;
00105     }
00106 
00107     *data = globalSumFloat(*data);
00108   }
00109 }
00110 
00111 int QioControl::round(const Float fdata) const{
00112   int ndata = (int)fdata;
00113   if(fdata - ndata >= 0.5) ndata++;
00114   if(fdata - ndata < -0.5) ndata--;
00115   return ndata;
00116 }
00117 
00118 int QioControl::globalSumInt(const int data) const{
00119 #ifdef PARALLEL
00120   //  Gsum64Ext  gsum;
00121   //  return gsum.Sum(data);
00122   int hfbits = sizeof(unsigned int) * 8 / 2;
00123   unsigned int mask = (1 << hfbits) - 1;
00124 
00125   int sumd = data;
00126   int hi = sumd >> hfbits;
00127   int lo = sumd & mask;
00128   hi = round(globalSumFloat(hi));
00129   lo = round(globalSumFloat(lo));
00130   sumd = (hi<<hfbits)+lo;
00131   return sumd;
00132 #else
00133   return data;
00134 #endif
00135 }
00136 
00137 unsigned int QioControl::globalSumUint(const unsigned int data) const{
00138 #ifdef  PARALLEL 
00139   //  Gsum64Ext  gsum;
00140   //  return gsum.Sum(data);
00141   int hfbits = sizeof(unsigned int) * 8 / 2;
00142   unsigned int mask = (1 << hfbits) - 1;
00143 
00144   unsigned int sumd = data;
00145   unsigned int hi = sumd >> hfbits;
00146   unsigned int lo = sumd & mask;
00147   hi = round(globalSumFloat(hi));
00148   lo = round(globalSumFloat(lo));
00149   sumd = (hi<<hfbits)+lo;
00150   return sumd;
00151 #else
00152   return data;
00153 #endif
00154 }
00155 
00156 Float QioControl::globalSumFloat(const Float data) const {
00157 #ifdef  PARALLEL
00158   //  Gsum64Ext  gsum;
00159   //  return gsum.Sum(data);
00160   Float sumdata = data;
00161   glb_sum_five(&sumdata);
00162   return sumdata;
00163 #else
00164   return data;
00165 #endif
00166 }
00167 
00168 int QioControl::globalMinInt(const int data) const{
00169 #ifdef  PARALLEL
00170   Float fdata = data;
00171   glb_min(&fdata);
00172   int res = round(fdata);
00173   return res;
00174 #else
00175   return data;
00176 #endif
00177 }
00178 #endif
00179 
00180 // IO control pattern:  two broadcast to set id range who got control
00181 //                      read/write
00182 //                      one sync to indicate finish, ret<0 means all finished
00183 int QioControl::getIOTimeSlot() const {
00184   const char * fname = "getIOTimeSlot()";
00185 
00186 //  printf("Node %d: GetIOTimeSlot()\n",UniqueID());
00187   if(NumNodes() > 1) {
00188     // using intelligent commander(node-0), dumb server(others) mode
00189     if(unique_id == 0) {
00190       return  IOCommander(0);
00191     }
00192     else {
00193       int firstID, lastID;
00194       while(1) {
00195         broadcastInt(&firstID);
00196         broadcastInt(&lastID);
00197         if(unique_id >= firstID && unique_id <= lastID){ // got time slot
00198 //          printf("Node %d: Got time slot!\n",UniqueID());
00199           return 1;
00200         }
00201         
00202         synchronize();
00203       }
00204     }
00205   }
00206 
00207   return 1;
00208 }
00209 
00210 int QioControl::finishIOTimeSlot() const {
00211 //  printf("Node %d: finishIOTimeSlot()\n",UniqueID());
00212   if(NumNodes() > 1) {
00213     if(unique_id == 0) {
00214       return IOCommander(1);
00215     }
00216     else {
00217       if(synchronize()<0) return 0; // io finished
00218       while(1) {
00219         int dummy;
00220         broadcastInt(&dummy);
00221         broadcastInt(&dummy);  
00222         if(synchronize()<0) break;
00223       }
00224     }
00225   }
00226 
00227   return 0;
00228 }
00229 
00230 
00231 int QioControl::IOCommander(int caller) const {
00232   const char * fname = "IOCommander()";
00233   int totalnodes = NumNodes();
00234   int do_concur_io = num_concur_io;
00235   if(do_concur_io <= 0) do_concur_io = totalnodes;
00236 
00237   int batches = totalnodes / do_concur_io;
00238   if(do_concur_io * batches < totalnodes)  batches ++;
00239 
00240   int firstID, lastID;
00241 
00242   if(caller == 0)  { // let node 0 finish its task first (w/ the first batch)
00243     firstID = 0;
00244     lastID = do_concur_io-1;
00245     if(lastID > totalnodes-1)  lastID = totalnodes-1;
00246     printf("Node %d: IOCommander(%d) batches=%d firstID=%d lastID=%d\n",UniqueID(),caller, batches, firstID,lastID);
00247 
00248     broadcastInt(&firstID);
00249     broadcastInt(&lastID);
00250     VRB.Flow(cname, fname, "Parallel IO: Group 1, Node %d thru Node %d\n",firstID,lastID);
00251     return 1;
00252   }
00253   else { // now node 0 finished his own io, can control others
00254     if(batches==1) {
00255       synchronize(-1);  // io finished
00256       return 0;
00257     }
00258 
00259     for(int i=1;i<batches;i++) {
00260       synchronize(0);  // one batch done, but still more
00261       firstID = i * do_concur_io;
00262       lastID = (i+1) * do_concur_io - 1;
00263 //    printf("Node %d: IOCommander(%d) firstID=%d lastID=%d\n",UniqueID(),caller, firstID,lastID);
00264       if(lastID > totalnodes-1)  lastID = totalnodes-1;
00265 
00266       broadcastInt(&firstID);
00267       broadcastInt(&lastID);
00268       VRB.Flow(cname,fname,"Parallel IO: Group %d, Node %d thru Node %d\n",i+1,firstID,lastID);
00269     }
00270 
00271     synchronize(-1);  // io finished
00272     return 0;
00273   }
00274 }
00275 
00276 
00277 void QioControl::buildNodesList(int * active_num, int * active_node_list, int this_active) const {
00278   *active_num = globalSumInt(this_active?1:0);
00279   for(int i=0;i < *active_num; i++) {
00280     int sendid;
00281     if(this_active)  sendid = uniqueID();
00282     else             sendid = NumNodes(); // > all possible uniqueID();
00283     active_node_list[i] = globalMinInt(sendid);
00284     if(active_node_list[i] == uniqueID()) this_active = 0;  // exclude the nodes already in list
00285   }
00286 }
00287 
00288 int QioControl::syncError(int this_error) const {
00289   const char * fname = "testError()";
00290   TempBufAlloc nodes_list_buf(NumNodes()*sizeof(int));
00291   int * nodes_list = nodes_list_buf.IntPtr();
00292 
00293   int error_nodes;
00294   buildNodesList(&error_nodes, nodes_list, this_error);
00295   if(error_nodes>0) {
00296     VRB.Flow(cname, fname,
00297              "%d nodes report error! They are (if more than 10 nodes, only list first 10 ids):\n",
00298              error_nodes);
00299     for(int i=0;i<10 && i<error_nodes; i++)      VRB.Flow(cname,fname,"error id = %d\n",nodes_list[i]);
00300     VRB.Flow(cname,fname,"####\n");
00301   }
00302   return error_nodes;
00303 }
00304 
00305 void QioControl::setLogDir(const char * LogDir) {
00306   do_log = 1;
00307   logging = 0;
00308   strcpy(log_dir,LogDir);
00309 }
00310 
00311 
00312 void QioControl::startLogging(const char * action) {
00313   const char * fname = "startLogging()";
00314 
00315   int error = 0;
00316 
00317   if(!do_log) return;
00318   logs.clear();
00319 
00320   char logname[256];
00321   sprintf(logname,    "%s/qcdio.log.%d",log_dir,uniqueID());
00322   //  sprintf(oldlogname, "%s/%d.log.old",log_dir,uniqueID());
00323 
00324   /*  
00325   // copy logs to oldlogs
00326   logs.open(oldlogname);
00327   if(!logs.is_open()) {
00328     cout << "LOG file [" << oldlogname << "] open failed!" << endl;
00329     logging = 0;
00330     return;
00331   }
00332 
00333   ifstream prevlogs(logname);
00334   if(prevlogs.is_open()) {
00335     logs << prevlogs.rdbuf();
00336     prevlogs.close();
00337   }
00338   logs.close();
00339 
00340   logs.clear();
00341   prevlogs.clear();
00342 
00343   */
00344 
00345   // clear new logs, copy oldlogs to new logs thus to
00346   // set file pointer to the end so that we can append new logs
00347   //  VRB.Flow(cname,fname,"Try open file %s",logname);
00348   logs.open(logname, ios_base::in | ios_base::out | ios_base::ate);
00349   if(!logs.good()) { // file doesn't exist?
00350     logs.clear();
00351     logs.open(logname, ios_base::out | ios_base::trunc); 
00352     if(!logs.good()) {
00353       logs.clear();
00354       VRB.Flow(cname,fname,"LOG file [%s] open failed!\n",logname);
00355       logging = 0;
00356       error = 1;
00357     }
00358   }
00359   if(syncError(error)>0) {
00360     ERR.FileA(cname,fname,logname); 
00361   }
00362 
00363   /*
00364   prevlogs.open(oldlogname);
00365   if(prevlogs.is_open()) {
00366     logs << prevlogs.rdbuf();
00367     prevlogs.close();
00368   }
00369 
00370   logs.clear(); // if prevlogs is empty, the logs may have a error bit set
00371   */
00372   
00373   /*
00374   char logfile[200];
00375   strcpy(logfile,log_dir);
00376   strcat(logfile,"/qcdio.log");
00377   logs = Fopen(ADD_ID, logfile, "a");
00378   if(!logs) error = 1;
00379   if(testError(error) > 0) {
00380     ERR.FileA(cname,fname,logfile);
00381   }
00382   */
00383 
00384   //  cout << "start logging..." << endl;
00385 
00386   // start logging  
00387   struct timeval tp;
00388   gettimeofday(&tp,NULL);
00389   log_start = tp.tv_sec;
00390   char logtime[100];
00391   strcpy(logtime,ctime(&log_start));
00392   logtime[strlen(logtime)-1] = '\0';  // cut the last '\n'
00393 
00394   logs << "LOG<" << uniqueID() << ">["<< logtime << "] ";
00395   if(action) logs << action;
00396   logs<<" : \t";
00397   log_point = logs.tellp();
00398   logs << "Processing" << endl << flush;
00399 
00400   logging = 1;
00401 }
00402 
00403 void QioControl::log(const char * short_note) {
00404   const char * fname = "log()";
00405   int error = 0;
00406 
00407   if(!do_log || !logging) return;
00408   //  if(!logs.is_open() || !logs.good())  error = 1;
00409   if(syncError(error)>0) {
00410     ERR.Hardware(cname,fname,"Wrinting to file qcdio.log.* failed");
00411   }
00412 
00413   //  cout << "continue logging..." << endl;
00414   struct timeval tp;
00415   gettimeofday(&tp,NULL);
00416   time_t tm_elapse = tp.tv_sec - log_start;
00417 
00418   logs.seekp(log_point);
00419   logs << tm_elapse;
00420   if(short_note) logs << "(" << short_note << ")";
00421   logs<<"\t";
00422   log_point = logs.tellp();
00423   logs<<"Processing" << endl << flush;
00424 }
00425 
00426 void QioControl::finishLogging(const char * ending_word) {
00427   const char * fname = "finishLogging()";
00428   int error = 0;
00429 
00430   if(!do_log || !logging) return;
00431   //  if(!logs.is_open() || !logs.good()) error=1;
00432   if(syncError(error)>0) {
00433     ERR.Hardware(cname,fname,"Closing file qcdio.log.* failed");
00434   }
00435 
00436   //  cout << "finish logging..." << endl;
00437 
00438   struct timeval tp;
00439   gettimeofday(&tp,NULL);
00440   time_t tm_elapse = tp.tv_sec - log_start;
00441 
00442   logs.seekp(log_point);
00443   if(ending_word) logs << ending_word;
00444   logs<< "["<<tm_elapse << " sec]";
00445   logs<< "          " << endl << flush; // erase any chars not overwritten
00446   logs.close();
00447 
00448   logging = 0;
00449 }
00450 
00451 
00452 
00453 
00454 
00455 
00456 CPS_END_NAMESPACE

Generated on Sat Oct 10 14:11:36 2009 for Columbia Physics System by  doxygen 1.3.9.1