diff options
author | Thomas White <taw@physics.org> | 2012-05-22 17:44:52 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-05-22 17:44:52 +0200 |
commit | 412a3212048ed60645b1b31923b6720d47af9826 (patch) | |
tree | c6f36cddab224c6079e3ad289db25db9bbd21835 /src/indexamajig.c | |
parent | de56cd77205fba6ea388cf209017a2cd1323a9a1 (diff) |
WIP on tidy-up
Diffstat (limited to 'src/indexamajig.c')
-rw-r--r-- | src/indexamajig.c | 423 |
1 files changed, 181 insertions, 242 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index f4bbb72e..ed84c6a8 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -81,7 +81,7 @@ enum { /* Information about the indexing process which is common to all patterns */ -struct static_index_args +struct index_args { UnitCell *cell; int config_cmfilter; @@ -110,7 +110,6 @@ struct static_index_args double ir_out; /* Output stream */ - pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; const struct copy_hdf5_field *copyme; char *outfile; @@ -118,36 +117,16 @@ struct static_index_args /* Information about the indexing process for one pattern */ -struct index_args +struct pattern_args { /* "Input" */ char *filename; - struct static_index_args static_args; /* "Output" */ int indexable; }; -/* Information needed to choose the next task and dispatch it */ -struct queue_args -{ - FILE *fh; - char *prefix; - int config_basename; - struct static_index_args static_args; - - char *use_this_one_instead; - - int n_indexable; - int n_processed; - int n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - int updateReader; -}; - - static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -246,16 +225,16 @@ static void show_help(const char *s) } -// Get next pattern in .lst -char* get_pattern(FILE *fh) { - char *rval; - char line[LINE_LENGTH]; - rval = fgets(line, LINE_LENGTH - 1, fh); - if (ferror(fh)) { - printf("Read error\n"); - rval = NULL; - } - return rval; +static char *get_pattern(FILE *fh) +{ + char *rval; + char line[LINE_LENGTH]; + rval = fgets(line, LINE_LENGTH - 1, fh); + if ( ferror(fh) ) { + ERROR("Failed to get next filename from list.\n"); + rval = NULL; + } + return rval; } @@ -416,11 +395,11 @@ static void process_image(void *qp, void *pp, int cookie) outfilename = malloc(strlen(outfile) + 1); snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile); if ((fd = open(outfilename, O_WRONLY)) == -1) { - perror("Error on opening\n"); + ERROR("Error on opening\n"); exit(1); } if (fcntl(fd, F_SETLKW, &fl) == -1) { - perror("Error on setting lock wait\n"); + ERROR("Error on setting lock wait\n"); exit(1); } @@ -428,7 +407,7 @@ static void process_image(void *qp, void *pp, int cookie) FILE *fh; fh = fopen(outfilename, "a"); if (fh == NULL) { - perror("Error inside lock\n"); + ERROR("Error inside lock\n"); } write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); fclose(fh); @@ -436,7 +415,7 @@ static void process_image(void *qp, void *pp, int cookie) /* Unlock stream for other processes */ fl.l_type = F_UNLCK; /* set to unlock same region */ if (fcntl(fd, F_SETLK, &fl) == -1) { - perror("fcntl"); + ERROR("fcntl"); exit(1); } close(fd); @@ -456,6 +435,36 @@ static void process_image(void *qp, void *pp, int cookie) } +static void run_work(const struct index_args *iargs, + int filename_pipe, int results_pipe) +{ + int allDone = 0; + + while ( !allDone ) { + + /* read from pipe and return number of bytes read */ + if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { + ERROR("read1"); + } else if (buff_count > 0) { + /* process image */ + pargs.filename = buffR; + pargs.indexable = 0; + process_image(&qargs, &pargs, batchNum); + /* request another image */ + buff_count = sprintf(buffW, "%d\n", pargs.indexable); + if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) + ERROR("write P0"); + } else if (buff_count == 0) { + allDone = 1; + } + + } + /* close my pipes */ + close(filename_pipe); + close(results_pipe); +} + + #ifdef HAVE_CLOCK_GETTIME static time_t get_monotonic_seconds() @@ -542,7 +551,7 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nProcesses = 1; + int n_proc = 1; char *prepare_line; char prepare_filename[LINE_LENGTH]; struct queue_args qargs; @@ -558,6 +567,12 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; + int n_indexable, n_processed, n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + pid_t *pids; + int *filename_pipes; + int *result_pipes; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -638,7 +653,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nProcesses = atoi(optarg); + n_proc = atoi(optarg); break; case 'g' : @@ -909,7 +924,7 @@ int main(int argc, char *argv[]) prepare_line = tmp; } snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line); - qargs.use_this_one_instead = prepare_line; + rewind(fh); /* Prepare the indexer */ if ( indm != NULL ) { @@ -925,223 +940,147 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); - qargs.static_args.cell = cell; - qargs.static_args.config_cmfilter = config_cmfilter; - qargs.static_args.config_noisefilter = config_noisefilter; - qargs.static_args.config_verbose = config_verbose; - qargs.static_args.config_satcorr = config_satcorr; - qargs.static_args.config_closer = config_closer; - qargs.static_args.config_insane = config_insane; - qargs.static_args.config_bgsub = config_bgsub; - qargs.static_args.cellr = cellr; - qargs.static_args.tols[0] = tols[0]; - qargs.static_args.tols[1] = tols[1]; - qargs.static_args.tols[2] = tols[2]; - qargs.static_args.tols[3] = tols[3]; - qargs.static_args.threshold = threshold; - qargs.static_args.min_gradient = min_gradient; - qargs.static_args.min_snr = min_snr; - qargs.static_args.min_int_snr = min_int_snr; - qargs.static_args.det = det; - qargs.static_args.indm = indm; - qargs.static_args.ipriv = ipriv; - qargs.static_args.peaks = peaks; - qargs.static_args.ofh = ofh; - qargs.static_args.beam = beam; - qargs.static_args.element = element; - qargs.static_args.stream_flags = stream_flags; - qargs.static_args.hdf5_peak_path = hdf5_peak_path; - qargs.static_args.copyme = copyme; - qargs.static_args.ir_inn = ir_inn; - qargs.static_args.ir_mid = ir_mid; - qargs.static_args.ir_out = ir_out; - - qargs.fh = fh; - qargs.prefix = prefix; - qargs.config_basename = config_basename; - qargs.n_indexable = 0; - qargs.n_processed = 0; - qargs.n_indexable_last_stats = 0; - qargs.n_processed_last_stats = 0; - qargs.updateReader = 0; /* first process updates */ - qargs.t_last_stats = get_monotonic_seconds(); - - /* Read .lst file */ - register int i; - rewind(fh); /* make sure to read from start */ - - /* Clear output file content */ - char *myOutfilename = NULL; - chomp(prefix); - chomp(outfile); - myOutfilename = malloc(strlen(outfile) + 1); - snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile); - FILE *tfh; - tfh = fopen(myOutfilename, "a+"); - if (tfh == NULL) { - ERROR("No output filename\n"); - } - fclose(tfh); - qargs.static_args.outfile = outfile; - int ready_fd; - int buff_count; - fd_set fdset,tmpset; - char buffR[BUFFER], buffW[BUFFER]; - int fd_pipeIn[nProcesses][2]; /* Process0 In */ - int fd_pipeOut[nProcesses][2]; /* Process0 Out */ - unsigned int opts; - - FD_ZERO(&fdset); /* clear the fd_set */ - /* set pipeIn as non-blocking */ - for ( i=0; i<nProcesses; i++ ) { - opts = fcntl(fd_pipeIn[i][0], F_GETFL); - fcntl(fd_pipeIn[i][0], F_SETFL, opts | O_NONBLOCK); - } + /* Static worker args */ + iargs.cell = cell; + iargs.config_cmfilter = config_cmfilter; + iargs.config_noisefilter = config_noisefilter; + iargs.config_verbose = config_verbose; + iargs.config_satcorr = config_satcorr; + iargs.config_closer = config_closer; + iargs.config_insane = config_insane; + iargs.config_bgsub = config_bgsub; + iargs.cellr = cellr; + iargs.tols[0] = tols[0]; + iargs.tols[1] = tols[1]; + iargs.tols[2] = tols[2]; + iargs.tols[3] = tols[3]; + iargs.threshold = threshold; + iargs.min_gradient = min_gradient; + iargs.min_snr = min_snr; + iargs.min_int_snr = min_int_snr; + iargs.det = det; + iargs.indm = indm; + iargs.ipriv = ipriv; + iargs.peaks = peaks; + iargs.ofh = ofh; + iargs.beam = beam; + iargs.element = element; + iargs.stream_flags = stream_flags; + iargs.hdf5_peak_path = hdf5_peak_path; + iargs.copyme = copyme; + iargs.ir_inn = ir_inn; + iargs.ir_mid = ir_mid; + iargs.ir_out = ir_out; + iargs.outfile = outfile; + + n_indexable = 0; + n_processed = 0; + n_indexable_last_stats = 0; + n_processed_last_stats = 0; + t_last_stats = get_monotonic_seconds(); + + /* Fork the right number of times */ + for ( i=0; i<n_proc; i++ ) { + + pid_t p; + int filename_pipe[2]; + int result_pipe[2]; + + if ( pipe(filename_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return 1; + } - /**** PIPING ****/ - for ( i=0; i<nProcesses; i++ ) { - pipe(fd_pipeIn[i]); - pipe(fd_pipeOut[i]); - } + if ( pipe(results_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return 1; + } - int max_fd = 0; - for ( i=0; i<nProcesses; i++ ) { - FD_SET(fd_pipeIn[i][0], &fdset); - if (fd_pipeIn[i][0] > max_fd) { /* find max_fd */ - max_fd = fd_pipeIn[i][0]; + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return 1; } - } - max_fd = max_fd+1; - /* copy file set to tmpset */ - memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set)); - - /**** FORKING ****/ - int power = 10; /* 2^power must be larger than nProcesses */ - int pid[power]; - double num = 0; - int batchNum = 0; - /* Fork 2^power times */ - for ( i=0; i<power; i++ ) { - pid[i] = fork(); - } - /* Assign id */ - for ( i=0; i<power; i++ ) { - if (pid[i] == 0) { /* keep parents and kill off children */ - num += pow(2, i); + + if ( p == 0 ) { + /* Child process gets the 'read' end of the filename + * pipe, and the 'write' end of the result pipe. */ + close(filename_pipe[1]); + close(result_pipe[0]); + run_work(&iargs, filename_pipe[0], result_pipe[1]); + exit(0); } + + /* Parent process gets the 'write' end of the filename pipe + * and the 'read' end of the result pipe. */ + pid[i] = p; + close(filename_pipe[0]); + close(result_pipe[1]); + filename_pipes[i] = filename_pipe[1]; + result_pipes[i] = result_pipe[0]; + } - /* Kill if batchNum too high */ - if (num >= nProcesses + 1) { - exit(0); /* kill */ - } - batchNum = (int) num; - /**** PLUMBING ****/ - if (batchNum == qargs.updateReader) { - for ( i=0; i<nProcesses; i++ ) { - close(fd_pipeIn[i][1]); /* close all write pipes In */ - close(fd_pipeOut[i][0]); /* close all read pipes Out */ - } - } else { - for ( i=0; i<nProcesses; i++ ) { - if (i == batchNum - 1) { /* batchNum = 1,2,3 ... */ - close(fd_pipeIn[i][0]); /* close read pipe In */ - close(fd_pipeOut[i][1]); /* close write pipe Out */ - } else { - close(fd_pipeIn[i][0]); // close remaining pipes In - close(fd_pipeIn[i][1]); - close(fd_pipeOut[i][0]); // close remaining pipes Out - close(fd_pipeOut[i][1]); - } - } + /* Send first image to all children */ + char *nextImage = NULL; + for ( i=0; i<nProcesses; i++ ) { + nextImage = get_pattern(fh); + buff_count = sprintf(buffW, "%s",nextImage); + write (fd_pipeOut[i][1], buffW, buff_count); } - /**** INDEXING ****/ - double tStart, tEnd; - tStart = get_monotonic_seconds(); - int allDone = 0; - if (batchNum == qargs.updateReader){ - char *nextImage = NULL; - for ( i=0; i<nProcesses; i++ ) { /* Send out image to all processes*/ - nextImage = get_pattern(fh); - buff_count = sprintf(buffW, "%s",nextImage); - write (fd_pipeOut[i][1], buffW, buff_count); - } - int nFinished = 0; - while (!allDone) { - /* select from file set for reading */ - if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0) - perror("select"); - if (ready_fd > 0) { - for ( i=0; i<nProcesses; i++ ) { - /* is in file set that raised flag? */ - if (FD_ISSET(fd_pipeIn[i][0],&fdset)) { - /* read from pipe and return number of bytes read */ - if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) { - perror("read"); + int nFinished = 0; + while (!allDone) { + /* select from file set for reading */ + if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0) + ERROR("select"); + if (ready_fd > 0) { + for ( i=0; i<nProcesses; i++ ) { + /* is in file set that raised flag? */ + if (FD_ISSET(fd_pipeIn[i][0],&fdset)) { + /* read from pipe and return number of bytes read */ + if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) { + ERROR("read"); + } else { + qargs.n_indexable += atoi(buffR); + qargs.n_processed++; + /* write to pipe */ + if ((nextImage = get_pattern(fh)) == NULL){ + nFinished++; /* no more images */ + if ( nFinished == nProcesses ) + allDone = 1; /* EXIT */ } else { - qargs.n_indexable += atoi(buffR); - qargs.n_processed++; - /* write to pipe */ - if ((nextImage = get_pattern(fh)) == NULL){ - nFinished++; /* no more images */ - if ( nFinished == nProcesses ) - allDone = 1; /* EXIT */ - } else { - /* send out image */ - buff_count = sprintf(buffW, "%s",nextImage); - if (write (fd_pipeOut[i][1], buffW, buff_count)<0) - perror("write pipe"); - } + /* send out image */ + buff_count = sprintf(buffW, "%s",nextImage); + if (write (fd_pipeOut[i][1], buffW, buff_count)<0) + ERROR("write pipe"); } } } } - /* file set is modified, so copy original from tmpset */ - memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set)); - - /* Update to screen */ - double tNow = get_monotonic_seconds(); - if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n\n", - qargs.n_indexable, qargs.n_processed, - qargs.n_indexable - qargs.n_indexable_last_stats, - qargs.n_processed - qargs.n_processed_last_stats); - - qargs.n_indexable_last_stats = qargs.n_indexable; - qargs.n_processed_last_stats = qargs.n_processed; - qargs.t_last_stats = tNow; - } } - /* close my pipes */ - for ( i=0; i<nProcesses; i++ ) { - close(fd_pipeIn[i][0]); - close(fd_pipeOut[i][1]); + /* file set is modified, so copy original from tmpset */ + memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set)); + + /* Update to screen */ + double tNow = get_monotonic_seconds(); + if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs.n_indexable, qargs.n_processed, + qargs.n_indexable - qargs.n_indexable_last_stats, + qargs.n_processed - qargs.n_processed_last_stats); + + qargs.n_indexable_last_stats = qargs.n_indexable; + qargs.n_processed_last_stats = qargs.n_processed; + qargs.t_last_stats = tNow; } - tEnd = get_monotonic_seconds(); - printf("Compute Time: %.2fs\n", tEnd - tStart); - } else { - while(!allDone){ - /* read from pipe and return number of bytes read */ - if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { - perror("read1"); - } else if (buff_count > 0) { - /* process image */ - pargs.filename = buffR; - pargs.indexable = 0; - process_image(&qargs, &pargs, batchNum); - /* request another image */ - buff_count = sprintf(buffW, "%d\n", pargs.indexable); - if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) - perror("write P0"); - } else if (buff_count == 0) { - allDone = 1; /* EXIT */ - } - } - /* close my pipes */ - close(fd_pipeIn[batchNum-1][1]); - close(fd_pipeOut[batchNum-1][0]); } + /* close my pipes */ + for ( i=0; i<nProcesses; i++ ) { + close(fd_pipeIn[i][0]); + close(fd_pipeOut[i][1]); + } + tEnd = get_monotonic_seconds(); cleanup_indexing(ipriv); |