aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c726
-rw-r--r--src/im-sandbox.h39
-rw-r--r--src/process_image.c17
-rw-r--r--src/process_image.h11
4 files changed, 247 insertions, 546 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index f796b8b7..747c7d03 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -49,6 +49,7 @@
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>
+#include <semaphore.h>
#ifdef HAVE_CLOCK_GETTIME
#include <time.h>
@@ -67,7 +68,6 @@
/* Write statistics at APPROXIMATELY this interval */
#define STATS_EVERY_N_SECONDS (5)
-
struct sb_reader
{
pthread_mutex_t lock;
@@ -86,19 +86,8 @@ struct sb_reader
};
-struct sb_shm
-{
- pthread_mutex_t term_lock;
-};
-
-
struct sandbox
{
- pthread_mutex_t lock;
-
- int n_processed;
- int n_hadcrystals;
- int n_crystals;
int n_processed_last_stats;
int n_hadcrystals_last_stats;
int n_crystals_last_stats;
@@ -110,10 +99,7 @@ struct sandbox
pid_t *pids;
int *running;
- FILE **result_fhs;
- int *filename_pipes;
- int *stream_pipe_write;
- struct filename_plus_event **last_filename;
+ struct filename_plus_event **last_event;
int serial;
struct sb_shm *shared;
@@ -125,19 +111,7 @@ struct sandbox
/* Horrible global variable for signal handler */
-int signal_pipe[2];
-
-
-static void lock_sandbox(struct sandbox *sb)
-{
- pthread_mutex_lock(&sb->lock);
-}
-
-
-static void unlock_sandbox(struct sandbox *sb)
-{
- pthread_mutex_unlock(&sb->lock);
-}
+sem_t zombie_sem;
static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
@@ -169,6 +143,8 @@ static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
rval = fgets(line, 1023, fh);
if ( rval == NULL ) {
free(line);
+ free(filename);
+ filename = NULL;
return NULL;
}
@@ -273,263 +249,84 @@ static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
}
-struct buffer_data
-{
- char *rbuffer;
- char *line;
- int fd;
- int rbufpos;
- int rbuflen;
- int eof;
- int err;
-};
-
-
-static int read_fpe_data(struct buffer_data *bd)
+static void shuffle_events(struct sb_shm *sb_shared)
{
- int rval;
- int no_line = 0;
-
- bd->eof = 0;
- bd->err = 0;
-
- rval = read(bd->fd, bd->rbuffer+bd->rbufpos, bd->rbuflen-bd->rbufpos);
- if ( rval == 0 ) {
- bd->eof = 1;
- return 1;
- }
- if ( rval == -1 ) {
- bd->err = 1;
- return 1;
- }
-
- bd->rbufpos += rval;
- assert(bd->rbufpos <= bd->rbuflen);
-
- while ( (!no_line) && (bd->rbufpos > 0) ) {
-
- int i;
- int line_ready = 0;
- int line_length = 0;
-
- /* See if there's a full line in the buffer yet */
- for ( i=0; i<bd->rbufpos; i++ ) {
-
- /* Is there a line in the buffer? */
- if ( bd->rbuffer[i] == '\n' ) {
- line_length = i+1;
- line_ready = 1;
- break;
- }
-
- }
-
- if ( line_ready ) {
-
- int new_rbuflen;
-
- if ( bd->line != NULL ) {
- free(bd->line);
- }
-
- bd->line = malloc(line_length+1);
- strncpy(bd->line, bd->rbuffer, line_length);
- bd->line[line_length] = '\0';
-
- /* Now the block's been parsed, it should be
- * forgotten about */
- memmove(bd->rbuffer,
- bd->rbuffer + line_length,
- bd->rbuflen - line_length);
-
- /* Subtract the number of bytes removed */
- bd->rbufpos = bd->rbufpos - line_length;
- new_rbuflen = bd->rbuflen - line_length;
- if ( new_rbuflen == 0 ) new_rbuflen = 256;
- bd->rbuffer = realloc(bd->rbuffer,
- new_rbuflen*sizeof(char));
- bd->rbuflen = new_rbuflen;
-
- return 1;
-
- } else {
-
- if ( bd->rbufpos == bd->rbuflen ) {
- bd->rbuffer = realloc(bd->rbuffer,
- bd->rbuflen + 256);
- bd->rbuflen = bd->rbuflen + 256;
- }
- no_line = 1;
-
- }
+ int i;
+ for ( i=1; i<sb_shared->n_events; i++ ) {
+ memcpy(sb_shared->queue[i-1], sb_shared->queue[i], MAX_EV_LEN);
}
-
- return 0;
+ sb_shared->n_events--;
}
-static void run_work(const struct index_args *iargs,
- int filename_pipe, int results_pipe, Stream *st,
- int cookie, const char *tmpdir, pthread_mutex_t *term_lock)
+static void run_work(const struct index_args *iargs, Stream *st,
+ int cookie, const char *tmpdir, struct sb_shm *sb_shared)
{
- FILE *fh;
int allDone = 0;
- int w;
- unsigned int opts;
- struct buffer_data bd;
-
- bd.rbuffer = malloc(256*sizeof(char));
- bd.rbuflen = 256;
- bd.rbufpos = 0;
- bd.line = NULL;
- bd.fd = 0;
- bd.eof = 0;
- bd.err = 1;
-
- fh = fdopen(filename_pipe, "r");
- if ( fh == NULL ) {
- ERROR("Failed to fdopen() the filename pipe!\n");
- return;
- }
-
- w = write(results_pipe, "\n", 1);
- if ( w < 0 ) {
- ERROR("Failed to send request for first filename.\n");
- }
-
- bd.fd = fileno(fh);
-
- /* Set non-blocking */
- opts = fcntl(bd.fd, F_GETFL);
- fcntl(bd.fd, F_SETFL, opts | O_NONBLOCK);
while ( !allDone ) {
struct pattern_args pargs;
- int c;
- int rval;
- char buf[1024];
-
- pargs.filename_p_e = initialize_filename_plus_event();
-
- rval = 0;
- do {
-
- fd_set fds;
- struct timeval tv;
- int sval;
-
- FD_ZERO(&fds);
- FD_SET(bd.fd, &fds);
-
- tv.tv_sec = 30;
- tv.tv_usec = 0;
-
- sval = select(bd.fd+1, &fds, NULL, NULL, &tv);
-
- if ( sval == -1 ) {
-
- const int err = errno;
-
- switch ( err ) {
-
- case EINTR:
- STATUS("Restarting select()\n");
- break;
-
- default:
- ERROR("select() failed: %s\n",
- strerror(err));
- rval = 1;
-
- }
-
- } else if ( sval != 0 ) {
- rval = read_fpe_data(&bd);
- } else {
- ERROR("No data sent from main process..\n");
- /* Not actually an error condition. The main
- * process might just be taking a while to read
- * the index data for a large multi-event file.
- */
- }
+ char filename[MAX_EV_LEN];
+ char event_str[MAX_EV_LEN];
+ int ser;
+ struct event *ev;
+ int r;
- } while ( !rval );
+ /* Wait until an event is ready */
+ sem_wait(&sb_shared->queue_sem);
- if ( bd.err ) {
- ERROR("Event pipe read error: %s\n", strerror(errno));
+ /* Get the event from the queue */
+ pthread_mutex_lock(&sb_shared->queue_lock);
+ if ( sb_shared->no_more ) {
+ pthread_mutex_unlock(&sb_shared->queue_lock);
allDone = 1;
continue;
}
-
- if ( bd.eof ) {
- ERROR("Event pipe EOF (should not happen).\n");
- allDone = 1;
- continue;
+ r = sscanf(sb_shared->queue[0], "%s %s %i",
+ filename, event_str, &ser);
+ if ( r != 3 ) {
+ STATUS("Invalid event string '%s'\n",
+ sb_shared->queue[0]);
}
+ memcpy(sb_shared->last_ev[cookie], sb_shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb_shared);
+ pthread_mutex_unlock(&sb_shared->queue_lock);
- if ( bd.line[0] == '\n' ) {
- allDone = 1;
- } else {
-
- char filename[1024];
- char event_str[1024];
- struct event* ev;
- int ser;
-
- chomp(bd.line);
-
- sscanf(bd.line, "%s %s %i", filename, event_str, &ser);
- pargs.filename_p_e->filename = strdup(filename);
-
- /* Make absolutely sure the same event won't be
- * processed a second time */
- bd.line[0] = '\0';
-
- if ( strcmp(event_str, "/") != 0 ) {
-
- ev = get_event_from_event_string(event_str);
- if ( ev == NULL ) {
- ERROR("Bad event string '%s'\n",
- event_str);
- continue;
- }
-
- pargs.filename_p_e->ev = ev;
+ if ( r != 3 ) continue;
- } else {
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup(filename);
- pargs.filename_p_e->ev = NULL;
+ if ( strcmp(event_str, "/") != 0 ) {
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Bad event string '%s'\n", event_str);
+ continue;
}
+ pargs.filename_p_e->ev = ev;
- pargs.n_crystals = 0;
- process_image(iargs, &pargs, st, cookie, tmpdir,
- results_pipe, ser, term_lock);
+ } else {
- /* Request another image */
- c = sprintf(buf, "%i\n", pargs.n_crystals);
- w = write(results_pipe, buf, c);
- if ( w < 0 ) {
- ERROR("write P0\n");
- }
+ pargs.filename_p_e->ev = NULL;
}
+ process_image(iargs, &pargs, st, cookie, tmpdir, ser,
+ sb_shared);
+
free_filename_plus_event(pargs.filename_p_e);
}
- free(bd.line);
- free(bd.rbuffer);
-
cleanup_indexing(iargs->indm, iargs->ipriv);
free_detector_geometry(iargs->det);
free(iargs->hdf5_peak_path);
free_copy_hdf5_field_list(iargs->copyme);
cell_free(iargs->cell);
- fclose(fh);
}
@@ -556,8 +353,6 @@ static time_t get_monotonic_seconds()
#endif
-size_t vol = 0;
-
static ssize_t lwrite(int fd, const char *a)
{
@@ -647,6 +442,8 @@ static void remove_pipe(struct sb_reader *rd, int d)
{
int i;
+ fclose(rd->fhs[d]);
+
for ( i=d; i<rd->n_read; i++ ) {
if ( i < rd->n_read-1 ) {
rd->fds[i] = rd->fds[i+1];
@@ -731,20 +528,8 @@ static void *run_reader(void *rdv)
static void start_worker_process(struct sandbox *sb, int slot)
{
pid_t p;
- int filename_pipe[2];
- int result_pipe[2];
int stream_pipe[2];
- if ( pipe(filename_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
- if ( pipe(result_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
if ( pipe(stream_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
return;
@@ -759,12 +544,12 @@ static void start_worker_process(struct sandbox *sb, int slot)
if ( p == 0 ) {
Stream *st;
- int j;
struct sigaction sa;
int r;
char *tmp;
struct stat s;
size_t ll;
+ int i;
/* First, disconnect the signal handler */
sa.sa_flags = 0;
@@ -800,36 +585,29 @@ static void start_worker_process(struct sandbox *sb, int slot)
}
/* Free resources which will not be needed by worker */
- for ( j=0; j<sb->n_proc; j++ ) {
- if ( (j != slot) && (sb->running[j]) ) {
- close(sb->stream_pipe_write[j]);
- }
- }
- for ( j=0; j<sb->n_proc; j++ ) {
- if ( (j != slot) && (sb->running[j]) ) {
- if ( sb->result_fhs[j] != NULL ) {
- fclose(sb->result_fhs[j]);
- }
- close(sb->filename_pipes[j]);
- }
- }
- free(sb->filename_pipes);
- free(sb->result_fhs);
free(sb->pids);
- /* Also prefix, tempdir, */
-
- /* Child process gets the 'read' end of the filename
- * pipe, and the 'write' end of the result pipe. */
- close(filename_pipe[1]);
- close(result_pipe[0]);
+ for ( i=0; i<sb->reader->n_read; i++ ) {
+ fclose(sb->reader->fhs[i]);
+ }
+ free(sb->reader->fhs);
+ free(sb->reader->fds);
+ free(sb->reader);
+ free(sb->tmpdir);
+ free(sb->running);
+ /* Not freed because it's not worth passing them down just for
+ * this purpose: event list file handle,
+ * main output stream handle
+ * original temp dir name (without indexamajig.XX)
+ * prefix
+ */
st = open_stream_fd_for_write(stream_pipe[1]);
- run_work(sb->iargs, filename_pipe[0], result_pipe[1],
- st, slot, tmp, &sb->shared->term_lock);
+ run_work(sb->iargs, st, slot, tmp, sb->shared);
close_stream(st);
- //close(filename_pipe[0]);
- close(result_pipe[1]);
+ free(tmp);
+ free(sb->iargs->beam->photon_energy_from);
+
munmap(sb->shared, sizeof(struct sb_shm));
free(sb);
@@ -843,22 +621,13 @@ static void start_worker_process(struct sandbox *sb, int slot)
sb->pids[slot] = p;
sb->running[slot] = 1;
add_pipe(sb->reader, stream_pipe[0]);
- close(filename_pipe[0]);
- close(result_pipe[1]);
close(stream_pipe[1]);
- sb->filename_pipes[slot] = filename_pipe[1];
-
- sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
- if ( sb->result_fhs[slot] == NULL ) {
- ERROR("fdopen() failed.\n");
- return;
- }
}
static void signal_handler(int sig, siginfo_t *si, void *uc_v)
{
- write(signal_pipe[1], "\n", 1);
+ sem_post(&zombie_sem);
}
@@ -866,7 +635,6 @@ static void handle_zombie(struct sandbox *sb)
{
int i;
- lock_sandbox(sb);
for ( i=0; i<sb->n_proc; i++ ) {
int status, p;
@@ -891,17 +659,14 @@ static void handle_zombie(struct sandbox *sb)
if ( WIFSIGNALED(status) ) {
STATUS("Worker %i was killed by signal %i\n",
i, WTERMSIG(status));
- STATUS("Last filename was: %s (%s)\n",
- sb->last_filename[i]->filename,
- get_event_string(sb->last_filename[i]->ev) );
- sb->n_processed++;
+ STATUS("Event ID was: %s\n",
+ sb->shared->last_ev[i]);
start_worker_process(sb, i);
}
}
}
- unlock_sandbox(sb);
}
@@ -932,24 +697,72 @@ static int setup_shm(struct sandbox *sb)
return 1;
}
+ if ( pthread_mutex_init(&sb->shared->queue_lock, &attr) ) {
+ ERROR("Queue lock setup failed.\n");
+ return 1;
+ }
+
+ if ( pthread_mutex_init(&sb->shared->totals_lock, &attr) ) {
+ ERROR("Totals lock setup failed.\n");
+ return 1;
+ }
+
pthread_mutexattr_destroy(&attr);
return 0;
}
+static char *maybe_get_event_string(struct event *ev)
+{
+ if ( ev == NULL ) return "/";
+ return get_event_string(ev);
+}
+
+
+/* Assumes the caller is already holding queue_lock! */
+static int fill_queue(FILE *fh, int config_basename, struct detector *det,
+ const char *prefix, struct sandbox *sb)
+{
+ while ( sb->shared->n_events < QUEUE_SIZE ) {
+
+ struct filename_plus_event *ne;
+ char ev_string[MAX_EV_LEN];
+
+ ne = get_pattern(fh, config_basename, det, prefix);
+ if ( ne == NULL ) return 1; /* No more */
+
+ memset(ev_string, 0, MAX_EV_LEN);
+ snprintf(ev_string, MAX_EV_LEN, "%s %s %i", ne->filename,
+ maybe_get_event_string(ne->ev), sb->serial++);
+ memcpy(sb->shared->queue[sb->shared->n_events++], ev_string,
+ MAX_EV_LEN);
+ sem_post(&sb->shared->queue_sem);
+ free_filename_plus_event(ne);
+
+ }
+ return 0;
+}
+
+
void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tempdir)
{
int i;
- int allDone;
struct sigaction sa;
int r;
pthread_t reader_thread;
struct sandbox *sb;
size_t ll;
struct stat s;
+ int allDone = 0;
+
+ if ( n_proc > MAX_NUM_WORKERS ) {
+ ERROR("Number of workers (%i) is too large. Using %i\n",
+ n_proc, MAX_NUM_WORKERS);
+ n_proc = MAX_NUM_WORKERS;
+ }
sb = calloc(1, sizeof(struct sandbox));
if ( sb == NULL ) {
@@ -964,12 +777,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- pthread_mutex_init(&sb->lock, NULL);
pthread_mutex_init(&sb->reader->lock, NULL);
- sb->n_processed = 0;
- sb->n_hadcrystals = 0;
- sb->n_crystals = 0;
sb->n_processed_last_stats = 0;
sb->n_hadcrystals_last_stats = 0;
sb->n_crystals_last_stats = 0;
@@ -988,25 +797,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- sb->stream_pipe_write = calloc(n_proc, sizeof(int));
- if ( sb->stream_pipe_write == NULL ) {
- ERROR("Couldn't allocate memory for pipes.\n");
- return;
- }
+ sb->shared->n_processed = 0;
+ sb->shared->n_hadcrystals = 0;
+ sb->shared->n_crystals = 0;
+
+ sem_init(&sb->shared->queue_sem, 1, 0);
+ sem_init(&zombie_sem, 0, 0);
- lock_sandbox(sb);
- sb->filename_pipes = calloc(n_proc, sizeof(int));
- sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
- if ( sb->filename_pipes == NULL ) {
- ERROR("Couldn't allocate memory for pipes.\n");
- return;
- }
- if ( sb->result_fhs == NULL ) {
- ERROR("Couldn't allocate memory for pipe file handles.\n");
- return;
- }
if ( sb->pids == NULL ) {
ERROR("Couldn't allocate memory for PIDs.\n");
return;
@@ -1016,18 +815,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- sb->last_filename = calloc(n_proc, sizeof(char *));
- if ( sb->last_filename == NULL ) {
- ERROR("Couldn't allocate memory for last filename list.\n");
- return;
- }
- unlock_sandbox(sb);
-
- if ( pipe(signal_pipe) == -1 ) {
- ERROR("Failed to create signal pipe.\n");
- return;
- }
-
/* Set up signal handler to take action if any children die */
sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
sigemptyset(&sa.sa_mask);
@@ -1039,7 +826,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
if ( tempdir == NULL ) {
- tempdir = strdup("");
+ tempdir = "";
}
ll = 64+strlen(tempdir);
@@ -1068,12 +855,17 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
+ /* Fill the queue */
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ sb->shared->n_events = 0;
+ fill_queue(fh, config_basename, iargs->det, prefix, sb);
+ sb->shared->no_more = 0;
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+
/* Fork the right number of times */
- lock_sandbox(sb);
for ( i=0; i<n_proc; i++ ) {
start_worker_process(sb, i);
}
- unlock_sandbox(sb);
/* Start reader thread after forking, so that things are definitely
* "running" */
@@ -1083,202 +875,78 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- allDone = 0;
- while ( !allDone ) {
+ do {
- int r, i;
- struct timeval tv;
- fd_set fds;
+ int r;
double tNow;
- int fdmax;
-
- tv.tv_sec = 1;
- tv.tv_usec = 0;
-
- FD_ZERO(&fds);
- fdmax = 0;
- lock_sandbox(sb);
- for ( i=0; i<n_proc; i++ ) {
-
- int fd;
-
- if ( sb->result_fhs[i] == NULL) continue;
-
- fd = fileno(sb->result_fhs[i]);
- FD_SET(fd, &fds);
- if ( fd > fdmax ) fdmax = fd;
- }
- unlock_sandbox(sb);
-
- FD_SET(signal_pipe[0], &fds);
- if ( signal_pipe[0] > fdmax ) fdmax = signal_pipe[0];
-
- r = select(fdmax+1, &fds, NULL, NULL, &tv);
- if ( r == -1 ) {
- if ( errno == EINTR ) continue;
- ERROR("select() failed: %s\n", strerror(errno));
- break;
- }
+ sleep(5);
- if ( FD_ISSET(signal_pipe[0], &fds) ) {
-
- char d;
- read(signal_pipe[0], &d, 1);
- handle_zombie(sb);
+ /* Check for dead workers */
+ if ( sem_trywait(&zombie_sem) == 0 ) handle_zombie(sb);
+ /* Top up the queue if necessary */
+ r = 0;
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( sb->shared->n_events < QUEUE_SIZE/2 ) {
+ r = fill_queue(fh, config_basename, iargs->det, prefix,
+ sb);
}
-
- lock_sandbox(sb);
- for ( i=0; i<n_proc; i++ ) {
-
- struct filename_plus_event *nextImage;
- char results[1024];
- char *rval;
- int fd;
- char *eptr;
-
- if ( sb->result_fhs[i] == NULL ) continue;
-
- fd = fileno(sb->result_fhs[i]);
- if ( !FD_ISSET(fd, &fds) ) continue;
-
- rval = fgets(results, 1024, sb->result_fhs[i]);
- if ( rval == NULL ) {
- if ( !feof(sb->result_fhs[i]) ) {
- ERROR("fgets() failed: %s\n",
- strerror(errno));
- }
- sb->result_fhs[i] = NULL;
- continue;
- }
-
- chomp(results);
-
- strtol(results, &eptr, 10);
- if ( eptr == results ) {
- if ( strlen(results) > 0 ) {
- ERROR("Invalid result '%s'\n",
- results);
- }
- } else {
-
- int nc = atoi(results);
- sb->n_crystals += nc;
- if ( nc > 0 ) {
- sb->n_hadcrystals++;
- }
- sb->n_processed++;
-
- }
-
- /* Send next filename */
- nextImage = get_pattern(fh, config_basename,
- iargs->det, prefix);
-
- if ( sb->last_filename[i] != NULL ) {
- free_filename_plus_event(sb->last_filename[i]);
- }
-
- sb->last_filename[i] = nextImage;
-
- if ( nextImage == NULL ) {
-
- /* No more images */
- r = write(sb->filename_pipes[i], "\n", 1);
- if ( r < 0 ) {
- ERROR("Write pipe\n");
- }
-
- } else {
-
- char tmp[256];
-
- r = write(sb->filename_pipes[i],
- nextImage->filename,
- strlen(nextImage->filename));
-
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- r = write(sb->filename_pipes[i], " ", 1);
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- if ( nextImage->ev != NULL ) {
-
- r = write(sb->filename_pipes[i],
- get_event_string(nextImage->ev),
- strlen(get_event_string(nextImage->ev)));
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- } else {
-
- r = write(sb->filename_pipes[i], "/", 1);
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- }
-
- snprintf(tmp, 255, " %i", sb->serial++);
- r = write(sb->filename_pipes[i],
- tmp, strlen(tmp));
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- r = write(sb->filename_pipes[i], "\n", 1);
- if ( r < 0 ) {
- ERROR("write pipe\n");
- }
-
- }
- }
-
- unlock_sandbox(sb);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
/* Update progress */
- lock_sandbox(sb);
tNow = get_monotonic_seconds();
r = pthread_mutex_trylock(&sb->shared->term_lock);
- if ((r==0) && (tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS))
- {
+ if ( r == 0 ) {
+
+ /* Could get lock, so write status */
+ int n_proc_this;
+ double indexable;
+
+ n_proc_this = sb->shared->n_processed
+ - sb->n_processed_last_stats;
+ indexable = (sb->shared->n_processed == 0) ? 0 :
+ 100.0 * sb->shared->n_hadcrystals
+ / sb->shared->n_processed;
STATUS("%4i indexable out of %4i processed (%4.1f%%), "
"%4i crystals so far. "
"%4i images processed since the last message.\n",
- sb->n_hadcrystals, sb->n_processed,
- (sb->n_processed == 0 ? 0 :
- 100.0 * sb->n_hadcrystals / sb->n_processed),
- sb->n_crystals,
- sb->n_processed - sb->n_processed_last_stats);
-
- sb->n_processed_last_stats = sb->n_processed;
- sb->n_hadcrystals_last_stats = sb->n_hadcrystals;
- sb->n_crystals_last_stats = sb->n_crystals;
- sb->t_last_stats = tNow;
+ sb->shared->n_hadcrystals,
+ sb->shared->n_processed, indexable,
+ sb->shared->n_crystals, n_proc_this);
+ sb->n_processed_last_stats = sb->shared->n_processed;
+ sb->n_hadcrystals_last_stats = sb->shared->n_hadcrystals;
+ sb->n_crystals_last_stats = sb->shared->n_crystals;
+ sb->t_last_stats = tNow;
- }
- if ( r == 0 ) pthread_mutex_unlock(&sb->shared->term_lock);
- unlock_sandbox(sb);
+ pthread_mutex_unlock(&sb->shared->term_lock);
- allDone = 1;
- lock_sandbox(sb);
- for ( i=0; i<n_proc; i++ ) {
- if ( sb->running[i] ) allDone = 0;
}
- unlock_sandbox(sb);
- }
+ /* Have all the events been swallowed? */
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( sb->shared->n_events == 0 ) allDone = 1;
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ } while ( !allDone );
fclose(fh);
+ /* Indicate to the workers that we are finished, and wake them up one
+ * last time */
+ STATUS("Waiting for the last patterns to be processed...\n");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ sb->shared->no_more = 1;
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ for ( i=0; i<n_proc; i++ ) {
+ sem_post(&sb->shared->queue_sem);
+ }
+ for ( i=0; i<n_proc; i++ ) {
+ int status;
+ waitpid(sb->pids[i], &status, 0);
+ }
+
/* Indicate to the reader thread that we are done */
pthread_mutex_lock(&sb->reader->lock);
sb->reader->done = 1;
@@ -1286,30 +954,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
pthread_join(reader_thread, NULL);
- for ( i=0; i<n_proc; i++ ) {
- int status;
- waitpid(sb->pids[i], &status, 0);
+ for ( i=0; i<sb->reader->n_read; i++ ) {
+ fclose(sb->reader->fhs[i]);
}
-
- for ( i=0; i<n_proc; i++ ) {
- close(sb->filename_pipes[i]);
- if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]);
- }
-
+ free(sb->reader->fhs);
+ free(sb->reader->fds);
free(sb->running);
- free(sb->filename_pipes);
- free(sb->result_fhs);
free(sb->pids);
free(sb->tmpdir);
- munmap(sb->shared, sizeof(struct sb_shm));
-
- pthread_mutex_destroy(&sb->lock);
+ free(sb->reader);
- STATUS("Final:"
- " %i images processed, %i had crystals (%.1f%%),"
+ STATUS("Final: %i images processed, %i had crystals (%.1f%%),"
" %i crystals overall.\n",
- sb->n_processed, sb->n_hadcrystals,
- 100.0 * sb->n_hadcrystals / sb->n_processed, sb->n_crystals);
+ sb->shared->n_processed, sb->shared->n_hadcrystals,
+ 100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed,
+ sb->shared->n_crystals);
+ munmap(sb->shared, sizeof(struct sb_shm));
free(sb);
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 235825ce..67bc88c8 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2014 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2014 Thomas White <taw@physics.org>
+ * 2010-2015 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -31,11 +31,46 @@
*
*/
+#ifndef IM_SANDBOX_H
+#define IM_SANDBOX_H
+
+#include <semaphore.h>
+
+struct sb_shm;
+
#include "index.h"
#include "stream.h"
#include "cell.h"
#include "process_image.h"
+/* Length of event queue */
+#define QUEUE_SIZE (256)
+
+/* Maximum length of an event ID including serial number */
+#define MAX_EV_LEN (1024)
+
+/* Maximum number of workers */
+#define MAX_NUM_WORKERS (1024)
+
+struct sb_shm
+{
+ pthread_mutex_t term_lock;
+
+ pthread_mutex_t queue_lock;
+ int n_events;
+ char queue[QUEUE_SIZE][MAX_EV_LEN];
+ int no_more;
+ char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN];
+ sem_t queue_sem;
+
+ pthread_mutex_t totals_lock;
+ int n_processed;
+ int n_hadcrystals;
+ int n_crystals;
+};
+
extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir);
+
+#endif /* IM_SANDBOX_H */
diff --git a/src/process_image.c b/src/process_image.c
index 22ff4ec0..e7e3aa78 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -51,6 +51,7 @@
#include "process_image.h"
#include "integration.h"
#include "predict-refine.h"
+#include "im-sandbox.h"
static void try_refine_autoR(struct image *image, Crystal *cr)
@@ -119,8 +120,8 @@ static void restore_image_data(float **dp, struct detector *det, float **bu)
void process_image(const struct index_args *iargs, struct pattern_args *pargs,
- Stream *st, int cookie, const char *tmpdir, int results_pipe,
- int serial, pthread_mutex_t *term_lock)
+ Stream *st, int cookie, const char *tmpdir,
+ int serial, struct sb_shm *sb_shared)
{
int check;
struct hdfile *hdfile;
@@ -131,6 +132,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
char *rn;
int n_crystals_left;
float **prefilter;
+ int any_crystals;
image.features = NULL;
image.data = NULL;
@@ -295,7 +297,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
iargs->ir_inn, iargs->ir_mid, iargs->ir_out,
iargs->int_diag, iargs->int_diag_h,
iargs->int_diag_k, iargs->int_diag_l,
- term_lock);
+ &sb_shared->term_lock);
ret = write_chunk(st, &image, hdfile,
iargs->stream_peaks, iargs->stream_refls,
@@ -315,12 +317,17 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
/* Count crystals which are still good */
- pargs->n_crystals = 0;
+ pthread_mutex_lock(&sb_shared->totals_lock);
+ any_crystals = 0;
for ( i=0; i<image.n_crystals; i++ ) {
if ( crystal_get_user_flag(image.crystals[i]) == 0 ) {
- pargs->n_crystals++;
+ sb_shared->n_crystals++;
+ any_crystals = 1;
}
}
+ sb_shared->n_processed++;
+ sb_shared->n_hadcrystals += any_crystals;
+ pthread_mutex_unlock(&sb_shared->totals_lock);
for ( i=0; i<image.n_crystals; i++ ) {
cell_free(crystal_get_cell(image.crystals[i]));
diff --git a/src/process_image.h b/src/process_image.h
index d982c4f0..0d5ca1b7 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -34,8 +34,10 @@
#include <config.h>
#endif
+struct index_args;
#include "integration.h"
+#include "im-sandbox.h"
enum {
@@ -95,16 +97,13 @@ struct pattern_args
{
/* "Input" */
struct filename_plus_event *filename_p_e;
-
- /* "Output" */
- int n_crystals;
};
extern void process_image(const struct index_args *iargs,
struct pattern_args *pargs, Stream *st,
- int cookie, const char *tmpdir, int results_pipe,
- int serial, pthread_mutex_t *term_lock);
+ int cookie, const char *tmpdir, int serial,
+ struct sb_shm *sb_shared);
-#endif /* PROCESS_IMAGEs_H */
+#endif /* PROCESS_IMAGE_H */