From 14d8651e663538be7445e74cef9206ce9ab4cf36 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 17 Jun 2015 14:03:55 +0200 Subject: Use a POSIX semaphore for synchronising terminal access during indexing Nice side-effect: indexamajig -j and --int-diag can now be used together --- libcrystfel/src/integration.c | 31 ++++++++-------- libcrystfel/src/integration.h | 7 ++-- src/im-sandbox.c | 85 +++++++++++++++++++++++++++---------------- src/indexamajig.c | 5 --- src/process_image.c | 4 +- src/process_image.h | 2 +- 6 files changed, 76 insertions(+), 58 deletions(-) diff --git a/libcrystfel/src/integration.c b/libcrystfel/src/integration.c index 825531a5..336e99ed 100644 --- a/libcrystfel/src/integration.c +++ b/libcrystfel/src/integration.c @@ -38,6 +38,7 @@ #include #include #include +#include #ifdef HAVE_CURSES_COLOR #include @@ -252,14 +253,14 @@ static void show_reference_profile(struct intcontext *ic, int i) static void show_peak_box(struct intcontext *ic, struct peak_box *bx, - int results_pipe) + sem_t *term_sem) { #ifdef HAVE_CURSES_COLOR int q; signed int h, k, l; double fs, ss; - if ( results_pipe != 0 ) write(results_pipe, "SUSPEND\n", 8); + if ( term_sem != NULL ) sem_wait(term_sem); initscr(); clear(); @@ -323,7 +324,7 @@ static void show_peak_box(struct intcontext *ic, struct peak_box *bx, getch(); endwin(); - if ( results_pipe != 0 ) write(results_pipe, "RELEASE\n", 8); + if ( term_sem != NULL ) sem_post(term_sem); #else STATUS("Not showing peak box because CrystFEL was compiled without " "ncurses.\n"); @@ -1170,7 +1171,7 @@ static int get_int_diag(struct intcontext *ic, Reflection *refl) static void integrate_prof2d_once(struct intcontext *ic, struct peak_box *bx, - int results_pipe) + sem_t *sem) { bx->intensity = fit_intensity(ic, bx); bx->sigma = calc_sigma(ic, bx); @@ -1200,8 +1201,7 @@ static void integrate_prof2d_once(struct intcontext *ic, struct peak_box *bx, set_redundancy(bx->refl, 0); } - if ( get_int_diag(ic, bx->refl) ) show_peak_box(ic, bx, - results_pipe); + if ( get_int_diag(ic, bx->refl) ) show_peak_box(ic, bx, sem); } else { @@ -1294,7 +1294,7 @@ static void integrate_prof2d(IntegrationMethod meth, Crystal *cr, struct image *image, IntDiag int_diag, signed int idh, signed int idk, signed int idl, double ir_inn, double ir_mid, double ir_out, - int results_pipe, int **masks) + sem_t *term_sem, int **masks) { RefList *list; UnitCell *cell; @@ -1338,7 +1338,7 @@ static void integrate_prof2d(IntegrationMethod meth, for ( i=0; ioffs_ss; set_detector_pos(refl, pfs, pss); - if ( get_int_diag(ic, refl) ) show_peak_box(ic, bx, results_pipe); + if ( get_int_diag(ic, refl) ) show_peak_box(ic, bx, term_sem); if ( intensity < -5.0*sigma ) { ic->n_implausible++; @@ -1550,7 +1550,7 @@ static void integrate_rings(IntegrationMethod meth, Crystal *cr, struct image *image, IntDiag int_diag, signed int idh, signed int idk, signed int idl, double ir_inn, double ir_mid, double ir_out, - int results_pipe, int **masks) + sem_t *term_sem, int **masks) { RefList *list; Reflection *refl; @@ -1586,7 +1586,7 @@ static void integrate_rings(IntegrationMethod meth, refl != NULL; refl = next_refl(refl, iter) ) { - integrate_rings_once(refl, image, &ic, cell, results_pipe); + integrate_rings_once(refl, image, &ic, cell, term_sem); } //refine_rigid_groups(&ic); @@ -1602,8 +1602,7 @@ void integrate_all_4(struct image *image, IntegrationMethod meth, PartialityModel pmodel, double push_res, double ir_inn, double ir_mid, double ir_out, IntDiag int_diag, - signed int idh, signed int idk, signed int idl, - int results_pipe) + signed int idh, signed int idk, signed int idl, sem_t *sem) { int i; int *masks[image->det->n_panels]; @@ -1643,14 +1642,14 @@ void integrate_all_4(struct image *image, IntegrationMethod meth, integrate_rings(meth, cr, image, int_diag, idh, idk, idl, ir_inn, ir_mid, ir_out, - results_pipe, masks); + sem, masks); break; case INTEGRATION_PROF2D : integrate_prof2d(meth, cr, image, int_diag, idh, idk, idl, ir_inn, ir_mid, ir_out, - results_pipe, masks); + sem, masks); break; default : diff --git a/libcrystfel/src/integration.h b/libcrystfel/src/integration.h index ca04157a..7414f818 100644 --- a/libcrystfel/src/integration.h +++ b/libcrystfel/src/integration.h @@ -3,11 +3,11 @@ * * Integration of intensities * - * Copyright © 2012-2014 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2014 Thomas White + * 2010-2015 Thomas White * * This file is part of CrystFEL. * @@ -33,6 +33,7 @@ #include #endif +#include #include "geometry.h" @@ -127,7 +128,7 @@ extern void integrate_all_4(struct image *image, IntegrationMethod meth, double ir_inn, double ir_mid, double ir_out, IntDiag int_diag, signed int idh, signed int idk, signed int idl, - int results_pipe); + sem_t *term_sem); #ifdef __cplusplus diff --git a/src/im-sandbox.c b/src/im-sandbox.c index ba709f6f..b800f5b4 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -48,6 +48,8 @@ #include #include #include +#include +#include #ifdef HAVE_CLOCK_GETTIME #include @@ -85,6 +87,12 @@ struct sb_reader }; +struct sb_shm +{ + sem_t term_sem; +}; + + struct sandbox { pthread_mutex_t lock; @@ -96,7 +104,6 @@ struct sandbox int n_hadcrystals_last_stats; int n_crystals_last_stats; int t_last_stats; - int suspend_stats; struct index_args *iargs; @@ -110,6 +117,8 @@ struct sandbox struct filename_plus_event **last_filename; int serial; + struct sb_shm *shared; + char *tmpdir; struct sb_reader *reader; @@ -359,7 +368,7 @@ static int read_fpe_data(struct buffer_data *bd) static void run_work(const struct index_args *iargs, int filename_pipe, int results_pipe, Stream *st, - int cookie, const char *tmpdir) + int cookie, const char *tmpdir, sem_t *term_sem) { FILE *fh; int allDone = 0; @@ -494,7 +503,7 @@ static void run_work(const struct index_args *iargs, pargs.n_crystals = 0; process_image(iargs, &pargs, st, cookie, tmpdir, - results_pipe, ser); + results_pipe, ser, term_sem); /* Request another image */ c = sprintf(buf, "%i\n", pargs.n_crystals); @@ -815,11 +824,12 @@ static void start_worker_process(struct sandbox *sb, int slot) st = open_stream_fd_for_write(stream_pipe[1]); run_work(sb->iargs, filename_pipe[0], result_pipe[1], - st, slot, tmp); + st, slot, tmp, &sb->shared->term_sem); close_stream(st); //close(filename_pipe[0]); close(result_pipe[1]); + munmap(sb->shared, sizeof(struct sb_shm)); free(sb); @@ -894,6 +904,25 @@ static void handle_zombie(struct sandbox *sb) } +static int setup_shm(struct sandbox *sb) +{ + sb->shared = mmap(NULL, sizeof(struct sb_shm), PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + if ( sb->shared == MAP_FAILED ) { + ERROR("SHM setup failed: %s\n", strerror(errno)); + return 1; + } + + if ( sem_init(&sb->shared->term_sem, 1, 1) ) { + ERROR("Terminal semaphore setup failed: %s\n", strerror(errno)); + return 1; + } + + return 0; +} + + void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir) @@ -930,7 +959,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->n_hadcrystals_last_stats = 0; sb->n_crystals_last_stats = 0; sb->t_last_stats = get_monotonic_seconds(); - sb->suspend_stats = 0; sb->n_proc = n_proc; sb->iargs = iargs; sb->serial = 1; @@ -939,6 +967,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->reader->fhs = NULL; sb->reader->stream = stream; + if ( setup_shm(sb) ) { + ERROR("Failed to set up SHM.\n"); + free(sb); + return; + } + sb->stream_pipe_write = calloc(n_proc, sizeof(int)); if ( sb->stream_pipe_write == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); @@ -1106,34 +1140,20 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, chomp(results); - if ( strcmp(results, "SUSPEND") == 0 ) { - sb->suspend_stats++; - continue; /* Do not send next filename */ - } else if ( strcmp(results, "RELEASE") == 0 ) { - if ( sb->suspend_stats > 0 ) { - sb->suspend_stats--; - } else { - ERROR("RELEASE before SUSPEND.\n"); + strtol(results, &eptr, 10); + if ( eptr == results ) { + if ( strlen(results) > 0 ) { + ERROR("Invalid result '%s'\n", + results); } - continue; /* Do not send next filename */ } else { - strtol(results, &eptr, 10); - if ( eptr == results ) { - if ( strlen(results) > 0 ) { - ERROR("Invalid result '%s'\n", - results); - } - } else { - - int nc = atoi(results); - sb->n_crystals += nc; - if ( nc > 0 ) { - sb->n_hadcrystals++; - } - sb->n_processed++; - + int nc = atoi(results); + sb->n_crystals += nc; + if ( nc > 0 ) { + sb->n_hadcrystals++; } + sb->n_processed++; } @@ -1210,8 +1230,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Update progress */ lock_sandbox(sb); tNow = get_monotonic_seconds(); - if ( !sb->suspend_stats - && (tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS) ) + r = sem_trywait(&sb->shared->term_sem); + if ((r==0) && (tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS)) { STATUS("%4i indexable out of %4i processed (%4.1f%%), " @@ -1228,7 +1248,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->n_crystals_last_stats = sb->n_crystals; sb->t_last_stats = tNow; + } + if ( r == 0 ) sem_post(&sb->shared->term_sem); unlock_sandbox(sb); allDone = 1; @@ -1264,6 +1286,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, free(sb->result_fhs); free(sb->pids); free(sb->tmpdir); + munmap(sb->shared, sizeof(struct sb_shm)); pthread_mutex_destroy(&sb->lock); diff --git a/src/indexamajig.c b/src/indexamajig.c index eb619a87..c7e4a270 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -695,11 +695,6 @@ int main(int argc, char *argv[]) free(int_diag); - if ( (n_proc > 1) && (iargs.int_diag != INTDIAG_NONE) ) { - n_proc = 1; - STATUS("Ignored \"-j\" because you used --int-diag.\n"); - } - } st = open_stream_for_write_2(outfile, geom_filename, argc, argv); diff --git a/src/process_image.c b/src/process_image.c index 1a5a3477..5ead2c61 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -78,7 +78,7 @@ static void try_refine_autoR(struct image *image, Crystal *cr) void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int results_pipe, - int serial) + int serial, sem_t *term_sem) { float *data_for_measurement; size_t data_size; @@ -260,7 +260,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, iargs->ir_inn, iargs->ir_mid, iargs->ir_out, iargs->int_diag, iargs->int_diag_h, iargs->int_diag_k, iargs->int_diag_l, - results_pipe); + term_sem); ret = write_chunk(st, &image, hdfile, iargs->stream_peaks, iargs->stream_refls, diff --git a/src/process_image.h b/src/process_image.h index de364772..6e44c173 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -104,7 +104,7 @@ struct pattern_args extern void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int results_pipe, - int serial); + int serial, sem_t *term_sem); #endif /* PROCESS_IMAGEs_H */ -- cgit v1.2.3