aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2012-05-22 17:44:52 +0200
committerThomas White <taw@physics.org>2012-05-22 17:44:52 +0200
commit412a3212048ed60645b1b31923b6720d47af9826 (patch)
treec6f36cddab224c6079e3ad289db25db9bbd21835 /src
parentde56cd77205fba6ea388cf209017a2cd1323a9a1 (diff)
WIP on tidy-up
Diffstat (limited to 'src')
-rw-r--r--src/indexamajig.c423
1 files changed, 181 insertions, 242 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c
index f4bbb72e..ed84c6a8 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -81,7 +81,7 @@ enum {
/* Information about the indexing process which is common to all patterns */
-struct static_index_args
+struct index_args
{
UnitCell *cell;
int config_cmfilter;
@@ -110,7 +110,6 @@ struct static_index_args
double ir_out;
/* Output stream */
- pthread_mutex_t *output_mutex; /* Protects the output stream */
FILE *ofh;
const struct copy_hdf5_field *copyme;
char *outfile;
@@ -118,36 +117,16 @@ struct static_index_args
/* Information about the indexing process for one pattern */
-struct index_args
+struct pattern_args
{
/* "Input" */
char *filename;
- struct static_index_args static_args;
/* "Output" */
int indexable;
};
-/* Information needed to choose the next task and dispatch it */
-struct queue_args
-{
- FILE *fh;
- char *prefix;
- int config_basename;
- struct static_index_args static_args;
-
- char *use_this_one_instead;
-
- int n_indexable;
- int n_processed;
- int n_indexable_last_stats;
- int n_processed_last_stats;
- int t_last_stats;
- int updateReader;
-};
-
-
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
@@ -246,16 +225,16 @@ static void show_help(const char *s)
}
-// Get next pattern in .lst
-char* get_pattern(FILE *fh) {
- char *rval;
- char line[LINE_LENGTH];
- rval = fgets(line, LINE_LENGTH - 1, fh);
- if (ferror(fh)) {
- printf("Read error\n");
- rval = NULL;
- }
- return rval;
+static char *get_pattern(FILE *fh)
+{
+ char *rval;
+ char line[LINE_LENGTH];
+ rval = fgets(line, LINE_LENGTH - 1, fh);
+ if ( ferror(fh) ) {
+ ERROR("Failed to get next filename from list.\n");
+ rval = NULL;
+ }
+ return rval;
}
@@ -416,11 +395,11 @@ static void process_image(void *qp, void *pp, int cookie)
outfilename = malloc(strlen(outfile) + 1);
snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile);
if ((fd = open(outfilename, O_WRONLY)) == -1) {
- perror("Error on opening\n");
+ ERROR("Error on opening\n");
exit(1);
}
if (fcntl(fd, F_SETLKW, &fl) == -1) {
- perror("Error on setting lock wait\n");
+ ERROR("Error on setting lock wait\n");
exit(1);
}
@@ -428,7 +407,7 @@ static void process_image(void *qp, void *pp, int cookie)
FILE *fh;
fh = fopen(outfilename, "a");
if (fh == NULL) {
- perror("Error inside lock\n");
+ ERROR("Error inside lock\n");
}
write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags);
fclose(fh);
@@ -436,7 +415,7 @@ static void process_image(void *qp, void *pp, int cookie)
/* Unlock stream for other processes */
fl.l_type = F_UNLCK; /* set to unlock same region */
if (fcntl(fd, F_SETLK, &fl) == -1) {
- perror("fcntl");
+ ERROR("fcntl");
exit(1);
}
close(fd);
@@ -456,6 +435,36 @@ static void process_image(void *qp, void *pp, int cookie)
}
+static void run_work(const struct index_args *iargs,
+ int filename_pipe, int results_pipe)
+{
+ int allDone = 0;
+
+ while ( !allDone ) {
+
+ /* read from pipe and return number of bytes read */
+ if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) {
+ ERROR("read1");
+ } else if (buff_count > 0) {
+ /* process image */
+ pargs.filename = buffR;
+ pargs.indexable = 0;
+ process_image(&qargs, &pargs, batchNum);
+ /* request another image */
+ buff_count = sprintf(buffW, "%d\n", pargs.indexable);
+ if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0)
+ ERROR("write P0");
+ } else if (buff_count == 0) {
+ allDone = 1;
+ }
+
+ }
+ /* close my pipes */
+ close(filename_pipe);
+ close(results_pipe);
+}
+
+
#ifdef HAVE_CLOCK_GETTIME
static time_t get_monotonic_seconds()
@@ -542,7 +551,7 @@ int main(int argc, char *argv[])
float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */
int cellr;
int peaks;
- int nProcesses = 1;
+ int n_proc = 1;
char *prepare_line;
char prepare_filename[LINE_LENGTH];
struct queue_args qargs;
@@ -558,6 +567,12 @@ int main(int argc, char *argv[])
float ir_inn = 4.0;
float ir_mid = 5.0;
float ir_out = 7.0;
+ int n_indexable, n_processed, n_indexable_last_stats;
+ int n_processed_last_stats;
+ int t_last_stats;
+ pid_t *pids;
+ int *filename_pipes;
+ int *result_pipes;
copyme = new_copy_hdf5_field_list();
if ( copyme == NULL ) {
@@ -638,7 +653,7 @@ int main(int argc, char *argv[])
break;
case 'j' :
- nProcesses = atoi(optarg);
+ n_proc = atoi(optarg);
break;
case 'g' :
@@ -909,7 +924,7 @@ int main(int argc, char *argv[])
prepare_line = tmp;
}
snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line);
- qargs.use_this_one_instead = prepare_line;
+ rewind(fh);
/* Prepare the indexer */
if ( indm != NULL ) {
@@ -925,223 +940,147 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
- qargs.static_args.cell = cell;
- qargs.static_args.config_cmfilter = config_cmfilter;
- qargs.static_args.config_noisefilter = config_noisefilter;
- qargs.static_args.config_verbose = config_verbose;
- qargs.static_args.config_satcorr = config_satcorr;
- qargs.static_args.config_closer = config_closer;
- qargs.static_args.config_insane = config_insane;
- qargs.static_args.config_bgsub = config_bgsub;
- qargs.static_args.cellr = cellr;
- qargs.static_args.tols[0] = tols[0];
- qargs.static_args.tols[1] = tols[1];
- qargs.static_args.tols[2] = tols[2];
- qargs.static_args.tols[3] = tols[3];
- qargs.static_args.threshold = threshold;
- qargs.static_args.min_gradient = min_gradient;
- qargs.static_args.min_snr = min_snr;
- qargs.static_args.min_int_snr = min_int_snr;
- qargs.static_args.det = det;
- qargs.static_args.indm = indm;
- qargs.static_args.ipriv = ipriv;
- qargs.static_args.peaks = peaks;
- qargs.static_args.ofh = ofh;
- qargs.static_args.beam = beam;
- qargs.static_args.element = element;
- qargs.static_args.stream_flags = stream_flags;
- qargs.static_args.hdf5_peak_path = hdf5_peak_path;
- qargs.static_args.copyme = copyme;
- qargs.static_args.ir_inn = ir_inn;
- qargs.static_args.ir_mid = ir_mid;
- qargs.static_args.ir_out = ir_out;
-
- qargs.fh = fh;
- qargs.prefix = prefix;
- qargs.config_basename = config_basename;
- qargs.n_indexable = 0;
- qargs.n_processed = 0;
- qargs.n_indexable_last_stats = 0;
- qargs.n_processed_last_stats = 0;
- qargs.updateReader = 0; /* first process updates */
- qargs.t_last_stats = get_monotonic_seconds();
-
- /* Read .lst file */
- register int i;
- rewind(fh); /* make sure to read from start */
-
- /* Clear output file content */
- char *myOutfilename = NULL;
- chomp(prefix);
- chomp(outfile);
- myOutfilename = malloc(strlen(outfile) + 1);
- snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile);
- FILE *tfh;
- tfh = fopen(myOutfilename, "a+");
- if (tfh == NULL) {
- ERROR("No output filename\n");
- }
- fclose(tfh);
- qargs.static_args.outfile = outfile;
- int ready_fd;
- int buff_count;
- fd_set fdset,tmpset;
- char buffR[BUFFER], buffW[BUFFER];
- int fd_pipeIn[nProcesses][2]; /* Process0 In */
- int fd_pipeOut[nProcesses][2]; /* Process0 Out */
- unsigned int opts;
-
- FD_ZERO(&fdset); /* clear the fd_set */
- /* set pipeIn as non-blocking */
- for ( i=0; i<nProcesses; i++ ) {
- opts = fcntl(fd_pipeIn[i][0], F_GETFL);
- fcntl(fd_pipeIn[i][0], F_SETFL, opts | O_NONBLOCK);
- }
+ /* Static worker args */
+ iargs.cell = cell;
+ iargs.config_cmfilter = config_cmfilter;
+ iargs.config_noisefilter = config_noisefilter;
+ iargs.config_verbose = config_verbose;
+ iargs.config_satcorr = config_satcorr;
+ iargs.config_closer = config_closer;
+ iargs.config_insane = config_insane;
+ iargs.config_bgsub = config_bgsub;
+ iargs.cellr = cellr;
+ iargs.tols[0] = tols[0];
+ iargs.tols[1] = tols[1];
+ iargs.tols[2] = tols[2];
+ iargs.tols[3] = tols[3];
+ iargs.threshold = threshold;
+ iargs.min_gradient = min_gradient;
+ iargs.min_snr = min_snr;
+ iargs.min_int_snr = min_int_snr;
+ iargs.det = det;
+ iargs.indm = indm;
+ iargs.ipriv = ipriv;
+ iargs.peaks = peaks;
+ iargs.ofh = ofh;
+ iargs.beam = beam;
+ iargs.element = element;
+ iargs.stream_flags = stream_flags;
+ iargs.hdf5_peak_path = hdf5_peak_path;
+ iargs.copyme = copyme;
+ iargs.ir_inn = ir_inn;
+ iargs.ir_mid = ir_mid;
+ iargs.ir_out = ir_out;
+ iargs.outfile = outfile;
+
+ n_indexable = 0;
+ n_processed = 0;
+ n_indexable_last_stats = 0;
+ n_processed_last_stats = 0;
+ t_last_stats = get_monotonic_seconds();
+
+ /* Fork the right number of times */
+ for ( i=0; i<n_proc; i++ ) {
+
+ pid_t p;
+ int filename_pipe[2];
+ int result_pipe[2];
+
+ if ( pipe(filename_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return 1;
+ }
- /**** PIPING ****/
- for ( i=0; i<nProcesses; i++ ) {
- pipe(fd_pipeIn[i]);
- pipe(fd_pipeOut[i]);
- }
+ if ( pipe(results_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return 1;
+ }
- int max_fd = 0;
- for ( i=0; i<nProcesses; i++ ) {
- FD_SET(fd_pipeIn[i][0], &fdset);
- if (fd_pipeIn[i][0] > max_fd) { /* find max_fd */
- max_fd = fd_pipeIn[i][0];
+ p = fork();
+ if ( p == -1 ) {
+ ERROR("fork() failed!\n");
+ return 1;
}
- }
- max_fd = max_fd+1;
- /* copy file set to tmpset */
- memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set));
-
- /**** FORKING ****/
- int power = 10; /* 2^power must be larger than nProcesses */
- int pid[power];
- double num = 0;
- int batchNum = 0;
- /* Fork 2^power times */
- for ( i=0; i<power; i++ ) {
- pid[i] = fork();
- }
- /* Assign id */
- for ( i=0; i<power; i++ ) {
- if (pid[i] == 0) { /* keep parents and kill off children */
- num += pow(2, i);
+
+ if ( p == 0 ) {
+ /* Child process gets the 'read' end of the filename
+ * pipe, and the 'write' end of the result pipe. */
+ close(filename_pipe[1]);
+ close(result_pipe[0]);
+ run_work(&iargs, filename_pipe[0], result_pipe[1]);
+ exit(0);
}
+
+ /* Parent process gets the 'write' end of the filename pipe
+ * and the 'read' end of the result pipe. */
+ pid[i] = p;
+ close(filename_pipe[0]);
+ close(result_pipe[1]);
+ filename_pipes[i] = filename_pipe[1];
+ result_pipes[i] = result_pipe[0];
+
}
- /* Kill if batchNum too high */
- if (num >= nProcesses + 1) {
- exit(0); /* kill */
- }
- batchNum = (int) num;
- /**** PLUMBING ****/
- if (batchNum == qargs.updateReader) {
- for ( i=0; i<nProcesses; i++ ) {
- close(fd_pipeIn[i][1]); /* close all write pipes In */
- close(fd_pipeOut[i][0]); /* close all read pipes Out */
- }
- } else {
- for ( i=0; i<nProcesses; i++ ) {
- if (i == batchNum - 1) { /* batchNum = 1,2,3 ... */
- close(fd_pipeIn[i][0]); /* close read pipe In */
- close(fd_pipeOut[i][1]); /* close write pipe Out */
- } else {
- close(fd_pipeIn[i][0]); // close remaining pipes In
- close(fd_pipeIn[i][1]);
- close(fd_pipeOut[i][0]); // close remaining pipes Out
- close(fd_pipeOut[i][1]);
- }
- }
+ /* Send first image to all children */
+ char *nextImage = NULL;
+ for ( i=0; i<nProcesses; i++ ) {
+ nextImage = get_pattern(fh);
+ buff_count = sprintf(buffW, "%s",nextImage);
+ write (fd_pipeOut[i][1], buffW, buff_count);
}
- /**** INDEXING ****/
- double tStart, tEnd;
- tStart = get_monotonic_seconds();
- int allDone = 0;
- if (batchNum == qargs.updateReader){
- char *nextImage = NULL;
- for ( i=0; i<nProcesses; i++ ) { /* Send out image to all processes*/
- nextImage = get_pattern(fh);
- buff_count = sprintf(buffW, "%s",nextImage);
- write (fd_pipeOut[i][1], buffW, buff_count);
- }
- int nFinished = 0;
- while (!allDone) {
- /* select from file set for reading */
- if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0)
- perror("select");
- if (ready_fd > 0) {
- for ( i=0; i<nProcesses; i++ ) {
- /* is in file set that raised flag? */
- if (FD_ISSET(fd_pipeIn[i][0],&fdset)) {
- /* read from pipe and return number of bytes read */
- if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) {
- perror("read");
+ int nFinished = 0;
+ while (!allDone) {
+ /* select from file set for reading */
+ if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0)
+ ERROR("select");
+ if (ready_fd > 0) {
+ for ( i=0; i<nProcesses; i++ ) {
+ /* is in file set that raised flag? */
+ if (FD_ISSET(fd_pipeIn[i][0],&fdset)) {
+ /* read from pipe and return number of bytes read */
+ if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) {
+ ERROR("read");
+ } else {
+ qargs.n_indexable += atoi(buffR);
+ qargs.n_processed++;
+ /* write to pipe */
+ if ((nextImage = get_pattern(fh)) == NULL){
+ nFinished++; /* no more images */
+ if ( nFinished == nProcesses )
+ allDone = 1; /* EXIT */
} else {
- qargs.n_indexable += atoi(buffR);
- qargs.n_processed++;
- /* write to pipe */
- if ((nextImage = get_pattern(fh)) == NULL){
- nFinished++; /* no more images */
- if ( nFinished == nProcesses )
- allDone = 1; /* EXIT */
- } else {
- /* send out image */
- buff_count = sprintf(buffW, "%s",nextImage);
- if (write (fd_pipeOut[i][1], buffW, buff_count)<0)
- perror("write pipe");
- }
+ /* send out image */
+ buff_count = sprintf(buffW, "%s",nextImage);
+ if (write (fd_pipeOut[i][1], buffW, buff_count)<0)
+ ERROR("write pipe");
}
}
}
}
- /* file set is modified, so copy original from tmpset */
- memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set));
-
- /* Update to screen */
- double tNow = get_monotonic_seconds();
- if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) {
- STATUS("%i out of %i indexed so far,"
- " %i out of %i since the last message.\n\n",
- qargs.n_indexable, qargs.n_processed,
- qargs.n_indexable - qargs.n_indexable_last_stats,
- qargs.n_processed - qargs.n_processed_last_stats);
-
- qargs.n_indexable_last_stats = qargs.n_indexable;
- qargs.n_processed_last_stats = qargs.n_processed;
- qargs.t_last_stats = tNow;
- }
}
- /* close my pipes */
- for ( i=0; i<nProcesses; i++ ) {
- close(fd_pipeIn[i][0]);
- close(fd_pipeOut[i][1]);
+ /* file set is modified, so copy original from tmpset */
+ memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set));
+
+ /* Update to screen */
+ double tNow = get_monotonic_seconds();
+ if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) {
+ STATUS("%i out of %i indexed so far,"
+ " %i out of %i since the last message.\n\n",
+ qargs.n_indexable, qargs.n_processed,
+ qargs.n_indexable - qargs.n_indexable_last_stats,
+ qargs.n_processed - qargs.n_processed_last_stats);
+
+ qargs.n_indexable_last_stats = qargs.n_indexable;
+ qargs.n_processed_last_stats = qargs.n_processed;
+ qargs.t_last_stats = tNow;
}
- tEnd = get_monotonic_seconds();
- printf("Compute Time: %.2fs\n", tEnd - tStart);
- } else {
- while(!allDone){
- /* read from pipe and return number of bytes read */
- if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) {
- perror("read1");
- } else if (buff_count > 0) {
- /* process image */
- pargs.filename = buffR;
- pargs.indexable = 0;
- process_image(&qargs, &pargs, batchNum);
- /* request another image */
- buff_count = sprintf(buffW, "%d\n", pargs.indexable);
- if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0)
- perror("write P0");
- } else if (buff_count == 0) {
- allDone = 1; /* EXIT */
- }
- }
- /* close my pipes */
- close(fd_pipeIn[batchNum-1][1]);
- close(fd_pipeOut[batchNum-1][0]);
}
+ /* close my pipes */
+ for ( i=0; i<nProcesses; i++ ) {
+ close(fd_pipeIn[i][0]);
+ close(fd_pipeOut[i][1]);
+ }
+ tEnd = get_monotonic_seconds();
cleanup_indexing(ipriv);