From 412a3212048ed60645b1b31923b6720d47af9826 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 22 May 2012 17:44:52 +0200 Subject: WIP on tidy-up --- src/indexamajig.c | 423 +++++++++++++++++++++++------------------------------- 1 file changed, 181 insertions(+), 242 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index f4bbb72e..ed84c6a8 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -81,7 +81,7 @@ enum { /* Information about the indexing process which is common to all patterns */ -struct static_index_args +struct index_args { UnitCell *cell; int config_cmfilter; @@ -110,7 +110,6 @@ struct static_index_args double ir_out; /* Output stream */ - pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; const struct copy_hdf5_field *copyme; char *outfile; @@ -118,36 +117,16 @@ struct static_index_args /* Information about the indexing process for one pattern */ -struct index_args +struct pattern_args { /* "Input" */ char *filename; - struct static_index_args static_args; /* "Output" */ int indexable; }; -/* Information needed to choose the next task and dispatch it */ -struct queue_args -{ - FILE *fh; - char *prefix; - int config_basename; - struct static_index_args static_args; - - char *use_this_one_instead; - - int n_indexable; - int n_processed; - int n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - int updateReader; -}; - - static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -246,16 +225,16 @@ static void show_help(const char *s) } -// Get next pattern in .lst -char* get_pattern(FILE *fh) { - char *rval; - char line[LINE_LENGTH]; - rval = fgets(line, LINE_LENGTH - 1, fh); - if (ferror(fh)) { - printf("Read error\n"); - rval = NULL; - } - return rval; +static char *get_pattern(FILE *fh) +{ + char *rval; + char line[LINE_LENGTH]; + rval = fgets(line, LINE_LENGTH - 1, fh); + if ( ferror(fh) ) { + ERROR("Failed to get next filename from list.\n"); + rval = NULL; + } + return rval; } @@ -416,11 +395,11 @@ static void process_image(void *qp, void *pp, int cookie) outfilename = malloc(strlen(outfile) + 1); snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile); if ((fd = open(outfilename, O_WRONLY)) == -1) { - perror("Error on opening\n"); + ERROR("Error on opening\n"); exit(1); } if (fcntl(fd, F_SETLKW, &fl) == -1) { - perror("Error on setting lock wait\n"); + ERROR("Error on setting lock wait\n"); exit(1); } @@ -428,7 +407,7 @@ static void process_image(void *qp, void *pp, int cookie) FILE *fh; fh = fopen(outfilename, "a"); if (fh == NULL) { - perror("Error inside lock\n"); + ERROR("Error inside lock\n"); } write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); fclose(fh); @@ -436,7 +415,7 @@ static void process_image(void *qp, void *pp, int cookie) /* Unlock stream for other processes */ fl.l_type = F_UNLCK; /* set to unlock same region */ if (fcntl(fd, F_SETLK, &fl) == -1) { - perror("fcntl"); + ERROR("fcntl"); exit(1); } close(fd); @@ -456,6 +435,36 @@ static void process_image(void *qp, void *pp, int cookie) } +static void run_work(const struct index_args *iargs, + int filename_pipe, int results_pipe) +{ + int allDone = 0; + + while ( !allDone ) { + + /* read from pipe and return number of bytes read */ + if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { + ERROR("read1"); + } else if (buff_count > 0) { + /* process image */ + pargs.filename = buffR; + pargs.indexable = 0; + process_image(&qargs, &pargs, batchNum); + /* request another image */ + buff_count = sprintf(buffW, "%d\n", pargs.indexable); + if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) + ERROR("write P0"); + } else if (buff_count == 0) { + allDone = 1; + } + + } + /* close my pipes */ + close(filename_pipe); + close(results_pipe); +} + + #ifdef HAVE_CLOCK_GETTIME static time_t get_monotonic_seconds() @@ -542,7 +551,7 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nProcesses = 1; + int n_proc = 1; char *prepare_line; char prepare_filename[LINE_LENGTH]; struct queue_args qargs; @@ -558,6 +567,12 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; + int n_indexable, n_processed, n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + pid_t *pids; + int *filename_pipes; + int *result_pipes; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -638,7 +653,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nProcesses = atoi(optarg); + n_proc = atoi(optarg); break; case 'g' : @@ -909,7 +924,7 @@ int main(int argc, char *argv[]) prepare_line = tmp; } snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line); - qargs.use_this_one_instead = prepare_line; + rewind(fh); /* Prepare the indexer */ if ( indm != NULL ) { @@ -925,223 +940,147 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); - qargs.static_args.cell = cell; - qargs.static_args.config_cmfilter = config_cmfilter; - qargs.static_args.config_noisefilter = config_noisefilter; - qargs.static_args.config_verbose = config_verbose; - qargs.static_args.config_satcorr = config_satcorr; - qargs.static_args.config_closer = config_closer; - qargs.static_args.config_insane = config_insane; - qargs.static_args.config_bgsub = config_bgsub; - qargs.static_args.cellr = cellr; - qargs.static_args.tols[0] = tols[0]; - qargs.static_args.tols[1] = tols[1]; - qargs.static_args.tols[2] = tols[2]; - qargs.static_args.tols[3] = tols[3]; - qargs.static_args.threshold = threshold; - qargs.static_args.min_gradient = min_gradient; - qargs.static_args.min_snr = min_snr; - qargs.static_args.min_int_snr = min_int_snr; - qargs.static_args.det = det; - qargs.static_args.indm = indm; - qargs.static_args.ipriv = ipriv; - qargs.static_args.peaks = peaks; - qargs.static_args.ofh = ofh; - qargs.static_args.beam = beam; - qargs.static_args.element = element; - qargs.static_args.stream_flags = stream_flags; - qargs.static_args.hdf5_peak_path = hdf5_peak_path; - qargs.static_args.copyme = copyme; - qargs.static_args.ir_inn = ir_inn; - qargs.static_args.ir_mid = ir_mid; - qargs.static_args.ir_out = ir_out; - - qargs.fh = fh; - qargs.prefix = prefix; - qargs.config_basename = config_basename; - qargs.n_indexable = 0; - qargs.n_processed = 0; - qargs.n_indexable_last_stats = 0; - qargs.n_processed_last_stats = 0; - qargs.updateReader = 0; /* first process updates */ - qargs.t_last_stats = get_monotonic_seconds(); - - /* Read .lst file */ - register int i; - rewind(fh); /* make sure to read from start */ - - /* Clear output file content */ - char *myOutfilename = NULL; - chomp(prefix); - chomp(outfile); - myOutfilename = malloc(strlen(outfile) + 1); - snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile); - FILE *tfh; - tfh = fopen(myOutfilename, "a+"); - if (tfh == NULL) { - ERROR("No output filename\n"); - } - fclose(tfh); - qargs.static_args.outfile = outfile; - int ready_fd; - int buff_count; - fd_set fdset,tmpset; - char buffR[BUFFER], buffW[BUFFER]; - int fd_pipeIn[nProcesses][2]; /* Process0 In */ - int fd_pipeOut[nProcesses][2]; /* Process0 Out */ - unsigned int opts; - - FD_ZERO(&fdset); /* clear the fd_set */ - /* set pipeIn as non-blocking */ - for ( i=0; i max_fd) { /* find max_fd */ - max_fd = fd_pipeIn[i][0]; + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return 1; } - } - max_fd = max_fd+1; - /* copy file set to tmpset */ - memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set)); - - /**** FORKING ****/ - int power = 10; /* 2^power must be larger than nProcesses */ - int pid[power]; - double num = 0; - int batchNum = 0; - /* Fork 2^power times */ - for ( i=0; i= nProcesses + 1) { - exit(0); /* kill */ - } - batchNum = (int) num; - /**** PLUMBING ****/ - if (batchNum == qargs.updateReader) { - for ( i=0; i 0) { - for ( i=0; i 0) { + for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n\n", - qargs.n_indexable, qargs.n_processed, - qargs.n_indexable - qargs.n_indexable_last_stats, - qargs.n_processed - qargs.n_processed_last_stats); - - qargs.n_indexable_last_stats = qargs.n_indexable; - qargs.n_processed_last_stats = qargs.n_processed; - qargs.t_last_stats = tNow; - } } - /* close my pipes */ - for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs.n_indexable, qargs.n_processed, + qargs.n_indexable - qargs.n_indexable_last_stats, + qargs.n_processed - qargs.n_processed_last_stats); + + qargs.n_indexable_last_stats = qargs.n_indexable; + qargs.n_processed_last_stats = qargs.n_processed; + qargs.t_last_stats = tNow; } - tEnd = get_monotonic_seconds(); - printf("Compute Time: %.2fs\n", tEnd - tStart); - } else { - while(!allDone){ - /* read from pipe and return number of bytes read */ - if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { - perror("read1"); - } else if (buff_count > 0) { - /* process image */ - pargs.filename = buffR; - pargs.indexable = 0; - process_image(&qargs, &pargs, batchNum); - /* request another image */ - buff_count = sprintf(buffW, "%d\n", pargs.indexable); - if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) - perror("write P0"); - } else if (buff_count == 0) { - allDone = 1; /* EXIT */ - } - } - /* close my pipes */ - close(fd_pipeIn[batchNum-1][1]); - close(fd_pipeOut[batchNum-1][0]); } + /* close my pipes */ + for ( i=0; i