From 61b69a8c846cda4506db71b285dafbef79f41bac Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 8 Jul 2012 22:09:50 +0200 Subject: Write stream without using a lock at all --- src/indexamajig.c | 275 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 209 insertions(+), 66 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index b09a00aa..3e8ab4de 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -106,9 +106,7 @@ struct index_args double ir_out; /* Output stream */ - FILE *ofh; const struct copy_hdf5_field *copyme; - char *outfile; }; @@ -227,26 +225,30 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, char *line; char *filename; - /* Get the next filename */ - if ( *use_this_one_instead != NULL ) { + do { - line = *use_this_one_instead; - *use_this_one_instead = NULL; + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { - } else { + line = *use_this_one_instead; + *use_this_one_instead = NULL; - char *rval; + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - return NULL; } - } + chomp(line); - chomp(line); + } while ( strlen(line) == 0 ); if ( config_basename ) { char *tmp; @@ -266,7 +268,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, int cookie) + struct pattern_args *pargs, FILE *ofh, + int cookie) { float *data_for_measurement; size_t data_size; @@ -278,12 +281,7 @@ static void process_image(const struct index_args *iargs, struct beam_params *beam = iargs->beam; int r, check; struct hdfile *hdfile; - char *outfile = iargs->outfile; struct image image; - char *outfilename = iargs->outfile; - int fd; - FILE *fh; - struct flock fl; image.features = NULL; image.data = NULL; @@ -408,38 +406,9 @@ static void process_image(const struct index_args *iargs, image.reflections = NULL; } - /* Write Lock */ - fl.l_type = F_WRLCK; - fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = 0; /* Means "lock the whole file" */ - - fd = open(outfile, O_WRONLY); - if ( fd == -1 ) { - ERROR("Couldn't open output stream ('%s').\n", outfile); - exit(1); - } - if ( fcntl(fd, F_SETLKW, &fl) == -1 ) { - ERROR("Couldn't get lock on output stream.\n"); - exit(1); - } - - fh = fdopen(fd, "a"); - if ( fh == NULL ) { - ERROR("Couldn't fdopen() the output stream.\n"); - exit(1); - } - write_chunk(fh, &image, hdfile, iargs->stream_flags); - fflush(fh); - - /* Unlock stream for other processes */ - fl.l_type = F_UNLCK; /* set to unlock same region */ - if ( fcntl(fd, F_SETLK, &fl) == -1 ) { - ERROR("fcntl"); - exit(1); - } - - fclose(fh); /* close(fd) happens as well because fd was not dup'd */ + write_chunk(ofh, &image, hdfile, iargs->stream_flags); + fprintf(ofh, "END\n"); + fflush(ofh); /* Only free cell if found */ cell_free(image.indexed_cell); @@ -454,7 +423,8 @@ static void process_image(const struct index_args *iargs, static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, int cookie) + int filename_pipe, int results_pipe, FILE *ofh, + int cookie) { int allDone = 0; FILE *fh; @@ -489,16 +459,25 @@ static void run_work(const struct index_args *iargs, } chomp(line); - pargs.filename = line; - pargs.indexable = 0; - process_image(iargs, &pargs, cookie); + if ( strlen(line) == 0 ) { + + allDone = 1; + + } else { + + pargs.filename = line; + pargs.indexable = 0; + + process_image(iargs, &pargs, ofh, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0\n"); + } - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); } free(line); @@ -558,6 +537,118 @@ static int parse_cell_reduction(const char *scellr, int *err, } +static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +{ + int done = 0; + int *finished; + FILE **fhs; + int i; + int chunk_finished; + + finished = calloc(n_proc, sizeof(int)); + if ( finished == NULL ) { + ERROR("Couldn't allocate memory for flags!\n"); + exit(1); + } + + fhs = calloc(n_proc, sizeof(FILE *)); + if ( fhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + exit(1); + } + + for ( i=0; i fdmax ) fdmax = fd; + + } + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + ERROR("select() failed: %s\n", strerror(errno)); + continue; + } + + if ( r == 0 ) continue; /* Nothing this time. Try again */ + + for ( i=0; i