diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/indexamajig.c | 275 |
1 files changed, 209 insertions, 66 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index b09a00aa..3e8ab4de 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -106,9 +106,7 @@ struct index_args double ir_out; /* Output stream */ - FILE *ofh; const struct copy_hdf5_field *copyme; - char *outfile; }; @@ -227,26 +225,30 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, char *line; char *filename; - /* Get the next filename */ - if ( *use_this_one_instead != NULL ) { + do { - line = *use_this_one_instead; - *use_this_one_instead = NULL; + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { - } else { + line = *use_this_one_instead; + *use_this_one_instead = NULL; - char *rval; + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - return NULL; } - } + chomp(line); - chomp(line); + } while ( strlen(line) == 0 ); if ( config_basename ) { char *tmp; @@ -266,7 +268,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, int cookie) + struct pattern_args *pargs, FILE *ofh, + int cookie) { float *data_for_measurement; size_t data_size; @@ -278,12 +281,7 @@ static void process_image(const struct index_args *iargs, struct beam_params *beam = iargs->beam; int r, check; struct hdfile *hdfile; - char *outfile = iargs->outfile; struct image image; - char *outfilename = iargs->outfile; - int fd; - FILE *fh; - struct flock fl; image.features = NULL; image.data = NULL; @@ -408,38 +406,9 @@ static void process_image(const struct index_args *iargs, image.reflections = NULL; } - /* Write Lock */ - fl.l_type = F_WRLCK; - fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = 0; /* Means "lock the whole file" */ - - fd = open(outfile, O_WRONLY); - if ( fd == -1 ) { - ERROR("Couldn't open output stream ('%s').\n", outfile); - exit(1); - } - if ( fcntl(fd, F_SETLKW, &fl) == -1 ) { - ERROR("Couldn't get lock on output stream.\n"); - exit(1); - } - - fh = fdopen(fd, "a"); - if ( fh == NULL ) { - ERROR("Couldn't fdopen() the output stream.\n"); - exit(1); - } - write_chunk(fh, &image, hdfile, iargs->stream_flags); - fflush(fh); - - /* Unlock stream for other processes */ - fl.l_type = F_UNLCK; /* set to unlock same region */ - if ( fcntl(fd, F_SETLK, &fl) == -1 ) { - ERROR("fcntl"); - exit(1); - } - - fclose(fh); /* close(fd) happens as well because fd was not dup'd */ + write_chunk(ofh, &image, hdfile, iargs->stream_flags); + fprintf(ofh, "END\n"); + fflush(ofh); /* Only free cell if found */ cell_free(image.indexed_cell); @@ -454,7 +423,8 @@ static void process_image(const struct index_args *iargs, static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, int cookie) + int filename_pipe, int results_pipe, FILE *ofh, + int cookie) { int allDone = 0; FILE *fh; @@ -489,16 +459,25 @@ static void run_work(const struct index_args *iargs, } chomp(line); - pargs.filename = line; - pargs.indexable = 0; - process_image(iargs, &pargs, cookie); + if ( strlen(line) == 0 ) { + + allDone = 1; + + } else { + + pargs.filename = line; + pargs.indexable = 0; + + process_image(iargs, &pargs, ofh, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0\n"); + } - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); } free(line); @@ -558,6 +537,118 @@ static int parse_cell_reduction(const char *scellr, int *err, } +static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +{ + int done = 0; + int *finished; + FILE **fhs; + int i; + int chunk_finished; + + finished = calloc(n_proc, sizeof(int)); + if ( finished == NULL ) { + ERROR("Couldn't allocate memory for flags!\n"); + exit(1); + } + + fhs = calloc(n_proc, sizeof(FILE *)); + if ( fhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + exit(1); + } + + for ( i=0; i<n_proc; i++ ) { + fhs[i] = fdopen(stream_pipe_read[i], "r"); + if ( fhs[i] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + exit(1); + } + } + + while ( !done ) { + + int r, i; + struct timeval tv; + fd_set fds; + int fdmax; + + tv.tv_sec = 5; + tv.tv_usec = 0; + + FD_ZERO(&fds); + fdmax = 0; + for ( i=0; i<n_proc; i++ ) { + + int fd; + + if ( finished[i] ) continue; + + fd = stream_pipe_read[i]; + + FD_SET(fd, &fds); + if ( fd > fdmax ) fdmax = fd; + + } + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + ERROR("select() failed: %s\n", strerror(errno)); + continue; + } + + if ( r == 0 ) continue; /* Nothing this time. Try again */ + + for ( i=0; i<n_proc; i++ ) { + + if ( finished[i] ) continue; + + if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue; + + chunk_finished = 0; + do { + + char line[1024]; + char *rval; + + rval = fgets(line, 1024, fhs[i]); + if ( rval == NULL ) { + if ( feof(fhs[i]) ) { + /* Process died */ + finished[i] = 1; + } else { + ERROR("fgets() failed: %s\n", + strerror(errno)); + } + continue; + } + + + if ( strcmp(line, "END\n") == 0 ) { + chunk_finished = 1; + } else { + fprintf(ofh, "%s", line); + } + + } while ( !chunk_finished ); + + } + + done = 1; + for ( i=0; i<n_proc; i++ ) { + if ( !finished[i] ) done = 0; + } + + } + + free(finished); + + for ( i=0; i<n_proc; i++ ) { + fclose(fhs[i]); + } +} + + int main(int argc, char *argv[]) { int c; @@ -616,11 +707,14 @@ int main(int argc, char *argv[]) int t_last_stats; pid_t *pids; int *filename_pipes; + int *stream_pipe_read; + int *stream_pipe_write; FILE **result_fhs; fd_set fds; int i; int allDone; int *finished; + pid_t pr; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -1007,7 +1101,6 @@ int main(int argc, char *argv[]) iargs.indm = indm; iargs.ipriv = ipriv; iargs.peaks = peaks; - iargs.ofh = ofh; iargs.beam = beam; iargs.element = element; iargs.stream_flags = stream_flags; @@ -1016,7 +1109,6 @@ int main(int argc, char *argv[]) iargs.ir_inn = ir_inn; iargs.ir_mid = ir_mid; iargs.ir_out = ir_out; - iargs.outfile = outfile; n_indexable = 0; n_processed = 0; @@ -1027,6 +1119,8 @@ int main(int argc, char *argv[]) FD_ZERO(&fds); filename_pipes = calloc(n_proc, sizeof(int)); result_fhs = calloc(n_proc, sizeof(FILE *)); + stream_pipe_read = calloc(n_proc, sizeof(int)); + stream_pipe_write = calloc(n_proc, sizeof(int)); if ( filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return 1; @@ -1035,6 +1129,14 @@ int main(int argc, char *argv[]) ERROR("Couldn't allocate memory for pipe file handles.\n"); return 1; } + if ( stream_pipe_read == NULL ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return 1; + } + if ( stream_pipe_write == NULL ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return 1; + } pids = calloc(n_proc, sizeof(pid_t)); if ( pids == NULL ) { @@ -1048,6 +1150,28 @@ int main(int argc, char *argv[]) return 1; } + for ( i=0; i<n_proc; i++ ) { + + int stream_pipe[2]; + + if ( pipe(stream_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return 1; + } + + stream_pipe_read[i] = stream_pipe[0]; + stream_pipe_write[i] = stream_pipe[1]; + + } + + pr = fork(); + if ( pr == - 1 ) { + ERROR("fork() failed (for reader process)\n"); + return 1; + } + + if ( pr == 0 ) run_reader(stream_pipe_read, n_proc, ofh); + /* Fork the right number of times */ for ( i=0; i<n_proc; i++ ) { @@ -1072,11 +1196,19 @@ int main(int argc, char *argv[]) } if ( p == 0 ) { + + FILE *fh; + /* Child process gets the 'read' end of the filename * pipe, and the 'write' end of the result pipe. */ close(filename_pipe[1]); close(result_pipe[0]); - run_work(&iargs, filename_pipe[0], result_pipe[1], i); + + fh = fdopen(stream_pipe_write[i], "w"); + run_work(&iargs, filename_pipe[0], result_pipe[1], + fh, i); + fclose(fh); + exit(0); } @@ -1190,7 +1322,10 @@ int main(int argc, char *argv[]) if ( nextImage == NULL ) { /* No more images */ - finished[i] = 1; + r = write(filename_pipes[i], "\n", 1); + if ( r < 0 ) { + ERROR("Write pipe\n"); + } } else { r = write(filename_pipes[i], nextImage, @@ -1227,11 +1362,19 @@ int main(int argc, char *argv[]) } for ( i=0; i<n_proc; i++ ) { + int status; + waitpid(pids[i], &status, 0); + } + + for ( i=0; i<n_proc; i++ ) { close(filename_pipes[i]); fclose(result_fhs[i]); } + free(filename_pipes); free(result_fhs); + free(stream_pipe_read); + free(stream_pipe_write); free(pids); free(finished); |