aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-02-22 15:56:40 +0100
committerThomas White <taw@physics.org>2021-02-22 15:56:40 +0100
commitf7cef79ae1c8db2770f067735698850ef88da04e (patch)
treed4891f091ffb1051a917559f9d365d6e97023f0e
parent32ee8110102b2c939c3fcc966a41587d1bb9d316 (diff)
Read job progress from written log files, even for local BE
This simplifies the backends somewhat, and makes them look more similar - e.g. there is now only one routine to find out how far along a merging job is. It has the added bonus of adding a log file for local jobs, which we would've had to add soon anyway.
-rw-r--r--src/crystfel_gui.h9
-rw-r--r--src/gui_ambi.c41
-rw-r--r--src/gui_ambi.h2
-rw-r--r--src/gui_backend_local.c204
-rw-r--r--src/gui_backend_slurm.c93
-rw-r--r--src/gui_index.c26
-rw-r--r--src/gui_index.h2
-rw-r--r--src/gui_merge.c48
-rw-r--r--src/gui_merge.h4
9 files changed, 226 insertions, 203 deletions
diff --git a/src/crystfel_gui.h b/src/crystfel_gui.h
index fd80c9d1..61d39f60 100644
--- a/src/crystfel_gui.h
+++ b/src/crystfel_gui.h
@@ -38,6 +38,15 @@ struct gui_job_notes_page
};
+enum gui_job_type
+{
+ GUI_JOB_INDEXING,
+ GUI_JOB_PROCESS_HKL,
+ GUI_JOB_PROCESS_HKL_SCALE,
+ GUI_JOB_PARTIALATOR,
+ GUI_JOB_AMBIGATOR,
+};
+
extern void add_running_task(struct crystfelproject *proj,
const char *task_desc,
struct crystfel_backend *backend,
diff --git a/src/gui_ambi.c b/src/gui_ambi.c
index 0535dd99..8d07f436 100644
--- a/src/gui_ambi.c
+++ b/src/gui_ambi.c
@@ -509,8 +509,47 @@ int write_ambigator_script(const char *filename,
}
fprintf(fh, " --iterations=%i", params->niter);
- fprintf(fh, " --fg-graph=fg.dat\n");
+ fprintf(fh, " --fg-graph=fg.dat");
+ fprintf(fh, " >stdout.log 2>stderr.log\n");
fclose(fh);
return 0;
}
+
+
+double read_ambigator_progress(char *logfile_str, int niter)
+{
+ FILE *fh;
+ double iter_inc;
+ double frac_complete = 0.0;
+
+ iter_inc = 0.8/niter;
+
+ fh = fopen(logfile_str, "r");
+ if ( fh == NULL ) return 0.0;
+
+ do {
+ char line[1024];
+ int junk;
+
+ if ( fgets(line, 1024, fh) == NULL ) break;
+
+ if ( strncmp(line, "Mean number of correlations per crystal:", 40) == 0 ) {
+ frac_complete = 0.1;
+ }
+ if ( strncmp(line, "Mean f,g =", 10) == 0 ) {
+ frac_complete += iter_inc;
+ }
+ if ( sscanf(line, "%d assignments are different from "
+ "their starting values\n", &junk) == 1 )
+ {
+ frac_complete = 1.0;
+ }
+
+ } while ( 1 );
+
+ fclose(fh);
+
+ printf("got %f\n", frac_complete);
+ return frac_complete;
+}
diff --git a/src/gui_ambi.h b/src/gui_ambi.h
index f577e3c1..c6b68b35 100644
--- a/src/gui_ambi.h
+++ b/src/gui_ambi.h
@@ -42,4 +42,6 @@ extern int write_ambigator_script(const char *filename,
struct ambi_params *params,
const char *out_stream);
+extern double read_ambigator_progress(char *logfile_str, int niter);
+
#endif
diff --git a/src/gui_backend_local.c b/src/gui_backend_local.c
index 26363152..3cb24e27 100644
--- a/src/gui_backend_local.c
+++ b/src/gui_backend_local.c
@@ -62,20 +62,19 @@ struct local_ambi_opts
struct local_job
{
- double frac_complete;
+ enum gui_job_type type;
+
int n_frames;
+ int niter;
/* When both these are true, free the job resources */
int running;
int cancelled;
- int process_hkl; /* vs partialator */
- int scale;
+ char *stderr_filename;
- guint io_watch;
GPid pid;
guint child_watch_source;
- guint io_readable_source;
GFile *workdir;
};
@@ -89,45 +88,6 @@ static void watch_subprocess(GPid pid, gint status, gpointer vp)
}
-static gboolean index_readable(GIOChannel *source, GIOCondition cond,
- void *vp)
-{
- GIOStatus r;
- GError *err = NULL;
- struct local_job *job = vp;
- gchar *line;
-
- r = g_io_channel_read_line(source, &line, NULL, NULL, &err);
- if ( r == G_IO_STATUS_EOF ) {
- STATUS("End of output.\n");
- return FALSE;
- }
- if ( r != G_IO_STATUS_NORMAL ) {
- if ( job->pid != 0 ) {
- STATUS("Read error?\n");
- } else {
- STATUS("End of output (indexamajig exited)\n");
- }
- return FALSE;
- }
- chomp(line);
-
- if ( strncmp(line, "Final: ", 7) == 0 ) {
- job->frac_complete = 1.0;
- } else if ( strstr(line, " images processed, ") != NULL ) {
- int n_proc;
- sscanf(line, "%i ", &n_proc);
- job->frac_complete = (double)n_proc/job->n_frames;
- } else {
- STATUS("%s\n", line);
- }
-
- g_free(line);
-
- return TRUE;
-}
-
-
static int write_file_list(GFile *workdir,
const char *listname,
char **filenames,
@@ -175,16 +135,15 @@ static struct local_job *start_local_job(char **args,
const char *job_title,
GFile *workdir_file,
struct crystfelproject *proj,
- GIOFunc readable_func,
- int phkl, int scale)
+ enum gui_job_type type)
{
- GIOChannel *ioch;
int i;
int r;
int ch_stderr;
GError *error;
struct local_job *job;
char *workdir_str;
+ GFile *stderr_gfile;
workdir_str = g_file_get_path(workdir_file);
if ( workdir_str == NULL ) return NULL;
@@ -192,10 +151,8 @@ static struct local_job *start_local_job(char **args,
job = malloc(sizeof(struct local_job));
if ( job == NULL ) return NULL;
- job->frac_complete = 0.0;
job->workdir = workdir_file;
- job->process_hkl = phkl;
- job->scale = scale;
+ job->type = type;
STATUS("Running program: ");
i = 0;
@@ -219,16 +176,14 @@ static struct local_job *start_local_job(char **args,
}
job->running = 1;
+ stderr_gfile = g_file_get_child(workdir_file, "stderr.log");
+ job->stderr_filename = g_file_get_path(stderr_gfile);
+ g_object_unref(stderr_gfile);
+
job->child_watch_source = g_child_watch_add(job->pid,
watch_subprocess,
job);
- ioch = g_io_channel_unix_new(ch_stderr);
- job->io_readable_source = g_io_add_watch(ioch,
- G_IO_IN | G_IO_ERR | G_IO_HUP,
- readable_func,
- job);
-
return job;
}
@@ -237,9 +192,32 @@ static int get_task_status(void *job_priv,
int *running,
float *frac_complete)
{
+ int n_proc;
struct local_job *job = job_priv;
- *frac_complete = job->frac_complete;
+
*running = job->running;
+
+ switch ( job->type ) {
+
+ case GUI_JOB_INDEXING :
+ n_proc = read_number_processed(job->stderr_filename);
+ *frac_complete = (double)n_proc / job->n_frames;
+ break;
+
+ case GUI_JOB_AMBIGATOR :
+ *frac_complete = read_ambigator_progress(job->stderr_filename,
+ job->niter);
+ break;
+
+ case GUI_JOB_PROCESS_HKL :
+ case GUI_JOB_PROCESS_HKL_SCALE :
+ case GUI_JOB_PARTIALATOR :
+ *frac_complete = read_merge_progress(job->stderr_filename,
+ job->type);
+ break;
+
+ }
+
return 0;
}
@@ -387,91 +365,6 @@ static GtkWidget *make_merging_parameters_widget(void *opts_priv)
}
-static gboolean merge_readable(GIOChannel *source, GIOCondition cond,
- void *vp)
-{
- GIOStatus r;
- GError *err = NULL;
- struct local_job *job = vp;
- gchar *line;
-
- r = g_io_channel_read_line(source, &line, NULL, NULL, &err);
- if ( r == G_IO_STATUS_EOF ) {
- STATUS("End of output.\n");
- if ( job->frac_complete > 0.91 ) {
- job->frac_complete = 1.0;
- }
- return FALSE;
- }
- if ( r != G_IO_STATUS_NORMAL ) {
- if ( job->pid != 0 ) {
- STATUS("Read error?\n");
- } else {
- STATUS("End of output (merge exited)\n");
- }
- return FALSE;
- }
-
- if ( job->process_hkl ) {
- if ( job->scale ) {
- job->frac_complete += 1.0/6.0;
- } else {
- job->frac_complete += 1.0/3.0;
- }
- } else {
- int cycle, max_cycles;
- if ( strcmp(line, "Initial partiality calculation...\n") == 0 ) {
- job->frac_complete = 0.1;
- }
- if ( sscanf(line, "Scaling and refinement cycle %d of %d\n",
- &cycle, &max_cycles) == 2 )
- {
- job->frac_complete = 0.1 + 0.8*(double)cycle/max_cycles;
- }
- if ( strcmp(line, "Final merge...\n") == 0 ) {
- job->frac_complete = 0.9;
- }
- if ( strncmp(line, "Writing two-way split", 20) == 0 ) {
- job->frac_complete = 0.95;
- }
- }
-
- g_free(line);
-
- return TRUE;
-}
-
-
-static gboolean ambi_readable(GIOChannel *source, GIOCondition cond,
- void *vp)
-{
- GIOStatus r;
- GError *err = NULL;
- struct local_job *job = vp;
- gchar *line;
-
- r = g_io_channel_read_line(source, &line, NULL, NULL, &err);
- if ( r == G_IO_STATUS_EOF ) {
- STATUS("End of output.\n");
- return FALSE;
- }
- if ( r != G_IO_STATUS_NORMAL ) {
- if ( job->pid != 0 ) {
- STATUS("Read error?\n");
- } else {
- STATUS("End of output (merge exited)\n");
- }
- return FALSE;
- }
-
- /* FIXME: Calculate the fraction complete */
- job->frac_complete = 0.5;
-
- g_free(line);
- return TRUE;
-}
-
-
static void *run_ambi(const char *job_title,
const char *job_notes,
struct crystfelproject *proj,
@@ -507,7 +400,8 @@ static void *run_ambi(const char *job_title,
args[1] = sc_filename;
args[2] = NULL;
job = start_local_job(args, job_title, workdir,
- proj, ambi_readable, 0, 0);
+ proj, GUI_JOB_AMBIGATOR);
+ job->niter = proj->ambi_params.niter;
} else {
job = NULL;
}
@@ -551,20 +445,20 @@ static void *run_merging(const char *job_title,
&proj->merging_params, "crystfel.hkl") )
{
char *args[3];
- int process_hkl, scale;
+ enum gui_job_type type;
args[0] = "sh";
args[1] = sc_filename;
args[2] = NULL;
if ( strcmp(proj->merging_params.model, "process_hkl") == 0 ) {
- process_hkl = 1;
- scale = proj->merging_params.scale;
+ if ( proj->merging_params.scale ) {
+ type = GUI_JOB_PROCESS_HKL_SCALE;
+ } else {
+ type = GUI_JOB_PROCESS_HKL;
+ }
} else {
- process_hkl = 0;
- scale = 1;
+ type = GUI_JOB_PARTIALATOR;
}
- job = start_local_job(args, job_title, workdir,
- proj, merge_readable,
- process_hkl, scale);
+ job = start_local_job(args, job_title, workdir, proj, type);
} else {
job = NULL;
}
@@ -609,6 +503,7 @@ static void *run_indexing(const char *job_title,
char **streams;
int i;
GFile *workdir;
+ GFile *stderr_gfile;
workdir = make_job_folder(job_title, job_notes);
if ( workdir == NULL ) return NULL;
@@ -636,13 +531,16 @@ static void *run_indexing(const char *job_title,
}
STATUS("\n");
- job = start_local_job(args, job_title, workdir,
- proj, index_readable, 0, 0);
+ job = start_local_job(args, job_title, workdir, proj, GUI_JOB_INDEXING);
if ( job == NULL ) return NULL;
/* Indexing-specific job data */
job->n_frames = proj->n_frames;
+ stderr_gfile = g_file_get_child(workdir, "stderr.log");
+ job->stderr_filename = g_file_get_path(stderr_gfile);
+ g_object_unref(stderr_gfile);
+
streams = malloc(sizeof(char *));
if ( streams != NULL ) {
GFile *stream_gfile = g_file_get_child(job->workdir,
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c
index 69990a93..44eb1768 100644
--- a/src/gui_backend_slurm.c
+++ b/src/gui_backend_slurm.c
@@ -61,16 +61,10 @@ struct slurm_ambi_opts
char *email_address;
};
-enum slurm_job_type
-{
- SLURM_JOB_INDEXING,
- SLURM_JOB_MERGING,
- SLURM_JOB_AMBIGATOR,
-};
-
struct slurm_job
{
- enum slurm_job_type type;
+ enum gui_job_type type;
+ GFile *workdir;
/* For indexing job (don't worry - will be replaced soon!) */
int n_frames;
@@ -80,35 +74,11 @@ struct slurm_job
/* For merging/ambigator job */
uint32_t job_id;
+ char *stderr_filename;
+ int niter;
};
-static int read_number_processed(const char *filename)
-{
- FILE *fh = fopen(filename, "r");
- int n_proc;
-
- /* Normal situation if SLURM job hasn't started yet */
- if ( fh == NULL ) return 0;
-
- do {
- char line[1024];
- if ( fgets(line, 1024, fh) == NULL ) break;
-
- if ( strncmp(line, "Final: ", 7) == 0 ) {
- sscanf(line, "Final: %i images processed", &n_proc);
- } else if ( strstr(line, " images processed, ") != NULL ) {
- sscanf(line, "%i ", &n_proc);
- }
-
- } while ( 1 );
-
- fclose(fh);
-
- return n_proc;
-}
-
-
static int job_running(uint32_t job_id)
{
job_info_msg_t *job_info;
@@ -151,7 +121,9 @@ static int get_task_status(void *job_priv,
int n_proc = 0;
int all_complete = 1;
- if ( job->type == SLURM_JOB_INDEXING ) {
+ switch ( job->type ) {
+
+ case GUI_JOB_INDEXING :
for ( i=0; i<job->n_blocks; i++ ) {
n_proc += read_number_processed(job->stderr_filenames[i]);
@@ -167,10 +139,22 @@ static int get_task_status(void *job_priv,
*frac_complete = (double)n_proc / job->n_frames;
*running = 1 - all_complete;
+ break;
+
+ case GUI_JOB_AMBIGATOR :
+ *frac_complete = read_ambigator_progress(job->stderr_filename,
+ job->niter);
+ *running = job_running(job->job_id);
+ break;
+
+ case GUI_JOB_PROCESS_HKL :
+ case GUI_JOB_PROCESS_HKL_SCALE :
+ case GUI_JOB_PARTIALATOR :
+ *frac_complete = read_merge_progress(job->stderr_filename,
+ job->type);
+ *running = job_running(job->job_id);
+ break;
- } else {
- *frac_complete = 0.5;
- *running = 1;
}
return 0;
@@ -181,7 +165,7 @@ static void cancel_task(void *job_priv)
{
int i;
struct slurm_job *job = job_priv;
- if ( job->type == SLURM_JOB_INDEXING ) {
+ if ( job->type == GUI_JOB_INDEXING ) {
for ( i=0; i<job->n_blocks; i++) {
if ( job->job_ids[i] == 0 ) continue;
STATUS("Stopping SLURM job %i\n", job->job_ids[i]);
@@ -322,10 +306,10 @@ static uint32_t submit_batch_job(const char *geom_filename,
/* For submitting a single script to the SLURM cluster.
* Used for merging and ambigator, but not for indexing - that needs something
* more sophisticated. */
-static struct slurm_job *start_slurm_job(enum slurm_job_type type,
+static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *script_filename,
const char *jobname,
- const char *workdir,
+ GFile *workdir,
const char *partition,
const char *email_address)
{
@@ -335,6 +319,7 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type,
struct slurm_job *job;
job_desc_msg_t job_desc_msg;
submit_response_msg_t *resp;
+ GFile *stderr_gfile;
int r;
script = load_entire_file(script_filename);
@@ -361,7 +346,7 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type,
job_desc_msg.name = safe_strdup(jobname);
job_desc_msg.std_err = strdup("stderr.log");
job_desc_msg.std_out = strdup("stdout.log");
- job_desc_msg.work_dir = strdup(workdir);
+ job_desc_msg.work_dir = g_file_get_path(workdir);
job_desc_msg.script = script;
job_desc_msg.environment = env;
job_desc_msg.env_size = n_env;
@@ -381,6 +366,10 @@ static struct slurm_job *start_slurm_job(enum slurm_job_type type,
STATUS("Submitted SLURM job ID %i\n", resp->job_id);
+ stderr_gfile = g_file_get_child(workdir, "stderr.log");
+ job->stderr_filename = g_file_get_path(stderr_gfile);
+ g_object_unref(stderr_gfile);
+
job->job_id = resp->job_id;
slurm_free_submit_response_response_msg(resp);
@@ -445,7 +434,7 @@ static void *run_indexing(const char *job_title,
job = malloc(sizeof(struct slurm_job));
if ( job == NULL ) return 0;
- job->type = SLURM_JOB_INDEXING;
+ job->type = GUI_JOB_INDEXING;
job->n_frames = proj->n_frames;
job->n_blocks = proj->n_frames / opts->block_size;
if ( proj->n_frames % opts->block_size ) job->n_blocks++;
@@ -788,9 +777,10 @@ static void *run_ambi(const char *job_title,
&proj->ambi_params, stream_str) )
{
char *workdir_str = g_file_get_path(workdir);
- job = start_slurm_job(SLURM_JOB_AMBIGATOR,
- sc_filename, job_title, workdir_str,
+ job = start_slurm_job(GUI_JOB_AMBIGATOR,
+ sc_filename, job_title, workdir,
opts->partition, opts->email_address);
+ job->niter = proj->ambi_params.niter;
g_free(workdir_str);
} else {
job = NULL;
@@ -834,8 +824,17 @@ static void *run_merging(const char *job_title,
&proj->merging_params, "crystfel.hkl") )
{
char *workdir_str = g_file_get_path(workdir);
- job = start_slurm_job(SLURM_JOB_MERGING,
- sc_filename, job_title, workdir_str,
+ enum gui_job_type type;
+ if ( strcmp(proj->merging_params.model, "process_hkl") == 0 ) {
+ if ( proj->merging_params.scale ) {
+ type = GUI_JOB_PROCESS_HKL_SCALE;
+ } else {
+ type = GUI_JOB_PROCESS_HKL;
+ }
+ } else {
+ type = GUI_JOB_PARTIALATOR;
+ }
+ job = start_slurm_job(type, sc_filename, job_title, workdir,
opts->partition, opts->email_address);
g_free(workdir_str);
} else {
diff --git a/src/gui_index.c b/src/gui_index.c
index 68c89ca2..50b360f0 100644
--- a/src/gui_index.c
+++ b/src/gui_index.c
@@ -771,3 +771,29 @@ char **indexamajig_command_line(const char *geom_filename,
args[n_args] = NULL;
return args;
}
+
+
+int read_number_processed(const char *filename)
+{
+ FILE *fh = fopen(filename, "r");
+ int n_proc;
+
+ /* Normal situation if SLURM job hasn't started yet */
+ if ( fh == NULL ) return 0;
+
+ do {
+ char line[1024];
+ if ( fgets(line, 1024, fh) == NULL ) break;
+
+ if ( strncmp(line, "Final: ", 7) == 0 ) {
+ sscanf(line, "Final: %i images processed", &n_proc);
+ } else if ( strstr(line, " images processed, ") != NULL ) {
+ sscanf(line, "%i ", &n_proc);
+ }
+
+ } while ( 1 );
+
+ fclose(fh);
+
+ return n_proc;
+}
diff --git a/src/gui_index.h b/src/gui_index.h
index 3cd59c9a..7b2c6e19 100644
--- a/src/gui_index.h
+++ b/src/gui_index.h
@@ -48,4 +48,6 @@ extern char **indexamajig_command_line(const char *geom_filename,
struct peak_params *peak_search_params,
struct index_params *indexing_params);
+extern int read_number_processed(const char *filename);
+
#endif
diff --git a/src/gui_merge.c b/src/gui_merge.c
index 762c7e95..cb00b330 100644
--- a/src/gui_merge.c
+++ b/src/gui_merge.c
@@ -402,7 +402,7 @@ static int write_partialator_script(const char *filename,
fprintf(fh, " --iterations=%i", params->niter);
- fprintf(fh, "\n");
+ fprintf(fh, " >stdout.log 2>stderr.log\n");
fclose(fh);
return 0;
@@ -437,7 +437,7 @@ static void add_process_hkl(FILE *fh,
fprintf(fh, " --max-adu=%f", params->max_adu);
fprintf(fh, " --min-res=%f", params->min_res);
fprintf(fh, " --push-res=%f", params->push_res);
- fprintf(fh, " %s\n", extra_arg);
+ fprintf(fh, " %s >>stdout.log 2>>stderr.log\n", extra_arg);
}
@@ -480,3 +480,47 @@ int write_merge_script(const char *filename,
params, out_hkl);
}
}
+
+
+double read_merge_progress(char *logfile_str, enum gui_job_type type)
+{
+ FILE *fh;
+ double frac_complete = 0.0;
+
+ fh = fopen(logfile_str, "r");
+ if ( fh == NULL ) return 0.0;
+
+ do {
+ char line[1024];
+
+ if ( fgets(line, 1024, fh) == NULL ) break;
+
+ if ( type == GUI_JOB_PROCESS_HKL ) {
+ frac_complete += 1.0/3.0;
+ } else if ( type == GUI_JOB_PROCESS_HKL_SCALE ) {
+ frac_complete += 1.0/6.0;
+ } else {
+ int cycle, max_cycles;
+ assert(type == GUI_JOB_PARTIALATOR);
+ if ( strcmp(line, "Initial partiality calculation...\n") == 0 ) {
+ frac_complete = 0.1;
+ }
+ if ( sscanf(line, "Scaling and refinement cycle %d of %d\n",
+ &cycle, &max_cycles) == 2 )
+ {
+ frac_complete = 0.1 + 0.8*(double)cycle/max_cycles;
+ }
+ if ( strcmp(line, "Final merge...\n") == 0 ) {
+ frac_complete = 0.9;
+ }
+ if ( strncmp(line, "Writing two-way split", 20) == 0 ) {
+ frac_complete = 1.0;
+ }
+
+ }
+ } while ( 1 );
+
+ fclose(fh);
+
+ return frac_complete;
+}
diff --git a/src/gui_merge.h b/src/gui_merge.h
index 460c5fce..5c684e81 100644
--- a/src/gui_merge.h
+++ b/src/gui_merge.h
@@ -32,6 +32,7 @@
#include <gtk/gtk.h>
#include "gui_project.h"
+#include "crystfel_gui.h"
extern gint merge_sig(GtkWidget *widget,
struct crystfelproject *proj);
@@ -42,4 +43,7 @@ extern int write_merge_script(const char *filename,
struct merging_params *params,
const char *out_hkl);
+extern double read_merge_progress(const char *logfile_str,
+ enum gui_job_type type);
+
#endif