aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2012-07-08 22:09:50 +0200
committerThomas White <taw@bitwiz.org.uk>2012-07-08 22:09:50 +0200
commit61b69a8c846cda4506db71b285dafbef79f41bac (patch)
treec15f25457c46714568087378a527f087d7f26581 /src
parentfa3a72ba31301b625eef3fc01c5a9d3fbf7ef296 (diff)
Write stream without using a lock at all
Diffstat (limited to 'src')
-rw-r--r--src/indexamajig.c275
1 files changed, 209 insertions, 66 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c
index b09a00aa..3e8ab4de 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -106,9 +106,7 @@ struct index_args
double ir_out;
/* Output stream */
- FILE *ofh;
const struct copy_hdf5_field *copyme;
- char *outfile;
};
@@ -227,26 +225,30 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
char *line;
char *filename;
- /* Get the next filename */
- if ( *use_this_one_instead != NULL ) {
+ do {
- line = *use_this_one_instead;
- *use_this_one_instead = NULL;
+ /* Get the next filename */
+ if ( *use_this_one_instead != NULL ) {
- } else {
+ line = *use_this_one_instead;
+ *use_this_one_instead = NULL;
- char *rval;
+ } else {
+
+ char *rval;
+
+ line = malloc(1024*sizeof(char));
+ rval = fgets(line, 1023, fh);
+ if ( rval == NULL ) {
+ free(line);
+ return NULL;
+ }
- line = malloc(1024*sizeof(char));
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) {
- free(line);
- return NULL;
}
- }
+ chomp(line);
- chomp(line);
+ } while ( strlen(line) == 0 );
if ( config_basename ) {
char *tmp;
@@ -266,7 +268,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
static void process_image(const struct index_args *iargs,
- struct pattern_args *pargs, int cookie)
+ struct pattern_args *pargs, FILE *ofh,
+ int cookie)
{
float *data_for_measurement;
size_t data_size;
@@ -278,12 +281,7 @@ static void process_image(const struct index_args *iargs,
struct beam_params *beam = iargs->beam;
int r, check;
struct hdfile *hdfile;
- char *outfile = iargs->outfile;
struct image image;
- char *outfilename = iargs->outfile;
- int fd;
- FILE *fh;
- struct flock fl;
image.features = NULL;
image.data = NULL;
@@ -408,38 +406,9 @@ static void process_image(const struct index_args *iargs,
image.reflections = NULL;
}
- /* Write Lock */
- fl.l_type = F_WRLCK;
- fl.l_whence = SEEK_SET;
- fl.l_start = 0;
- fl.l_len = 0; /* Means "lock the whole file" */
-
- fd = open(outfile, O_WRONLY);
- if ( fd == -1 ) {
- ERROR("Couldn't open output stream ('%s').\n", outfile);
- exit(1);
- }
- if ( fcntl(fd, F_SETLKW, &fl) == -1 ) {
- ERROR("Couldn't get lock on output stream.\n");
- exit(1);
- }
-
- fh = fdopen(fd, "a");
- if ( fh == NULL ) {
- ERROR("Couldn't fdopen() the output stream.\n");
- exit(1);
- }
- write_chunk(fh, &image, hdfile, iargs->stream_flags);
- fflush(fh);
-
- /* Unlock stream for other processes */
- fl.l_type = F_UNLCK; /* set to unlock same region */
- if ( fcntl(fd, F_SETLK, &fl) == -1 ) {
- ERROR("fcntl");
- exit(1);
- }
-
- fclose(fh); /* close(fd) happens as well because fd was not dup'd */
+ write_chunk(ofh, &image, hdfile, iargs->stream_flags);
+ fprintf(ofh, "END\n");
+ fflush(ofh);
/* Only free cell if found */
cell_free(image.indexed_cell);
@@ -454,7 +423,8 @@ static void process_image(const struct index_args *iargs,
static void run_work(const struct index_args *iargs,
- int filename_pipe, int results_pipe, int cookie)
+ int filename_pipe, int results_pipe, FILE *ofh,
+ int cookie)
{
int allDone = 0;
FILE *fh;
@@ -489,16 +459,25 @@ static void run_work(const struct index_args *iargs,
}
chomp(line);
- pargs.filename = line;
- pargs.indexable = 0;
- process_image(iargs, &pargs, cookie);
+ if ( strlen(line) == 0 ) {
+
+ allDone = 1;
+
+ } else {
+
+ pargs.filename = line;
+ pargs.indexable = 0;
+
+ process_image(iargs, &pargs, ofh, cookie);
+
+ /* Request another image */
+ c = sprintf(buf, "%i\n", pargs.indexable);
+ w = write(results_pipe, buf, c);
+ if ( w < 0 ) {
+ ERROR("write P0\n");
+ }
- /* Request another image */
- c = sprintf(buf, "%i\n", pargs.indexable);
- w = write(results_pipe, buf, c);
- if ( w < 0 ) {
- ERROR("write P0\n");
}
free(line);
@@ -558,6 +537,118 @@ static int parse_cell_reduction(const char *scellr, int *err,
}
+static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
+{
+ int done = 0;
+ int *finished;
+ FILE **fhs;
+ int i;
+ int chunk_finished;
+
+ finished = calloc(n_proc, sizeof(int));
+ if ( finished == NULL ) {
+ ERROR("Couldn't allocate memory for flags!\n");
+ exit(1);
+ }
+
+ fhs = calloc(n_proc, sizeof(FILE *));
+ if ( fhs == NULL ) {
+ ERROR("Couldn't allocate memory for file handles!\n");
+ exit(1);
+ }
+
+ for ( i=0; i<n_proc; i++ ) {
+ fhs[i] = fdopen(stream_pipe_read[i], "r");
+ if ( fhs[i] == NULL ) {
+ ERROR("Couldn't fdopen() stream!\n");
+ exit(1);
+ }
+ }
+
+ while ( !done ) {
+
+ int r, i;
+ struct timeval tv;
+ fd_set fds;
+ int fdmax;
+
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+
+ FD_ZERO(&fds);
+ fdmax = 0;
+ for ( i=0; i<n_proc; i++ ) {
+
+ int fd;
+
+ if ( finished[i] ) continue;
+
+ fd = stream_pipe_read[i];
+
+ FD_SET(fd, &fds);
+ if ( fd > fdmax ) fdmax = fd;
+
+ }
+
+ r = select(fdmax+1, &fds, NULL, NULL, &tv);
+
+ if ( r == -1 ) {
+ ERROR("select() failed: %s\n", strerror(errno));
+ continue;
+ }
+
+ if ( r == 0 ) continue; /* Nothing this time. Try again */
+
+ for ( i=0; i<n_proc; i++ ) {
+
+ if ( finished[i] ) continue;
+
+ if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue;
+
+ chunk_finished = 0;
+ do {
+
+ char line[1024];
+ char *rval;
+
+ rval = fgets(line, 1024, fhs[i]);
+ if ( rval == NULL ) {
+ if ( feof(fhs[i]) ) {
+ /* Process died */
+ finished[i] = 1;
+ } else {
+ ERROR("fgets() failed: %s\n",
+ strerror(errno));
+ }
+ continue;
+ }
+
+
+ if ( strcmp(line, "END\n") == 0 ) {
+ chunk_finished = 1;
+ } else {
+ fprintf(ofh, "%s", line);
+ }
+
+ } while ( !chunk_finished );
+
+ }
+
+ done = 1;
+ for ( i=0; i<n_proc; i++ ) {
+ if ( !finished[i] ) done = 0;
+ }
+
+ }
+
+ free(finished);
+
+ for ( i=0; i<n_proc; i++ ) {
+ fclose(fhs[i]);
+ }
+}
+
+
int main(int argc, char *argv[])
{
int c;
@@ -616,11 +707,14 @@ int main(int argc, char *argv[])
int t_last_stats;
pid_t *pids;
int *filename_pipes;
+ int *stream_pipe_read;
+ int *stream_pipe_write;
FILE **result_fhs;
fd_set fds;
int i;
int allDone;
int *finished;
+ pid_t pr;
copyme = new_copy_hdf5_field_list();
if ( copyme == NULL ) {
@@ -1007,7 +1101,6 @@ int main(int argc, char *argv[])
iargs.indm = indm;
iargs.ipriv = ipriv;
iargs.peaks = peaks;
- iargs.ofh = ofh;
iargs.beam = beam;
iargs.element = element;
iargs.stream_flags = stream_flags;
@@ -1016,7 +1109,6 @@ int main(int argc, char *argv[])
iargs.ir_inn = ir_inn;
iargs.ir_mid = ir_mid;
iargs.ir_out = ir_out;
- iargs.outfile = outfile;
n_indexable = 0;
n_processed = 0;
@@ -1027,6 +1119,8 @@ int main(int argc, char *argv[])
FD_ZERO(&fds);
filename_pipes = calloc(n_proc, sizeof(int));
result_fhs = calloc(n_proc, sizeof(FILE *));
+ stream_pipe_read = calloc(n_proc, sizeof(int));
+ stream_pipe_write = calloc(n_proc, sizeof(int));
if ( filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return 1;
@@ -1035,6 +1129,14 @@ int main(int argc, char *argv[])
ERROR("Couldn't allocate memory for pipe file handles.\n");
return 1;
}
+ if ( stream_pipe_read == NULL ) {
+ ERROR("Couldn't allocate memory for pipes.\n");
+ return 1;
+ }
+ if ( stream_pipe_write == NULL ) {
+ ERROR("Couldn't allocate memory for pipes.\n");
+ return 1;
+ }
pids = calloc(n_proc, sizeof(pid_t));
if ( pids == NULL ) {
@@ -1048,6 +1150,28 @@ int main(int argc, char *argv[])
return 1;
}
+ for ( i=0; i<n_proc; i++ ) {
+
+ int stream_pipe[2];
+
+ if ( pipe(stream_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return 1;
+ }
+
+ stream_pipe_read[i] = stream_pipe[0];
+ stream_pipe_write[i] = stream_pipe[1];
+
+ }
+
+ pr = fork();
+ if ( pr == - 1 ) {
+ ERROR("fork() failed (for reader process)\n");
+ return 1;
+ }
+
+ if ( pr == 0 ) run_reader(stream_pipe_read, n_proc, ofh);
+
/* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
@@ -1072,11 +1196,19 @@ int main(int argc, char *argv[])
}
if ( p == 0 ) {
+
+ FILE *fh;
+
/* Child process gets the 'read' end of the filename
* pipe, and the 'write' end of the result pipe. */
close(filename_pipe[1]);
close(result_pipe[0]);
- run_work(&iargs, filename_pipe[0], result_pipe[1], i);
+
+ fh = fdopen(stream_pipe_write[i], "w");
+ run_work(&iargs, filename_pipe[0], result_pipe[1],
+ fh, i);
+ fclose(fh);
+
exit(0);
}
@@ -1190,7 +1322,10 @@ int main(int argc, char *argv[])
if ( nextImage == NULL ) {
/* No more images */
- finished[i] = 1;
+ r = write(filename_pipes[i], "\n", 1);
+ if ( r < 0 ) {
+ ERROR("Write pipe\n");
+ }
} else {
r = write(filename_pipes[i], nextImage,
@@ -1227,11 +1362,19 @@ int main(int argc, char *argv[])
}
for ( i=0; i<n_proc; i++ ) {
+ int status;
+ waitpid(pids[i], &status, 0);
+ }
+
+ for ( i=0; i<n_proc; i++ ) {
close(filename_pipes[i]);
fclose(result_fhs[i]);
}
+
free(filename_pipes);
free(result_fhs);
+ free(stream_pipe_read);
+ free(stream_pipe_write);
free(pids);
free(finished);