/**
 * Estimates how far an ambigator job has got by scanning its log file.
 *
 * \param logfile_str Path of the log file to scan
 * \param niter Number of iterations the job was asked to perform
 *
 * The estimate is built up from the lines found in the log:
 *  - 0.1 once the "Mean number of correlations per crystal:" line appears,
 *  - a further 0.8/niter for every "Mean f,g =" line,
 *  - 1.0 once the "<n> assignments are different from their starting
 *    values" summary appears.
 *
 * \returns the estimated fraction complete, between 0.0 and 1.0.  If the
 * log file cannot be opened (or no filename is given), returns 0.0.
 */
double read_ambigator_progress(char *logfile_str, int niter)
{
	FILE *fh;
	double iter_inc;
	double frac_complete = 0.0;
	char line[1024];

	if ( logfile_str == NULL ) return 0.0;

	/* A zero or negative iteration count must not produce an infinite
	 * or negative increment */
	iter_inc = (niter > 0) ? 0.8/niter : 0.0;

	fh = fopen(logfile_str, "r");
	if ( fh == NULL ) return 0.0;

	while ( fgets(line, sizeof(line), fh) != NULL ) {

		int junk;
		int matched_to = 0;

		if ( strncmp(line, "Mean number of correlations per crystal:", 40) == 0 ) {
			frac_complete = 0.1;
		}

		if ( strncmp(line, "Mean f,g =", 10) == 0 ) {
			frac_complete += iter_inc;
		}

		/* sscanf() returns 1 as soon as the integer has been converted,
		 * even if the text after it does not match.  The %n conversion
		 * is only stored when the whole literal has been matched, so
		 * use it to tell the real summary line apart from any other
		 * line which happens to start with a number. */
		if ( (sscanf(line, "%d assignments are different from "
		                   "their starting values%n",
		             &junk, &matched_to) >= 1)
		  && (matched_to > 0) )
		{
			frac_complete = 1.0;
		}

	}

	fclose(fh);

	/* More "Mean f,g" lines than expected must not exceed 100% */
	if ( frac_complete > 1.0 ) frac_complete = 1.0;

	return frac_complete;
}
n_proc; - sscanf(line, "%i ", &n_proc); - job->frac_complete = (double)n_proc/job->n_frames; - } else { - STATUS("%s\n", line); - } - - g_free(line); - - return TRUE; -} - - static int write_file_list(GFile *workdir, const char *listname, char **filenames, @@ -175,16 +135,15 @@ static struct local_job *start_local_job(char **args, const char *job_title, GFile *workdir_file, struct crystfelproject *proj, - GIOFunc readable_func, - int phkl, int scale) + enum gui_job_type type) { - GIOChannel *ioch; int i; int r; int ch_stderr; GError *error; struct local_job *job; char *workdir_str; + GFile *stderr_gfile; workdir_str = g_file_get_path(workdir_file); if ( workdir_str == NULL ) return NULL; @@ -192,10 +151,8 @@ static struct local_job *start_local_job(char **args, job = malloc(sizeof(struct local_job)); if ( job == NULL ) return NULL; - job->frac_complete = 0.0; job->workdir = workdir_file; - job->process_hkl = phkl; - job->scale = scale; + job->type = type; STATUS("Running program: "); i = 0; @@ -219,16 +176,14 @@ static struct local_job *start_local_job(char **args, } job->running = 1; + stderr_gfile = g_file_get_child(workdir_file, "stderr.log"); + job->stderr_filename = g_file_get_path(stderr_gfile); + g_object_unref(stderr_gfile); + job->child_watch_source = g_child_watch_add(job->pid, watch_subprocess, job); - ioch = g_io_channel_unix_new(ch_stderr); - job->io_readable_source = g_io_add_watch(ioch, - G_IO_IN | G_IO_ERR | G_IO_HUP, - readable_func, - job); - return job; } @@ -237,9 +192,32 @@ static int get_task_status(void *job_priv, int *running, float *frac_complete) { + int n_proc; struct local_job *job = job_priv; - *frac_complete = job->frac_complete; + *running = job->running; + + switch ( job->type ) { + + case GUI_JOB_INDEXING : + n_proc = read_number_processed(job->stderr_filename); + *frac_complete = (double)n_proc / job->n_frames; + break; + + case GUI_JOB_AMBIGATOR : + *frac_complete = read_ambigator_progress(job->stderr_filename, + job->niter); + 
break; + + case GUI_JOB_PROCESS_HKL : + case GUI_JOB_PROCESS_HKL_SCALE : + case GUI_JOB_PARTIALATOR : + *frac_complete = read_merge_progress(job->stderr_filename, + job->type); + break; + + } + return 0; } @@ -387,91 +365,6 @@ static GtkWidget *make_merging_parameters_widget(void *opts_priv) } -static gboolean merge_readable(GIOChannel *source, GIOCondition cond, - void *vp) -{ - GIOStatus r; - GError *err = NULL; - struct local_job *job = vp; - gchar *line; - - r = g_io_channel_read_line(source, &line, NULL, NULL, &err); - if ( r == G_IO_STATUS_EOF ) { - STATUS("End of output.\n"); - if ( job->frac_complete > 0.91 ) { - job->frac_complete = 1.0; - } - return FALSE; - } - if ( r != G_IO_STATUS_NORMAL ) { - if ( job->pid != 0 ) { - STATUS("Read error?\n"); - } else { - STATUS("End of output (merge exited)\n"); - } - return FALSE; - } - - if ( job->process_hkl ) { - if ( job->scale ) { - job->frac_complete += 1.0/6.0; - } else { - job->frac_complete += 1.0/3.0; - } - } else { - int cycle, max_cycles; - if ( strcmp(line, "Initial partiality calculation...\n") == 0 ) { - job->frac_complete = 0.1; - } - if ( sscanf(line, "Scaling and refinement cycle %d of %d\n", - &cycle, &max_cycles) == 2 ) - { - job->frac_complete = 0.1 + 0.8*(double)cycle/max_cycles; - } - if ( strcmp(line, "Final merge...\n") == 0 ) { - job->frac_complete = 0.9; - } - if ( strncmp(line, "Writing two-way split", 20) == 0 ) { - job->frac_complete = 0.95; - } - } - - g_free(line); - - return TRUE; -} - - -static gboolean ambi_readable(GIOChannel *source, GIOCondition cond, - void *vp) -{ - GIOStatus r; - GError *err = NULL; - struct local_job *job = vp; - gchar *line; - - r = g_io_channel_read_line(source, &line, NULL, NULL, &err); - if ( r == G_IO_STATUS_EOF ) { - STATUS("End of output.\n"); - return FALSE; - } - if ( r != G_IO_STATUS_NORMAL ) { - if ( job->pid != 0 ) { - STATUS("Read error?\n"); - } else { - STATUS("End of output (merge exited)\n"); - } - return FALSE; - } - - /* FIXME: Calculate 
the fraction complete */ - job->frac_complete = 0.5; - - g_free(line); - return TRUE; -} - - static void *run_ambi(const char *job_title, const char *job_notes, struct crystfelproject *proj, @@ -507,7 +400,8 @@ static void *run_ambi(const char *job_title, args[1] = sc_filename; args[2] = NULL; job = start_local_job(args, job_title, workdir, - proj, ambi_readable, 0, 0); + proj, GUI_JOB_AMBIGATOR); + job->niter = proj->ambi_params.niter; } else { job = NULL; } @@ -551,20 +445,20 @@ static void *run_merging(const char *job_title, &proj->merging_params, "crystfel.hkl") ) { char *args[3]; - int process_hkl, scale; + enum gui_job_type type; args[0] = "sh"; args[1] = sc_filename; args[2] = NULL; if ( strcmp(proj->merging_params.model, "process_hkl") == 0 ) { - process_hkl = 1; - scale = proj->merging_params.scale; + if ( proj->merging_params.scale ) { + type = GUI_JOB_PROCESS_HKL_SCALE; + } else { + type = GUI_JOB_PROCESS_HKL; + } } else { - process_hkl = 0; - scale = 1; + type = GUI_JOB_PARTIALATOR; } - job = start_local_job(args, job_title, workdir, - proj, merge_readable, - process_hkl, scale); + job = start_local_job(args, job_title, workdir, proj, type); } else { job = NULL; } @@ -609,6 +503,7 @@ static void *run_indexing(const char *job_title, char **streams; int i; GFile *workdir; + GFile *stderr_gfile; workdir = make_job_folder(job_title, job_notes); if ( workdir == NULL ) return NULL; @@ -636,13 +531,16 @@ static void *run_indexing(const char *job_title, } STATUS("\n"); - job = start_local_job(args, job_title, workdir, - proj, index_readable, 0, 0); + job = start_local_job(args, job_title, workdir, proj, GUI_JOB_INDEXING); if ( job == NULL ) return NULL; /* Indexing-specific job data */ job->n_frames = proj->n_frames; + stderr_gfile = g_file_get_child(workdir, "stderr.log"); + job->stderr_filename = g_file_get_path(stderr_gfile); + g_object_unref(stderr_gfile); + streams = malloc(sizeof(char *)); if ( streams != NULL ) { GFile *stream_gfile = 
g_file_get_child(job->workdir, diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c index 69990a93..44eb1768 100644 --- a/src/gui_backend_slurm.c +++ b/src/gui_backend_slurm.c @@ -61,16 +61,10 @@ struct slurm_ambi_opts char *email_address; }; -enum slurm_job_type -{ - SLURM_JOB_INDEXING, - SLURM_JOB_MERGING, - SLURM_JOB_AMBIGATOR, -}; - struct slurm_job { - enum slurm_job_type type; + enum gui_job_type type; + GFile *workdir; /* For indexing job (don't worry - will be replaced soon!) */ int n_frames; @@ -80,35 +74,11 @@ struct slurm_job /* For merging/ambigator job */ uint32_t job_id; + char *stderr_filename; + int niter; }; -static int read_number_processed(const char *filename) -{ - FILE *fh = fopen(filename, "r"); - int n_proc; - - /* Normal situation if SLURM job hasn't started yet */ - if ( fh == NULL ) return 0; - - do { - char line[1024]; - if ( fgets(line, 1024, fh) == NULL ) break; - - if ( strncmp(line, "Final: ", 7) == 0 ) { - sscanf(line, "Final: %i images processed", &n_proc); - } else if ( strstr(line, " images processed, ") != NULL ) { - sscanf(line, "%i ", &n_proc); - } - - } while ( 1 ); - - fclose(fh); - - return n_proc; -} - - static int job_running(uint32_t job_id) { job_info_msg_t *job_info; @@ -151,7 +121,9 @@ static int get_task_status(void *job_priv, int n_proc = 0; int all_complete = 1; - if ( job->type == SLURM_JOB_INDEXING ) { + switch ( job->type ) { + + case GUI_JOB_INDEXING : for ( i=0; in_blocks; i++ ) { n_proc += read_number_processed(job->stderr_filenames[i]); @@ -167,10 +139,22 @@ static int get_task_status(void *job_priv, *frac_complete = (double)n_proc / job->n_frames; *running = 1 - all_complete; + break; + + case GUI_JOB_AMBIGATOR : + *frac_complete = read_ambigator_progress(job->stderr_filename, + job->niter); + *running = job_running(job->job_id); + break; + + case GUI_JOB_PROCESS_HKL : + case GUI_JOB_PROCESS_HKL_SCALE : + case GUI_JOB_PARTIALATOR : + *frac_complete = read_merge_progress(job->stderr_filename, + 
job->type); + *running = job_running(job->job_id); + break; - } else { - *frac_complete = 0.5; - *running = 1; } return 0; @@ -181,7 +165,7 @@ static void cancel_task(void *job_priv) { int i; struct slurm_job *job = job_priv; - if ( job->type == SLURM_JOB_INDEXING ) { + if ( job->type == GUI_JOB_INDEXING ) { for ( i=0; in_blocks; i++) { if ( job->job_ids[i] == 0 ) continue; STATUS("Stopping SLURM job %i\n", job->job_ids[i]); @@ -322,10 +306,10 @@ static uint32_t submit_batch_job(const char *geom_filename, /* For submitting a single script to the SLURM cluster. * Used for merging and ambigator, but not for indexing - that needs something * more sophisticated. */ -static struct slurm_job *start_slurm_job(enum slurm_job_type type, +static struct slurm_job *start_slurm_job(enum gui_job_type type, const char *script_filename, const char *jobname, - const char *workdir, + GFile *workdir, const char *partition, const char *email_address) { @@ -335,6 +319,7 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type, struct slurm_job *job; job_desc_msg_t job_desc_msg; submit_response_msg_t *resp; + GFile *stderr_gfile; int r; script = load_entire_file(script_filename); @@ -361,7 +346,7 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type, job_desc_msg.name = safe_strdup(jobname); job_desc_msg.std_err = strdup("stderr.log"); job_desc_msg.std_out = strdup("stdout.log"); - job_desc_msg.work_dir = strdup(workdir); + job_desc_msg.work_dir = g_file_get_path(workdir); job_desc_msg.script = script; job_desc_msg.environment = env; job_desc_msg.env_size = n_env; @@ -381,6 +366,10 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type, STATUS("Submitted SLURM job ID %i\n", resp->job_id); + stderr_gfile = g_file_get_child(workdir, "stderr.log"); + job->stderr_filename = g_file_get_path(stderr_gfile); + g_object_unref(stderr_gfile); + job->job_id = resp->job_id; slurm_free_submit_response_response_msg(resp); @@ -445,7 +434,7 @@ static void 
/**
 * Finds out how many images an indexing job has processed so far.
 *
 * \param filename Path of the log file to scan
 *
 * Scans the whole file, looking for progress lines of the form
 * "<n> images processed, ..." and for the final summary line
 * "Final: <n> images processed".  The count from the last such line wins.
 *
 * \returns the number of images processed, or 0 if the file cannot be
 * opened or does not contain a progress line yet.
 */
int read_number_processed(const char *filename)
{
	FILE *fh;
	int n_proc = 0;  /* Nothing processed until the log says otherwise */
	char line[1024];

	if ( filename == NULL ) return 0;

	fh = fopen(filename, "r");

	/* Normal situation if the job hasn't started (or hasn't written
	 * anything) yet */
	if ( fh == NULL ) return 0;

	while ( fgets(line, sizeof(line), fh) != NULL ) {

		int n;

		/* Only accept the number if it was really converted, so that
		 * a malformed line cannot clobber the last good value */
		if ( strncmp(line, "Final: ", 7) == 0 ) {
			if ( sscanf(line, "Final: %i images processed", &n) == 1 ) {
				n_proc = n;
			}
		} else if ( strstr(line, " images processed, ") != NULL ) {
			if ( sscanf(line, "%i ", &n) == 1 ) {
				n_proc = n;
			}
		}

	}

	fclose(fh);

	return n_proc;
}
) { + frac_complete = 0.9; + } + if ( strncmp(line, "Writing two-way split", 20) == 0 ) { + frac_complete = 1.0; + } + + } + } while ( 1 ); + + fclose(fh); + + return frac_complete; +} diff --git a/src/gui_merge.h b/src/gui_merge.h index 460c5fce..5c684e81 100644 --- a/src/gui_merge.h +++ b/src/gui_merge.h @@ -32,6 +32,7 @@ #include #include "gui_project.h" +#include "crystfel_gui.h" extern gint merge_sig(GtkWidget *widget, struct crystfelproject *proj); @@ -42,4 +43,7 @@ extern int write_merge_script(const char *filename, struct merging_params *params, const char *out_hkl); +extern double read_merge_progress(const char *logfile_str, + enum gui_job_type type); + #endif -- cgit v1.2.3