aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
authorValerio Mariani <valerio.mariani@desy.de>2014-05-09 11:02:17 +0200
committerThomas White <taw@physics.org>2014-09-05 18:12:38 +0200
commit45492b842c3af2af542256417a8bab5bbc7bd5f7 (patch)
tree53fc320ad0734940c5a3fe2d075ae7417787432a /src/im-sandbox.c
parentae9fa9e6bfd1ed98a2b146d2e228c69a9cd651cc (diff)
Multi-event mode
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c386
1 files changed, 335 insertions, 51 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 42e4a090..48518b82 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -10,6 +10,7 @@
*
* Authors:
* 2010-2014 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -46,6 +47,7 @@
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
+#include <assert.h>
#ifdef HAVE_CLOCK_GETTIME
#include <time.h>
@@ -53,6 +55,10 @@
#include <sys/time.h>
#endif
+#include <events.h>
+#include <hdf5-file.h>
+#include <detector.h>
+
#include "im-sandbox.h"
#include "process_image.h"
@@ -101,7 +107,7 @@ struct sandbox
FILE **result_fhs;
int *filename_pipes;
int *stream_pipe_write;
- char **last_filename;
+ struct filename_plus_event **last_filename;
char *tmpdir;
@@ -125,48 +131,210 @@ static void unlock_sandbox(struct sandbox *sb)
}
-static char *get_pattern(FILE *fh, int config_basename, const char *prefix)
+static struct filename_plus_event *get_pattern
+ (FILE *fh, int config_basename, struct detector *det,
+ const char *prefix)
{
- char *line;
- char *filename;
+ char *line = NULL;
size_t len;
+ struct filename_plus_event *fne;
+ struct hdfile *hdfile;
+ char filename_buf[2014];
+ char event_buf[2014];
- do {
+ static char *filename = NULL;
+ static struct event_list *ev_list = NULL;
+ static int event_index = -1;
- /* Get the next filename */
- char *rval;
+ line = malloc(1024*sizeof(char));
- line = malloc(1024*sizeof(char));
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) {
+ while ( event_index == -1 ) {
+
+ int scan_check;
+
+ do {
+
+ /* Get the next filename */
+ char *rval;
+
+ rval = fgets(line, 1023, fh);
+ if ( rval == NULL ) {
+ free(line);
+ return NULL;
+ }
+
+ chomp(line);
+
+ } while ( strlen(line) == 0 );
+
+ if ( config_basename ) {
+ char *tmp;
+ tmp = safe_basename(line);
free(line);
- return NULL;
+ line = tmp;
}
- chomp(line);
+ scan_check = sscanf(line, "%s %s", filename_buf, event_buf );
- } while ( strlen(line) == 0 );
+ len = strlen(prefix)+strlen(filename_buf)+1;
- if ( config_basename ) {
- char *tmp;
- tmp = safe_basename(line);
- free(line);
- line = tmp;
- }
+ /* Round the length of the buffer, too keep Valgrind quiet when it gets
+ * given to write() a bit later on */
+ len += 4 - (len % 4);
+
+ if ( filename == NULL ) {
+ filename = malloc(len);
+ } else {
+ char *new_filename;
+ new_filename = realloc(filename, len*sizeof(char));
+ if ( filename == NULL ) {
+ return NULL;
+ }
+ filename = new_filename;
+ }
+
+ snprintf(filename, 1023, "%s%s", prefix, filename_buf);
+
+ if ( det->path_dim != 0 || det->dim_dim != 0 ) {
+
+ ev_list = initialize_event_list();
- len = strlen(prefix)+strlen(line)+1;
+ if ( scan_check == 1) {
- /* Round the length of the buffer, too keep Valgrind quiet when it gets
- * given to write() a bit later on */
- len += 4 - (len % 4);
+ hdfile = hdfile_open(filename);
+ if ( hdfile == NULL ) {
+ ERROR("Failed to open file %s\n", filename);
+ free(line);
+ return NULL;
+ }
+
+ if ( ev_list != NULL ) {
+ free_event_list(ev_list);
+ }
- filename = malloc(len);
+ ev_list = fill_event_list(hdfile, det);
+
+ if ( ev_list->num_events == 0 ) {
+ event_index = -1;
+ } else {
+ event_index = 0;
+ }
+
+ hdfile_close(hdfile);
+
+ } else {
+
+ struct event *ev_to_add;
+
+ ev_to_add = get_event_from_event_string(event_buf);
+ append_event_to_event_list(ev_list, ev_to_add);
+ free_event(ev_to_add);
+ event_index = 0;
+
+ }
+ } else {
- snprintf(filename, 1023, "%s%s", prefix, line);
+ event_index = 0;
+
+ }
+ }
+
+ fne = malloc(sizeof(struct filename_plus_event));
+ fne->filename = strdup(filename);
+
+ if ( det->path_dim !=0 || det->dim_dim !=0 ) {
+ fne->ev = copy_event(ev_list->events[event_index]);
+ if ( event_index != ev_list->num_events-1 ) {
+ event_index += 1;
+ } else {
+ event_index = -1;
+ }
+ } else {
+ fne->ev = NULL;
+ event_index = -1;
+ }
free(line);
+ return fne;
+}
+
+
+struct buffer_data
+{
+ char *rbuffer;
+ char *line;
+ int fd;
+ int rbufpos;
+ int rbuflen;
+};
+
+
+static int read_fpe_data(struct buffer_data *bd)
+{
+ int rval;
+ int no_line = 0;
+
+ rval = read(bd->fd, bd->rbuffer+bd->rbufpos, bd->rbuflen-bd->rbufpos);
+ if ( (rval == -1) || (rval == 0) ) return 1;
+ bd->rbufpos += rval;
+ assert(bd->rbufpos <= bd->rbuflen);
+
+ while ( (!no_line) && (bd->rbufpos > 0) ) {
+
+ int i;
+ int line_ready = 0;
+ int line_end = 0;
+
+ /* See if there's a full line in the buffer yet */
+ for ( i=0; i<bd->rbufpos; i++ ) {
+
+ /* Is there a line in the buffer? */
+ if ( strncmp(&bd->rbuffer[i] ,"\n" ,1 ) == 0 ) {
+ line_end = i;
+ line_ready = 1;
+ break;
+ }
+
+ }
+
+ if ( line_ready ) {
+
+ int new_rbuflen;
+
+ if ( bd->line != NULL ) {
+ free(bd->line);
+ }
+
+ bd->line = strdup(bd->rbuffer);
+
+ /* Now the block's been parsed, it should be
+ * forgotten about */
+ memmove(bd->rbuffer,
+ bd->rbuffer + line_end + 2,
+ bd->rbuflen - line_end - 2);
+
+ /* Subtract the number of bytes removed */
+ bd->rbufpos = bd->rbufpos - line_end - 1;
+ new_rbuflen = bd->rbuflen - line_end - 2 ;
+ if ( new_rbuflen == 0 ) new_rbuflen = 256;
+ bd->rbuffer = realloc(bd->rbuffer, new_rbuflen*sizeof(char));
+ bd->rbuflen = new_rbuflen;
+
+ return 1;
+
+ } else {
+
+ if ( bd->rbufpos == bd->rbuflen ) {
+ bd->rbuffer = realloc(bd->rbuffer, bd->rbuflen + 256);
+ bd->rbuflen = bd->rbuflen + 256;
+ }
+ no_line = 1;
+
+ }
+
+ }
- return filename;
+ return 0;
}
@@ -174,9 +342,19 @@ static void run_work(const struct index_args *iargs,
int filename_pipe, int results_pipe, Stream *st,
int cookie, const char *tmpdir)
{
- int allDone = 0;
FILE *fh;
+ int allDone = 0;
int w;
+ unsigned int opts;
+ struct buffer_data *bd;
+
+ bd = malloc(sizeof(struct buffer_data));
+ bd->rbuffer = malloc(256*sizeof(char));
+ bd->rbuflen = 256;
+ bd->rbufpos = 0;
+ bd->line = NULL;
+ bd->fd = 0;
+
fh = fdopen(filename_pipe, "r");
if ( fh == NULL ) {
@@ -189,36 +367,101 @@ static void run_work(const struct index_args *iargs,
ERROR("Failed to send request for first filename.\n");
}
+ bd->fd = fileno(fh);
+
+ /* Set non-blocking */
+ opts = fcntl(bd->fd, F_GETFL);
+ fcntl(bd->fd, F_SETFL, opts | O_NONBLOCK);
+
while ( !allDone ) {
struct pattern_args pargs;
int c;
- char *line;
- char *rval;
+ int error;
+ int rval;
char buf[1024];
- line = malloc(1024*sizeof(char));
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) {
+ error = 0;
+ pargs.filename_p_e = initialize_filename_plus_event();
- ERROR("Read error!\n");
- free(line);
+ rval =0;
+
+ do {
+
+ fd_set fds;
+ struct timeval tv;
+ int sval;
+
+ FD_ZERO(&fds);
+ FD_SET(bd->fd, &fds);
+
+ tv.tv_sec = 30;
+ tv.tv_usec = 0;
+
+ sval = select(bd->fd+1, &fds, NULL, NULL, &tv);
+
+ if ( sval == -1 ) {
+
+ const int err = errno;
+
+ switch ( err ) {
+
+ case EINTR:
+ STATUS("Restarting select()\n");
+ break;
+
+ default:
+ ERROR("select() failed: %s\n", strerror(err));
+ rval = 1;
+
+ }
+
+ } else if ( sval != 0 ) {
+ rval = read_fpe_data(bd);
+ } else {
+ ERROR("No data sent from main process..\n");
+ rval = 1;
+ error = 1;
+ }
+
+ } while ( !rval );
+
+ if ( error == 1 ) {
allDone = 1;
continue;
-
}
- chomp(line);
+ chomp(bd->line);
- if ( strlen(line) == 0 ) {
+ if ( strlen(bd->line) == 0 ) {
allDone = 1;
} else {
- pargs.filename = line;
- pargs.n_crystals = 0;
+ char filename[1024];
+ char event_str[1024];
+ struct event* ev;
+
+ sscanf(bd->line, "%s %s", filename, event_str);
+ pargs.filename_p_e->filename = strdup(filename);
+
+ if ( strcmp(event_str, "/") != 0 ) {
+
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Error in event recovery\n");
+ }
+ pargs.filename_p_e->ev = ev;
+
+ } else {
+
+ pargs.filename_p_e->ev = NULL;
+
+ }
+
+ pargs.n_crystals = 0;
process_image(iargs, &pargs, st, cookie, tmpdir,
results_pipe);
@@ -229,12 +472,16 @@ static void run_work(const struct index_args *iargs,
ERROR("write P0\n");
}
- }
+ free_filename_plus_event(pargs.filename_p_e);
- free(line);
+ }
}
+ free(bd->line);
+ free(bd->rbuffer);
+ free(bd);
+
cleanup_indexing(iargs->indm, iargs->ipriv);
free(iargs->indm);
free(iargs->ipriv);
@@ -603,8 +850,9 @@ static void handle_zombie(struct sandbox *sb)
if ( WIFSIGNALED(status) ) {
STATUS("Worker %i was killed by signal %i\n",
i, WTERMSIG(status));
- STATUS("Last filename was: %s\n",
- sb->last_filename[i]);
+ STATUS("Last filename was: %s (%s)\n",
+ sb->last_filename[i]->filename,
+ get_event_string(sb->last_filename[i]->ev) );
sb->n_processed++;
start_worker_process(sb, i);
}
@@ -804,7 +1052,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
lock_sandbox(sb);
for ( i=0; i<n_proc; i++ ) {
- char *nextImage;
+ struct filename_plus_event *nextImage;
char results[1024];
char *rval;
int fd;
@@ -857,27 +1105,62 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
/* Send next filename */
- nextImage = get_pattern(fh, config_basename, prefix);
+ nextImage = get_pattern(fh, config_basename, iargs->det, prefix);
+
+ if ( sb->last_filename[i] != NULL ) {
+ free_filename_plus_event(sb->last_filename[i]);
+ }
- free(sb->last_filename[i]);
sb->last_filename[i] = nextImage;
if ( nextImage == NULL ) {
+
/* No more images */
r = write(sb->filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("Write pipe\n");
}
+
} else {
- r = write(sb->filename_pipes[i], nextImage,
- strlen(nextImage));
- r -= write(sb->filename_pipes[i], "\n", 1);
+
+ r = write(sb->filename_pipes[i], nextImage->filename,
+ strlen(nextImage->filename));
+
if ( r < 0 ) {
ERROR("write pipe\n");
}
- }
+ r = write(sb->filename_pipes[i], " ", 1);
+ if ( r < 0 ) {
+ ERROR("write pipe\n");
+ }
+
+ if ( nextImage->ev != NULL ) {
+
+ r = write(sb->filename_pipes[i],
+ get_event_string(nextImage->ev),
+ strlen(get_event_string(nextImage->ev)));
+ if ( r < 0 ) {
+ ERROR("write pipe\n");
+ }
+
+ } else {
+
+ r = write(sb->filename_pipes[i], "/", 1);
+ if ( r < 0 ) {
+ ERROR("write pipe\n");
+ }
+
+ }
+
+ r = write(sb->filename_pipes[i], "\n", 1);
+ if ( r < 0 ) {
+ ERROR("write pipe\n");
+ }
+
+ }
}
+
unlock_sandbox(sb);
/* Update progress */
@@ -891,7 +1174,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
"%4i crystals so far. "
"%4i images processed since the last message.\n",
sb->n_hadcrystals, sb->n_processed,
- 100.0 * sb->n_hadcrystals / sb->n_processed,
+ (sb->n_processed == 0 ? 0 :
+ 100.0 * sb->n_hadcrystals / sb->n_processed),
sb->n_crystals,
sb->n_processed - sb->n_processed_last_stats);