From 30ddb045dfaaf867bda7122d76ceb12d24d248e7 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 22 May 2014 16:42:33 +0200 Subject: Rationalise and document stream "open for write" functions A nice side-effect is that streams now only have one set of headers --- libcrystfel/src/stream.c | 87 +++++++++++++++++++++++++++++++++++++++--------- libcrystfel/src/stream.h | 2 +- src/im-sandbox.c | 20 +++++------ src/im-sandbox.h | 8 ++--- src/indexamajig.c | 11 +++--- 5 files changed, 92 insertions(+), 36 deletions(-) diff --git a/libcrystfel/src/stream.c b/libcrystfel/src/stream.c index fa893a30..13b9065d 100644 --- a/libcrystfel/src/stream.c +++ b/libcrystfel/src/stream.c @@ -840,6 +840,20 @@ Stream *open_stream_for_read(const char *filename) } +/** + * open_stream_fd_for_write + * @fd: File descriptor (e.g. from open()) to use for stream data. + * + * Creates a new %Stream from @fd, so that stream data can be written to @fd + * using write_chunk(). + * + * In contrast to open_stream_for_write(), this function does not write any of + * the usual headers. This function is mostly for use when multiple substreams + * need to be multiplexed into a single master stream. The master would be + * opened using open_stream_for_write(), and the substreams using this function. + * + * Returns: a %Stream, or NULL on failure. + */ Stream *open_stream_fd_for_write(int fd) { Stream *st; @@ -856,25 +870,64 @@ Stream *open_stream_fd_for_write(int fd) st->major_version = LATEST_MAJOR_VERSION; st->minor_version = LATEST_MINOR_VERSION; - fprintf(st->fh, "CrystFEL stream format %i.%i\n", - st->major_version, st->minor_version); - fprintf(st->fh, "Generated by CrystFEL "CRYSTFEL_VERSIONSTRING"\n"); return st; } +/** + * open_stream_for_write + * @filename: Filename of new stream + * + * Creates a new stream with name @filename, and adds the stream format + * and version headers. + * + * You may want to follow this with a call to write_command() to record the + * command line. + * + * Returns: a %Stream, or NULL on failure. + */ Stream *open_stream_for_write(const char *filename) { - int fd; + Stream *st; + + st = malloc(sizeof(struct _stream)); + if ( st == NULL ) return NULL; - fd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, - S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); - if ( fd == -1 ) { + st->fh = fopen(filename, "w"); + if ( st->fh == NULL ) { ERROR("Failed to open stream.\n"); + free(st); return NULL; } - return open_stream_fd_for_write(fd); + st->major_version = LATEST_MAJOR_VERSION; + st->minor_version = LATEST_MINOR_VERSION; + + fprintf(st->fh, "CrystFEL stream format %i.%i\n", + st->major_version, st->minor_version); + fprintf(st->fh, "Generated by CrystFEL "CRYSTFEL_VERSIONSTRING"\n"); + fflush(st->fh); + + return st; +} + + +/** + * get_stream_fd + * @st: A %Stream + * + * This function gets the integer file descriptor for @st, a bit like fileno(). + * + * This is useful in conjunction with open_stream_fd_for_write(), to get the + * underlying file descriptor to which the multiplexed stream data should be + * written. In this case, the only other operations you should ever do (or have + * done) on @st are open_stream_for_write() and close_stream(). + * + * Returns: an integer file descriptor + */ +int get_stream_fd(Stream *st) +{ + return fileno(st->fh); } @@ -906,13 +959,16 @@ int is_stream(const char *filename) } -void write_line(Stream *st, const char *line) -{ - fprintf(st->fh, "%s\n", line); - fflush(st->fh); -} - - +/** + * write_command + * @st: A %Stream + * @argc: number of arguments + * @argv: command-line arguments + * + * Writes the command line to @st. @argc and @argv should be exactly as were + * given to main(). This should usually be called immediately after + * open_stream_for_write(). + */ void write_command(Stream *st, int argc, char *argv[]) { int i; @@ -924,6 +980,7 @@ void write_command(Stream *st, int argc, char *argv[]) fprintf(st->fh, "%s", argv[i]); } fprintf(st->fh, "\n"); + fflush(st->fh); } diff --git a/libcrystfel/src/stream.h b/libcrystfel/src/stream.h index 9f852c60..78c27b44 100644 --- a/libcrystfel/src/stream.h +++ b/libcrystfel/src/stream.h @@ -77,6 +77,7 @@ typedef enum { extern Stream *open_stream_for_read(const char *filename); extern Stream *open_stream_for_write(const char *filename); extern Stream *open_stream_fd_for_write(int fd); +extern int get_stream_fd(Stream *st); extern void close_stream(Stream *st); extern int read_chunk(Stream *st, struct image *image); @@ -84,7 +85,6 @@ extern int read_chunk_2(Stream *st, struct image *image, StreamReadFlags srf); extern void write_chunk(Stream *st, struct image *image, struct hdfile *hdfile, int include_peaks, int include_reflections); -extern void write_line(Stream *st, const char *line); extern void write_command(Stream *st, int argc, char *argv[]); extern int rewind_stream(Stream *st); diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 3558c8ea..3e2ab2b3 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -74,8 +74,8 @@ struct sb_reader FILE **fhs; int *fds; - /* Final output fd */ - int ofd; + /* Final output */ + Stream *stream; }; @@ -376,6 +376,7 @@ static void remove_pipe(struct sb_reader *rd, int d) static void *run_reader(void *rdv) { struct sb_reader *rd = rdv; + const int ofd = get_stream_fd(rd->stream); while ( 1 ) { @@ -425,7 +426,7 @@ static void *run_reader(void *rdv) /* If the chunk cannot be read, assume the connection * is broken and that the process will die soon. */ - if ( pump_chunk(rd->fhs[i], rd->ofd) ) { + if ( pump_chunk(rd->fhs[i], ofd) ) { /* remove_pipe() assumes that the caller is * holding rd->lock ! */ remove_pipe(rd, i); @@ -440,8 +441,7 @@ static void *run_reader(void *rdv) } -static void start_worker_process(struct sandbox *sb, int slot, - int argc, char *argv[]) +static void start_worker_process(struct sandbox *sb, int slot) { pid_t p; int filename_pipe[2]; @@ -537,8 +537,6 @@ static void start_worker_process(struct sandbox *sb, int slot, close(result_pipe[0]); st = open_stream_fd_for_write(stream_pipe[1]); - write_command(st, argc, argv); - write_line(st, "FLUSH"); run_work(sb->iargs, filename_pipe[0], result_pipe[1], st, slot, tmp); close_stream(st); @@ -606,7 +604,7 @@ static void handle_zombie(struct sandbox *sb) STATUS("Last filename was: %s\n", sb->last_filename[i]); sb->n_processed++; - start_worker_process(sb, i, 0, NULL); + start_worker_process(sb, i); } } @@ -618,7 +616,7 @@ static void handle_zombie(struct sandbox *sb) void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, - int ofd, int argc, char *argv[], const char *tempdir) + Stream *stream, const char *tempdir) { int i; int allDone; @@ -657,7 +655,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->reader->fds = NULL; sb->reader->fhs = NULL; - sb->reader->ofd = ofd; + sb->reader->stream = stream; sb->stream_pipe_write = calloc(n_proc, sizeof(int)); if ( sb->stream_pipe_write == NULL ) { @@ -742,7 +740,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Fork the right number of times */ lock_sandbox(sb); for ( i=0; i + * 2010-2014 Thomas White * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -37,5 +37,5 @@ #include "process_image.h" extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, - int config_basename, FILE *fh, int streamfd, - int argc, char *argv[], const char *tempdir); + int config_basename, FILE *fh, Stream *stream, + const char *tempdir); diff --git a/src/indexamajig.c b/src/indexamajig.c index 9102ccc5..607e8b0c 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -178,7 +178,7 @@ int main(int argc, char *argv[]) char *filename = NULL; char *outfile = NULL; FILE *fh; - int ofd; + Stream *st; int config_checkprefix = 1; int config_basename = 0; int integrate_saturated = 0; @@ -627,14 +627,15 @@ int main(int argc, char *argv[]) } - ofd = open(outfile, O_CREAT | O_TRUNC | O_WRONLY, - S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); - if ( ofd == -1 ) { + st = open_stream_for_write(outfile); + if ( st == NULL ) { ERROR("Failed to open stream '%s'\n", outfile); return 1; } free(outfile); + write_command(st, argc, argv); + /* Prepare the indexer */ if ( indm != NULL ) { ipriv = prepare_indexing(indm, iargs.cell, iargs.det, @@ -653,7 +654,7 @@ int main(int argc, char *argv[]) iargs.ipriv = ipriv; create_sandbox(&iargs, n_proc, prefix, config_basename, fh, - ofd, argc, argv, tempdir); + st, tempdir); free(prefix); free(tempdir); -- cgit v1.2.3