diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 726 | ||||
-rw-r--r-- | src/im-sandbox.h | 39 | ||||
-rw-r--r-- | src/process_image.c | 17 | ||||
-rw-r--r-- | src/process_image.h | 11 |
4 files changed, 247 insertions, 546 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index f796b8b7..747c7d03 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -49,6 +49,7 @@ #include <sys/stat.h> #include <assert.h> #include <sys/mman.h> +#include <semaphore.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -67,7 +68,6 @@ /* Write statistics at APPROXIMATELY this interval */ #define STATS_EVERY_N_SECONDS (5) - struct sb_reader { pthread_mutex_t lock; @@ -86,19 +86,8 @@ struct sb_reader }; -struct sb_shm -{ - pthread_mutex_t term_lock; -}; - - struct sandbox { - pthread_mutex_t lock; - - int n_processed; - int n_hadcrystals; - int n_crystals; int n_processed_last_stats; int n_hadcrystals_last_stats; int n_crystals_last_stats; @@ -110,10 +99,7 @@ struct sandbox pid_t *pids; int *running; - FILE **result_fhs; - int *filename_pipes; - int *stream_pipe_write; - struct filename_plus_event **last_filename; + struct filename_plus_event **last_event; int serial; struct sb_shm *shared; @@ -125,19 +111,7 @@ struct sandbox /* Horrible global variable for signal handler */ -int signal_pipe[2]; - - -static void lock_sandbox(struct sandbox *sb) -{ - pthread_mutex_lock(&sb->lock); -} - - -static void unlock_sandbox(struct sandbox *sb) -{ - pthread_mutex_unlock(&sb->lock); -} +sem_t zombie_sem; static struct filename_plus_event *get_pattern(FILE *fh, int config_basename, @@ -169,6 +143,8 @@ static struct filename_plus_event *get_pattern(FILE *fh, int config_basename, rval = fgets(line, 1023, fh); if ( rval == NULL ) { free(line); + free(filename); + filename = NULL; return NULL; } @@ -273,263 +249,84 @@ static struct filename_plus_event *get_pattern(FILE *fh, int config_basename, } -struct buffer_data -{ - char *rbuffer; - char *line; - int fd; - int rbufpos; - int rbuflen; - int eof; - int err; -}; - - -static int read_fpe_data(struct buffer_data *bd) +static void shuffle_events(struct sb_shm *sb_shared) { - int rval; - int no_line = 0; - - bd->eof = 0; - bd->err = 0; - - rval = read(bd->fd, bd->rbuffer+bd->rbufpos, bd->rbuflen-bd->rbufpos); - if ( rval == 0 ) { - bd->eof = 1; - return 1; - } - if ( rval == -1 ) { - bd->err = 1; - return 1; - } - - bd->rbufpos += rval; - assert(bd->rbufpos <= bd->rbuflen); - - while ( (!no_line) && (bd->rbufpos > 0) ) { - - int i; - int line_ready = 0; - int line_length = 0; - - /* See if there's a full line in the buffer yet */ - for ( i=0; i<bd->rbufpos; i++ ) { - - /* Is there a line in the buffer? */ - if ( bd->rbuffer[i] == '\n' ) { - line_length = i+1; - line_ready = 1; - break; - } - - } - - if ( line_ready ) { - - int new_rbuflen; - - if ( bd->line != NULL ) { - free(bd->line); - } - - bd->line = malloc(line_length+1); - strncpy(bd->line, bd->rbuffer, line_length); - bd->line[line_length] = '\0'; - - /* Now the block's been parsed, it should be - * forgotten about */ - memmove(bd->rbuffer, - bd->rbuffer + line_length, - bd->rbuflen - line_length); - - /* Subtract the number of bytes removed */ - bd->rbufpos = bd->rbufpos - line_length; - new_rbuflen = bd->rbuflen - line_length; - if ( new_rbuflen == 0 ) new_rbuflen = 256; - bd->rbuffer = realloc(bd->rbuffer, - new_rbuflen*sizeof(char)); - bd->rbuflen = new_rbuflen; - - return 1; - - } else { - - if ( bd->rbufpos == bd->rbuflen ) { - bd->rbuffer = realloc(bd->rbuffer, - bd->rbuflen + 256); - bd->rbuflen = bd->rbuflen + 256; - } - no_line = 1; - - } + int i; + for ( i=1; i<sb_shared->n_events; i++ ) { + memcpy(sb_shared->queue[i-1], sb_shared->queue[i], MAX_EV_LEN); } - - return 0; + sb_shared->n_events--; } -static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, Stream *st, - int cookie, const char *tmpdir, pthread_mutex_t *term_lock) +static void run_work(const struct index_args *iargs, Stream *st, + int cookie, const char *tmpdir, struct sb_shm *sb_shared) { - FILE *fh; int allDone = 0; - int w; - unsigned int opts; - struct buffer_data bd; - - bd.rbuffer = malloc(256*sizeof(char)); - bd.rbuflen = 256; - bd.rbufpos = 0; - bd.line = NULL; - bd.fd = 0; - bd.eof = 0; - bd.err = 1; - - fh = fdopen(filename_pipe, "r"); - if ( fh == NULL ) { - ERROR("Failed to fdopen() the filename pipe!\n"); - return; - } - - w = write(results_pipe, "\n", 1); - if ( w < 0 ) { - ERROR("Failed to send request for first filename.\n"); - } - - bd.fd = fileno(fh); - - /* Set non-blocking */ - opts = fcntl(bd.fd, F_GETFL); - fcntl(bd.fd, F_SETFL, opts | O_NONBLOCK); while ( !allDone ) { struct pattern_args pargs; - int c; - int rval; - char buf[1024]; - - pargs.filename_p_e = initialize_filename_plus_event(); - - rval = 0; - do { - - fd_set fds; - struct timeval tv; - int sval; - - FD_ZERO(&fds); - FD_SET(bd.fd, &fds); - - tv.tv_sec = 30; - tv.tv_usec = 0; - - sval = select(bd.fd+1, &fds, NULL, NULL, &tv); - - if ( sval == -1 ) { - - const int err = errno; - - switch ( err ) { - - case EINTR: - STATUS("Restarting select()\n"); - break; - - default: - ERROR("select() failed: %s\n", - strerror(err)); - rval = 1; - - } - - } else if ( sval != 0 ) { - rval = read_fpe_data(&bd); - } else { - ERROR("No data sent from main process..\n"); - /* Not actually an error condition. The main - * process might just be taking a while to read - * the index data for a large multi-event file. - */ - } + char filename[MAX_EV_LEN]; + char event_str[MAX_EV_LEN]; + int ser; + struct event *ev; + int r; - } while ( !rval ); + /* Wait until an event is ready */ + sem_wait(&sb_shared->queue_sem); - if ( bd.err ) { - ERROR("Event pipe read error: %s\n", strerror(errno)); + /* Get the event from the queue */ + pthread_mutex_lock(&sb_shared->queue_lock); + if ( sb_shared->no_more ) { + pthread_mutex_unlock(&sb_shared->queue_lock); allDone = 1; continue; } - - if ( bd.eof ) { - ERROR("Event pipe EOF (should not happen).\n"); - allDone = 1; - continue; + r = sscanf(sb_shared->queue[0], "%s %s %i", + filename, event_str, &ser); + if ( r != 3 ) { + STATUS("Invalid event string '%s'\n", + sb_shared->queue[0]); } + memcpy(sb_shared->last_ev[cookie], sb_shared->queue[0], + MAX_EV_LEN); + shuffle_events(sb_shared); + pthread_mutex_unlock(&sb_shared->queue_lock); - if ( bd.line[0] == '\n' ) { - allDone = 1; - } else { - - char filename[1024]; - char event_str[1024]; - struct event* ev; - int ser; - - chomp(bd.line); - - sscanf(bd.line, "%s %s %i", filename, event_str, &ser); - pargs.filename_p_e->filename = strdup(filename); - - /* Make absolutely sure the same event won't be - * processed a second time */ - bd.line[0] = '\0'; - - if ( strcmp(event_str, "/") != 0 ) { - - ev = get_event_from_event_string(event_str); - if ( ev == NULL ) { - ERROR("Bad event string '%s'\n", - event_str); - continue; - } - - pargs.filename_p_e->ev = ev; + if ( r != 3 ) continue; - } else { + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup(filename); - pargs.filename_p_e->ev = NULL; + if ( strcmp(event_str, "/") != 0 ) { + ev = get_event_from_event_string(event_str); + if ( ev == NULL ) { + ERROR("Bad event string '%s'\n", event_str); + continue; } + pargs.filename_p_e->ev = ev; - pargs.n_crystals = 0; - process_image(iargs, &pargs, st, cookie, tmpdir, - results_pipe, ser, term_lock); + } else { - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.n_crystals); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); - } + pargs.filename_p_e->ev = NULL; } + process_image(iargs, &pargs, st, cookie, tmpdir, ser, + sb_shared); + free_filename_plus_event(pargs.filename_p_e); } - free(bd.line); - free(bd.rbuffer); - cleanup_indexing(iargs->indm, iargs->ipriv); free_detector_geometry(iargs->det); free(iargs->hdf5_peak_path); free_copy_hdf5_field_list(iargs->copyme); cell_free(iargs->cell); - fclose(fh); } @@ -556,8 +353,6 @@ static time_t get_monotonic_seconds() #endif -size_t vol = 0; - static ssize_t lwrite(int fd, const char *a) { @@ -647,6 +442,8 @@ static void remove_pipe(struct sb_reader *rd, int d) { int i; + fclose(rd->fhs[d]); + for ( i=d; i<rd->n_read; i++ ) { if ( i < rd->n_read-1 ) { rd->fds[i] = rd->fds[i+1]; @@ -731,20 +528,8 @@ static void *run_reader(void *rdv) static void start_worker_process(struct sandbox *sb, int slot) { pid_t p; - int filename_pipe[2]; - int result_pipe[2]; int stream_pipe[2]; - if ( pipe(filename_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return; - } - - if ( pipe(result_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return; - } - if ( pipe(stream_pipe) == - 1 ) { ERROR("pipe() failed!\n"); return; @@ -759,12 +544,12 @@ static void start_worker_process(struct sandbox *sb, int slot) if ( p == 0 ) { Stream *st; - int j; struct sigaction sa; int r; char *tmp; struct stat s; size_t ll; + int i; /* First, disconnect the signal handler */ sa.sa_flags = 0; @@ -800,36 +585,29 @@ static void start_worker_process(struct sandbox *sb, int slot) } /* Free resources which will not be needed by worker */ - for ( j=0; j<sb->n_proc; j++ ) { - if ( (j != slot) && (sb->running[j]) ) { - close(sb->stream_pipe_write[j]); - } - } - for ( j=0; j<sb->n_proc; j++ ) { - if ( (j != slot) && (sb->running[j]) ) { - if ( sb->result_fhs[j] != NULL ) { - fclose(sb->result_fhs[j]); - } - close(sb->filename_pipes[j]); - } - } - free(sb->filename_pipes); - free(sb->result_fhs); free(sb->pids); - /* Also prefix, tempdir, */ - - /* Child process gets the 'read' end of the filename - * pipe, and the 'write' end of the result pipe. */ - close(filename_pipe[1]); - close(result_pipe[0]); + for ( i=0; i<sb->reader->n_read; i++ ) { + fclose(sb->reader->fhs[i]); + } + free(sb->reader->fhs); + free(sb->reader->fds); + free(sb->reader); + free(sb->tmpdir); + free(sb->running); + /* Not freed because it's not worth passing them down just for + * this purpose: event list file handle, + * main output stream handle + * original temp dir name (without indexamajig.XX) + * prefix + */ st = open_stream_fd_for_write(stream_pipe[1]); - run_work(sb->iargs, filename_pipe[0], result_pipe[1], - st, slot, tmp, &sb->shared->term_lock); + run_work(sb->iargs, st, slot, tmp, sb->shared); close_stream(st); - //close(filename_pipe[0]); - close(result_pipe[1]); + free(tmp); + free(sb->iargs->beam->photon_energy_from); + munmap(sb->shared, sizeof(struct sb_shm)); free(sb); @@ -843,22 +621,13 @@ static void start_worker_process(struct sandbox *sb, int slot) sb->pids[slot] = p; sb->running[slot] = 1; add_pipe(sb->reader, stream_pipe[0]); - close(filename_pipe[0]); - close(result_pipe[1]); close(stream_pipe[1]); - sb->filename_pipes[slot] = filename_pipe[1]; - - sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); - if ( sb->result_fhs[slot] == NULL ) { - ERROR("fdopen() failed.\n"); - return; - } } static void signal_handler(int sig, siginfo_t *si, void *uc_v) { - write(signal_pipe[1], "\n", 1); + sem_post(&zombie_sem); } @@ -866,7 +635,6 @@ static void handle_zombie(struct sandbox *sb) { int i; - lock_sandbox(sb); for ( i=0; i<sb->n_proc; i++ ) { int status, p; @@ -891,17 +659,14 @@ static void handle_zombie(struct sandbox *sb) if ( WIFSIGNALED(status) ) { STATUS("Worker %i was killed by signal %i\n", i, WTERMSIG(status)); - STATUS("Last filename was: %s (%s)\n", - sb->last_filename[i]->filename, - get_event_string(sb->last_filename[i]->ev) ); - sb->n_processed++; + STATUS("Event ID was: %s\n", + sb->shared->last_ev[i]); start_worker_process(sb, i); } } } - unlock_sandbox(sb); } @@ -932,24 +697,72 @@ static int setup_shm(struct sandbox *sb) return 1; } + if ( pthread_mutex_init(&sb->shared->queue_lock, &attr) ) { + ERROR("Queue lock setup failed.\n"); + return 1; + } + + if ( pthread_mutex_init(&sb->shared->totals_lock, &attr) ) { + ERROR("Totals lock setup failed.\n"); + return 1; + } + pthread_mutexattr_destroy(&attr); return 0; } +static char *maybe_get_event_string(struct event *ev) +{ + if ( ev == NULL ) return "/"; + return get_event_string(ev); +} + + +/* Assumes the caller is already holding queue_lock! */ +static int fill_queue(FILE *fh, int config_basename, struct detector *det, + const char *prefix, struct sandbox *sb) +{ + while ( sb->shared->n_events < QUEUE_SIZE ) { + + struct filename_plus_event *ne; + char ev_string[MAX_EV_LEN]; + + ne = get_pattern(fh, config_basename, det, prefix); + if ( ne == NULL ) return 1; /* No more */ + + memset(ev_string, 0, MAX_EV_LEN); + snprintf(ev_string, MAX_EV_LEN, "%s %s %i", ne->filename, + maybe_get_event_string(ne->ev), sb->serial++); + memcpy(sb->shared->queue[sb->shared->n_events++], ev_string, + MAX_EV_LEN); + sem_post(&sb->shared->queue_sem); + free_filename_plus_event(ne); + + } + return 0; +} + + void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir) { int i; - int allDone; struct sigaction sa; int r; pthread_t reader_thread; struct sandbox *sb; size_t ll; struct stat s; + int allDone = 0; + + if ( n_proc > MAX_NUM_WORKERS ) { + ERROR("Number of workers (%i) is too large. Using %i\n", + n_proc, MAX_NUM_WORKERS); + n_proc = MAX_NUM_WORKERS; + } sb = calloc(1, sizeof(struct sandbox)); if ( sb == NULL ) { @@ -964,12 +777,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - pthread_mutex_init(&sb->lock, NULL); pthread_mutex_init(&sb->reader->lock, NULL); - sb->n_processed = 0; - sb->n_hadcrystals = 0; - sb->n_crystals = 0; sb->n_processed_last_stats = 0; sb->n_hadcrystals_last_stats = 0; sb->n_crystals_last_stats = 0; @@ -988,25 +797,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - sb->stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( sb->stream_pipe_write == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return; - } + sb->shared->n_processed = 0; + sb->shared->n_hadcrystals = 0; + sb->shared->n_crystals = 0; + + sem_init(&sb->shared->queue_sem, 1, 0); + sem_init(&zombie_sem, 0, 0); - lock_sandbox(sb); - sb->filename_pipes = calloc(n_proc, sizeof(int)); - sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); - if ( sb->filename_pipes == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return; - } - if ( sb->result_fhs == NULL ) { - ERROR("Couldn't allocate memory for pipe file handles.\n"); - return; - } if ( sb->pids == NULL ) { ERROR("Couldn't allocate memory for PIDs.\n"); return; @@ -1016,18 +815,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - sb->last_filename = calloc(n_proc, sizeof(char *)); - if ( sb->last_filename == NULL ) { - ERROR("Couldn't allocate memory for last filename list.\n"); - return; - } - unlock_sandbox(sb); - - if ( pipe(signal_pipe) == -1 ) { - ERROR("Failed to create signal pipe.\n"); - return; - } - /* Set up signal handler to take action if any children die */ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; sigemptyset(&sa.sa_mask); @@ -1039,7 +826,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } if ( tempdir == NULL ) { - tempdir = strdup(""); + tempdir = ""; } ll = 64+strlen(tempdir); @@ -1068,12 +855,17 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } + /* Fill the queue */ + pthread_mutex_lock(&sb->shared->queue_lock); + sb->shared->n_events = 0; + fill_queue(fh, config_basename, iargs->det, prefix, sb); + sb->shared->no_more = 0; + pthread_mutex_unlock(&sb->shared->queue_lock); + /* Fork the right number of times */ - lock_sandbox(sb); for ( i=0; i<n_proc; i++ ) { start_worker_process(sb, i); } - unlock_sandbox(sb); /* Start reader thread after forking, so that things are definitely * "running" */ @@ -1083,202 +875,78 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - allDone = 0; - while ( !allDone ) { + do { - int r, i; - struct timeval tv; - fd_set fds; + int r; double tNow; - int fdmax; - - tv.tv_sec = 1; - tv.tv_usec = 0; - - FD_ZERO(&fds); - fdmax = 0; - lock_sandbox(sb); - for ( i=0; i<n_proc; i++ ) { - - int fd; - - if ( sb->result_fhs[i] == NULL) continue; - - fd = fileno(sb->result_fhs[i]); - FD_SET(fd, &fds); - if ( fd > fdmax ) fdmax = fd; - } - unlock_sandbox(sb); - - FD_SET(signal_pipe[0], &fds); - if ( signal_pipe[0] > fdmax ) fdmax = signal_pipe[0]; - - r = select(fdmax+1, &fds, NULL, NULL, &tv); - if ( r == -1 ) { - if ( errno == EINTR ) continue; - ERROR("select() failed: %s\n", strerror(errno)); - break; - } + sleep(5); - if ( FD_ISSET(signal_pipe[0], &fds) ) { - - char d; - read(signal_pipe[0], &d, 1); - handle_zombie(sb); + /* Check for dead workers */ + if ( sem_trywait(&zombie_sem) == 0 ) handle_zombie(sb); + /* Top up the queue if necessary */ + r = 0; + pthread_mutex_lock(&sb->shared->queue_lock); + if ( sb->shared->n_events < QUEUE_SIZE/2 ) { + r = fill_queue(fh, config_basename, iargs->det, prefix, + sb); } - - lock_sandbox(sb); - for ( i=0; i<n_proc; i++ ) { - - struct filename_plus_event *nextImage; - char results[1024]; - char *rval; - int fd; - char *eptr; - - if ( sb->result_fhs[i] == NULL ) continue; - - fd = fileno(sb->result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) continue; - - rval = fgets(results, 1024, sb->result_fhs[i]); - if ( rval == NULL ) { - if ( !feof(sb->result_fhs[i]) ) { - ERROR("fgets() failed: %s\n", - strerror(errno)); - } - sb->result_fhs[i] = NULL; - continue; - } - - chomp(results); - - strtol(results, &eptr, 10); - if ( eptr == results ) { - if ( strlen(results) > 0 ) { - ERROR("Invalid result '%s'\n", - results); - } - } else { - - int nc = atoi(results); - sb->n_crystals += nc; - if ( nc > 0 ) { - sb->n_hadcrystals++; - } - sb->n_processed++; - - } - - /* Send next filename */ - nextImage = get_pattern(fh, config_basename, - iargs->det, prefix); - - if ( sb->last_filename[i] != NULL ) { - free_filename_plus_event(sb->last_filename[i]); - } - - sb->last_filename[i] = nextImage; - - if ( nextImage == NULL ) { - - /* No more images */ - r = write(sb->filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("Write pipe\n"); - } - - } else { - - char tmp[256]; - - r = write(sb->filename_pipes[i], - nextImage->filename, - strlen(nextImage->filename)); - - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - r = write(sb->filename_pipes[i], " ", 1); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - if ( nextImage->ev != NULL ) { - - r = write(sb->filename_pipes[i], - get_event_string(nextImage->ev), - strlen(get_event_string(nextImage->ev))); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - } else { - - r = write(sb->filename_pipes[i], "/", 1); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - } - - snprintf(tmp, 255, " %i", sb->serial++); - r = write(sb->filename_pipes[i], - tmp, strlen(tmp)); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - r = write(sb->filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - - } - } - - unlock_sandbox(sb); + pthread_mutex_unlock(&sb->shared->queue_lock); /* Update progress */ - lock_sandbox(sb); tNow = get_monotonic_seconds(); r = pthread_mutex_trylock(&sb->shared->term_lock); - if ((r==0) && (tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS)) - { + if ( r == 0 ) { + + /* Could get lock, so write status */ + int n_proc_this; + double indexable; + + n_proc_this = sb->shared->n_processed + - sb->n_processed_last_stats; + indexable = (sb->shared->n_processed == 0) ? 0 : + 100.0 * sb->shared->n_hadcrystals + / sb->shared->n_processed; STATUS("%4i indexable out of %4i processed (%4.1f%%), " "%4i crystals so far. " "%4i images processed since the last message.\n", - sb->n_hadcrystals, sb->n_processed, - (sb->n_processed == 0 ? 0 : - 100.0 * sb->n_hadcrystals / sb->n_processed), - sb->n_crystals, - sb->n_processed - sb->n_processed_last_stats); - - sb->n_processed_last_stats = sb->n_processed; - sb->n_hadcrystals_last_stats = sb->n_hadcrystals; - sb->n_crystals_last_stats = sb->n_crystals; - sb->t_last_stats = tNow; + sb->shared->n_hadcrystals, + sb->shared->n_processed, indexable, + sb->shared->n_crystals, n_proc_this); + sb->n_processed_last_stats = sb->shared->n_processed; + sb->n_hadcrystals_last_stats = sb->shared->n_hadcrystals; + sb->n_crystals_last_stats = sb->shared->n_crystals; + sb->t_last_stats = tNow; - } - if ( r == 0 ) pthread_mutex_unlock(&sb->shared->term_lock); - unlock_sandbox(sb); + pthread_mutex_unlock(&sb->shared->term_lock); - allDone = 1; - lock_sandbox(sb); - for ( i=0; i<n_proc; i++ ) { - if ( sb->running[i] ) allDone = 0; } - unlock_sandbox(sb); - } + /* Have all the events been swallowed? */ + pthread_mutex_lock(&sb->shared->queue_lock); + if ( sb->shared->n_events == 0 ) allDone = 1; + pthread_mutex_unlock(&sb->shared->queue_lock); + } while ( !allDone ); fclose(fh); + /* Indicate to the workers that we are finished, and wake them up one + * last time */ + STATUS("Waiting for the last patterns to be processed...\n"); + pthread_mutex_lock(&sb->shared->queue_lock); + sb->shared->no_more = 1; + pthread_mutex_unlock(&sb->shared->queue_lock); + for ( i=0; i<n_proc; i++ ) { + sem_post(&sb->shared->queue_sem); + } + for ( i=0; i<n_proc; i++ ) { + int status; + waitpid(sb->pids[i], &status, 0); + } + /* Indicate to the reader thread that we are done */ pthread_mutex_lock(&sb->reader->lock); sb->reader->done = 1; @@ -1286,30 +954,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, pthread_join(reader_thread, NULL); - for ( i=0; i<n_proc; i++ ) { - int status; - waitpid(sb->pids[i], &status, 0); + for ( i=0; i<sb->reader->n_read; i++ ) { + fclose(sb->reader->fhs[i]); } - - for ( i=0; i<n_proc; i++ ) { - close(sb->filename_pipes[i]); - if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]); - } - + free(sb->reader->fhs); + free(sb->reader->fds); free(sb->running); - free(sb->filename_pipes); - free(sb->result_fhs); free(sb->pids); free(sb->tmpdir); - munmap(sb->shared, sizeof(struct sb_shm)); - - pthread_mutex_destroy(&sb->lock); + free(sb->reader); - STATUS("Final:" - " %i images processed, %i had crystals (%.1f%%)," + STATUS("Final: %i images processed, %i had crystals (%.1f%%)," " %i crystals overall.\n", - sb->n_processed, sb->n_hadcrystals, - 100.0 * sb->n_hadcrystals / sb->n_processed, sb->n_crystals); + sb->shared->n_processed, sb->shared->n_hadcrystals, + 100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed, + sb->shared->n_crystals); + munmap(sb->shared, sizeof(struct sb_shm)); free(sb); } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 235825ce..67bc88c8 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -3,13 +3,13 @@ * * Sandbox for indexing * - * Copyright © 2012-2014 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2014 Thomas White <taw@physics.org> + * 2010-2015 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -31,11 +31,46 @@ * */ +#ifndef IM_SANDBOX_H +#define IM_SANDBOX_H + +#include <semaphore.h> + +struct sb_shm; + #include "index.h" #include "stream.h" #include "cell.h" #include "process_image.h" +/* Length of event queue */ +#define QUEUE_SIZE (256) + +/* Maximum length of an event ID including serial number */ +#define MAX_EV_LEN (1024) + +/* Maximum number of workers */ +#define MAX_NUM_WORKERS (1024) + +struct sb_shm +{ + pthread_mutex_t term_lock; + + pthread_mutex_t queue_lock; + int n_events; + char queue[QUEUE_SIZE][MAX_EV_LEN]; + int no_more; + char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN]; + sem_t queue_sem; + + pthread_mutex_t totals_lock; + int n_processed; + int n_hadcrystals; + int n_crystals; +}; + extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir); + +#endif /* IM_SANDBOX_H */ diff --git a/src/process_image.c b/src/process_image.c index 22ff4ec0..e7e3aa78 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -51,6 +51,7 @@ #include "process_image.h" #include "integration.h" #include "predict-refine.h" +#include "im-sandbox.h" static void try_refine_autoR(struct image *image, Crystal *cr) @@ -119,8 +120,8 @@ static void restore_image_data(float **dp, struct detector *det, float **bu) void process_image(const struct index_args *iargs, struct pattern_args *pargs, - Stream *st, int cookie, const char *tmpdir, int results_pipe, - int serial, pthread_mutex_t *term_lock) + Stream *st, int cookie, const char *tmpdir, + int serial, struct sb_shm *sb_shared) { int check; struct hdfile *hdfile; @@ -131,6 +132,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, char *rn; int n_crystals_left; float **prefilter; + int any_crystals; image.features = NULL; image.data = NULL; @@ -295,7 +297,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, iargs->ir_inn, iargs->ir_mid, iargs->ir_out, iargs->int_diag, iargs->int_diag_h, iargs->int_diag_k, iargs->int_diag_l, - term_lock); + &sb_shared->term_lock); ret = write_chunk(st, &image, hdfile, iargs->stream_peaks, iargs->stream_refls, @@ -315,12 +317,17 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } /* Count crystals which are still good */ - pargs->n_crystals = 0; + pthread_mutex_lock(&sb_shared->totals_lock); + any_crystals = 0; for ( i=0; i<image.n_crystals; i++ ) { if ( crystal_get_user_flag(image.crystals[i]) == 0 ) { - pargs->n_crystals++; + sb_shared->n_crystals++; + any_crystals = 1; } } + sb_shared->n_processed++; + sb_shared->n_hadcrystals += any_crystals; + pthread_mutex_unlock(&sb_shared->totals_lock); for ( i=0; i<image.n_crystals; i++ ) { cell_free(crystal_get_cell(image.crystals[i])); diff --git a/src/process_image.h b/src/process_image.h index d982c4f0..0d5ca1b7 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -34,8 +34,10 @@ #include <config.h> #endif +struct index_args; #include "integration.h" +#include "im-sandbox.h" enum { @@ -95,16 +97,13 @@ struct pattern_args { /* "Input" */ struct filename_plus_event *filename_p_e; - - /* "Output" */ - int n_crystals; }; extern void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, - int cookie, const char *tmpdir, int results_pipe, - int serial, pthread_mutex_t *term_lock); + int cookie, const char *tmpdir, int serial, + struct sb_shm *sb_shared); -#endif /* PROCESS_IMAGEs_H */ +#endif /* PROCESS_IMAGE_H */ |