about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gui_backend_slurm.c372
-rw-r--r--src/gui_project.c16
2 files changed, 231 insertions, 157 deletions
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c
index e02120a3..ce51e72e 100644
--- a/src/gui_backend_slurm.c
+++ b/src/gui_backend_slurm.c
@@ -28,7 +28,6 @@
#include <glib.h>
#include <gtk/gtk.h>
-#include <slurm/slurm.h>
#include <gio/gio.h>
#include <utils.h>
@@ -86,83 +85,165 @@ struct slurm_job
};
-static int job_alive(slurm_job_info_t *job)
/* Extract the value of "key=value" from one line of "scontrol -o" output.
 *
 * Returns a newly allocated copy of the value (caller must free()), or
 * NULL if the key is not present on the line.  The return type stays
 * 'const char *' for compatibility with existing callers.
 *
 * NB strstr() matching means a key which is a suffix of another key
 * (e.g. "State" vs "JobState") can match the wrong field - callers
 * should pass the full key name. */
static const char *get_str_val(const char *line, const char *key)
{
	const char *pos = strstr(line, key);
	if ( pos == NULL ) return NULL;

	const char *eq = strchr(pos, '=');
	if ( eq == NULL ) return NULL;

	/* The value runs up to the next space.  The last pair on the line
	 * has no trailing space, so fall back to the end of the string
	 * (the old code returned NULL in that case - wrong). */
	const char *sp = strchr(eq, ' ');
	if ( sp == NULL ) sp = eq + strlen(eq);

	return strndup(eq+1, sp-eq-1);
}
-static int job_running(uint32_t job_id)
/* Return non-zero if the given Slurm JobState string describes a job
 * which is still worth watching (pending, running, suspended or
 * completing).
 *
 * 's' may be NULL (get_str_val() returns NULL when the JobState field
 * is missing from a line) - treat that as "not alive" rather than
 * invoking strcmp(NULL,...), which is undefined behaviour. */
static int job_alive(const char *s)
{
	if ( s == NULL ) return 0;
	if ( strcmp(s, "PENDING") == 0 ) return 1;
	if ( strcmp(s, "RUNNING") == 0 ) return 1;
	if ( strcmp(s, "SUSPENDED") == 0 ) return 1;
	if ( strcmp(s, "COMPLETING") == 0 ) return 1;
	return 0;
}
- if ( slurm_load_job(&job_info, job_id, 0) ) {
- STATUS("Couldn't get status: %i\n",
- slurm_strerror(slurm_get_errno()));
- return 0;
- }
- running = job_alive(&job_info->job_array[0]);
- slurm_free_job_info_msg(job_info);
- return running;
+static char *g_bytes_to_terminated_array(GBytes *bytes)
+{
+ gpointer arr;
+ gsize size;
+ char *buf;
+
+ arr = g_bytes_unref_to_data(bytes, &size);
+
+ buf = malloc(size+1);
+ if ( buf == NULL ) return NULL;
+
+ memcpy(buf, arr, size);
+ buf[size] = '\0';
+
+ g_free(arr);
+
+ return buf;
}
-static double indexing_progress(struct slurm_job *job, int *running)
+static int get_job_status(int job_id, int *running,
+ int *n_running, int *n_complete)
{
- job_info_msg_t *array_job_info;
- int i;
- int n_running;
- int lowest_alive_task;
+ const gchar *args[6];
+ GError *error = NULL;
+ GSubprocess *sp;
+ char job_id_str[64];
+ char *line;
+ char *nl;
+ int array_task;
+ GBytes *stdout_buf;
+ GBytes *stderr_buf;
+ char *buf;
+ char *buf_stderr;
+
+ snprintf(job_id_str, 63, "%i", job_id);
+ args[0] = "scontrol";
+ args[1] = "-o";
+ args[2] = "show";
+ args[3] = "job";
+ args[4] = job_id_str;
+ args[5] = NULL;
+
+ sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE
+ | G_SUBPROCESS_FLAGS_STDERR_PIPE, &error);
+ if ( sp == NULL ) {
+ ERROR("Failed to invoke scontrol: %s\n", error->message);
+ g_error_free(error);
+ return 1;
+ }
- if ( slurm_load_job(&array_job_info, job->job_id, 0) ) {
- STATUS("Couldn't get status: %i\n",
- slurm_strerror(slurm_get_errno()));
+ if ( !g_subprocess_communicate(sp, NULL, NULL,
+ &stdout_buf, &stderr_buf, &error) )
+ {
+ ERROR("Error communicating with scontrol: %s\n", error->message);
+ g_error_free(error);
+ return 1;
+ }
+
+ buf = g_bytes_to_terminated_array(stdout_buf);
+ buf_stderr = g_bytes_to_terminated_array(stderr_buf);
+
+ if ( buf_stderr[0] != '\0' ) {
+ ERROR("scontrol error: %s\n", buf_stderr);
+ /* ... but carry on */
+ }
+ free(buf_stderr);
+
+ if ( strstr(buf, "ArrayTaskId") != NULL ) {
+ array_task = 1;
*running = 0;
- return 0.0;
+ } else {
+ array_task = 0;
}
- n_running = 0;
- lowest_alive_task = job->n_blocks;
- for ( i=0; i<array_job_info->record_count; i++ ) {
+ *n_running = 0;
+ *n_complete = 0;
+
+ /* Parse output */
+ line = &buf[0];
+ nl = strchr(line, '\n');
+ while ( nl != NULL ) {
+
+ nl[0] = '\0';
+
+ if ( array_task ) {
+
+ const char *state = get_str_val(line, "JobState");
+ const char *array_task_str = get_str_val(line, "ArrayTaskId");
+
+ /* Ignore array job 'leader' */
+ if ( strchr(array_task_str, '-') == NULL ) {
- slurm_job_info_t *job_info = &array_job_info->job_array[i];
+ if ( job_alive(state) ) {
+ (*n_running)++;
+ *running = 1;
+ }
- /* Find the array_task_id of the lowest task which is still
- * running, or which might still run. Exclude the main array
- * job, identified by having job_id == array_job_id. */
- if ( job_alive(job_info) ) {
- if ( (job_info->array_task_id < lowest_alive_task)
- && (job_info->job_id != job_info->array_job_id) )
- {
- lowest_alive_task = job_info->array_task_id;
+ if ( strcmp(state, "COMPLETED") == 0 ) {
+ (*n_complete)++;
+ }
+
+ } else {
+ if ( job_alive(state) ) {
+ *running = 1;
+ }
}
- n_running++;
+
+ } else {
+
+ const char *state = get_str_val(line, "JobState");
+ *running = job_alive(state);
+
}
+
+ line = nl+1;
+ nl = strchr(line, '\n');
}
- slurm_free_job_info_msg(array_job_info);
- *running = (n_running > 0);
+ free(buf);
+
+ return 0;
+}
+
+static double indexing_progress(struct slurm_job *job, int *running,
+ int n_running, int n_complete)
+{
/* If there are lots of blocks, just count running jobs instead of
* reading loads of log files */
- if ( (job->n_blocks > 15)
- && (lowest_alive_task < job->n_blocks) )
- {
+ if ( job->n_blocks > 15 ) {
- return (double)lowest_alive_task / job->n_blocks;
+ return 0.1*(double)(n_running+n_complete) / job->n_blocks
+ + 0.9*(double)n_complete / job->n_blocks;
} else {
@@ -192,17 +273,23 @@ static int get_task_status(void *job_priv,
float *frac_complete)
{
struct slurm_job *job = job_priv;
+ int n_running, n_complete;
+
+ if ( get_job_status(job->job_id, running, &n_running, &n_complete) ) {
+ ERROR("Failed to get task status: %i\n", job->job_id);
+ return 1;
+ }
switch ( job->type ) {
case GUI_JOB_INDEXING :
- *frac_complete = indexing_progress(job, running);
+ *frac_complete = indexing_progress(job, running,
+ n_running, n_complete);
break;
case GUI_JOB_AMBIGATOR :
*frac_complete = read_ambigator_progress(job->stderr_filename,
job->niter);
- *running = job_running(job->job_id);
break;
case GUI_JOB_PROCESS_HKL :
@@ -210,7 +297,6 @@ static int get_task_status(void *job_priv,
case GUI_JOB_PARTIALATOR :
*frac_complete = read_merge_progress(job->stderr_filename,
job->type);
- *running = job_running(job->job_id);
break;
}
@@ -247,32 +333,6 @@ static void cancel_task(void *job_priv)
}
-static char **create_env(int *psize)
-{
- char **env;
- gchar **env_list;
- int i, n_env;
-
- env_list = g_get_environ();
- n_env = 0;
- while ( env_list[n_env] != NULL ) n_env++;
-
- /* Can't mix g_malloc/g_free with normal malloc/free, so we
- * must do a deep copy */
- env = malloc(n_env*sizeof(char *));
- if ( env == NULL ) return NULL;
-
- for ( i=0; i<n_env; i++ ) {
- env[i] = strdup(env_list[i]);
- }
-
- g_strfreev(env_list);
-
- *psize = n_env;
- return env;
-}
-
-
static void partition_activate_sig(GtkEntry *entry, gpointer data)
{
struct slurm_common_opts *opts = data;
@@ -469,6 +529,14 @@ static void write_common_opts(FILE *fh,
}
/* Return non-zero if 'str' is NULL or a zero-length string */
static int empty(const char *str)
{
	return (str == NULL) || (str[0] == '\0');
}
+
+
static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *script_filename,
const char *jobname,
@@ -478,81 +546,93 @@ static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *stderr_filename,
struct slurm_common_opts *opts)
{
- char **env;
- int n_env;
- char *script;
- struct slurm_job *job;
- job_desc_msg_t job_desc_msg;
- submit_response_msg_t *resp;
- int r;
- GFile *cwd_gfile;
+ char time_limit[64];
+ const gchar *args[64];
+ GError *error = NULL;
+ GSubprocess *sp;
+ char buf[256];
+ gsize bytes_read;
+ int n = 0;
- script = load_entire_file(script_filename);
- if ( script == NULL ) return NULL;
+ snprintf(time_limit, 63, "%i", opts->time_limit);
- job = malloc(sizeof(struct slurm_job));
- if ( job == NULL ) {
- free(script);
- return NULL;
+ args[n++] = "sbatch";
+ if ( !empty(array_inx) ) {
+ args[n++] = "--array";
+ args[n++] = array_inx;
}
-
- job->type = type;
-
- cwd_gfile = g_file_new_for_path(".");
-
- env = create_env(&n_env);
- if ( env == NULL ) return NULL;
-
- slurm_init_job_desc_msg(&job_desc_msg);
- job_desc_msg.user_id = getuid();
- job_desc_msg.group_id = getgid();
- job_desc_msg.mail_user = safe_strdup(opts->email_address);
- job_desc_msg.mail_type = MAIL_JOB_FAIL;
- job_desc_msg.comment = "Submitted via CrystFEL GUI";
- job_desc_msg.shared = 0;
- job_desc_msg.time_limit = opts->time_limit; /* minutes */
- job_desc_msg.partition = safe_strdup(opts->partition);
- job_desc_msg.min_nodes = 1;
- job_desc_msg.max_nodes = 1;
- job_desc_msg.name = safe_strdup(jobname);
- job_desc_msg.std_err = strdup(stderr_filename);
- job_desc_msg.std_out = strdup(stdout_filename);
- job_desc_msg.work_dir = g_file_get_path(cwd_gfile);
- job_desc_msg.script = script;
- job_desc_msg.environment = env;
- job_desc_msg.env_size = n_env;
- job_desc_msg.features = safe_strdup(opts->constraint);
- job_desc_msg.account = safe_strdup(opts->account);
- job_desc_msg.array_inx = safe_strdup(array_inx);
-
- g_object_unref(cwd_gfile);
-
- r = slurm_submit_batch_job(&job_desc_msg, &resp);
- free(job_desc_msg.mail_user);
- free(job_desc_msg.partition);
- free(job_desc_msg.name);
- free(job_desc_msg.work_dir);
- free(job_desc_msg.std_err);
- free(job_desc_msg.std_out);
- free(job_desc_msg.features);
- free(job_desc_msg.account);
- free(job_desc_msg.script);
- if ( r ) {
- ERROR("Couldn't submit job: %s\n",
- slurm_strerror(slurm_get_errno()));
- free(job);
+ args[n++] = "--job-name";
+ args[n++] = jobname;
+ args[n++] = "--output";
+ args[n++] = stdout_filename;
+ args[n++] = "--error";
+ args[n++] = stderr_filename;
+ args[n++] = "--time";
+ args[n++] = time_limit;
+ if ( !empty(opts->email_address) ) {
+ args[n++] = "--mail-user";
+ args[n++] = opts->email_address;
+ }
+ if ( !empty(opts->partition) ) {
+ args[n++] = "--partition";
+ args[n++] = opts->partition;
+ }
+ if ( !empty(opts->constraint) ) {
+ args[n++] = "--constraint";
+ args[n++] = opts->constraint;
+ }
+ if ( !empty(opts->account) ) {
+ args[n++] = "--account";
+ args[n++] = opts->account;
+ }
+ args[n++] = "--nodes=1";
+ args[n++] = "--mail-type=FAIL";
+ args[n++] = "--comment";
+ args[n++] = "Submitted via CrystFEL GUI";
+ args[n++] = script_filename;
+ args[n++] = NULL;
+
+ sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE
+ | G_SUBPROCESS_FLAGS_STDERR_MERGE, &error);
+ if ( sp == NULL ) {
+ ERROR("Failed to invoke sbatch: %s\n", error->message);
+ g_error_free(error);
return NULL;
}
- STATUS("Submitted SLURM job ID %i\n", resp->job_id);
+ g_subprocess_wait(sp, NULL, &error);
- job->job_id = resp->job_id;
- slurm_free_submit_response_response_msg(resp);
+ bytes_read = g_input_stream_read(g_subprocess_get_stdout_pipe(sp),
+ buf, 256, NULL, &error);
+ buf[bytes_read] = '\0';
+ chomp(buf);
- job->stderr_filename = strdup(stderr_filename);
- job->workdir = g_file_dup(workdir);
+ if ( strncmp(buf, "Submitted batch job ", 20) == 0 ) {
- return job;
+ struct slurm_job *job;
+ int job_id;
+
+ if ( convert_int(buf+20, &job_id) ) {
+ ERROR("Didn't get batch job ID from '%s'\n", buf);
+ return NULL;
+ }
+
+ job = malloc(sizeof(struct slurm_job));
+ if ( job == NULL ) return NULL;
+
+ job->type = type;
+ job->job_id = job_id;
+ job->stderr_filename = strdup(stderr_filename);
+ job->workdir = g_file_dup(workdir);
+
+ STATUS("Submitted batch job ID %i\n", job_id);
+
+ return job;
+
+ } else {
+ ERROR("Didn't understand sbatch reply: '%s'\n", buf);
+ return NULL;
+ }
}
diff --git a/src/gui_project.c b/src/gui_project.c
index 59f9852a..7ab4e367 100644
--- a/src/gui_project.c
+++ b/src/gui_project.c
@@ -880,6 +880,7 @@ static void read_frames(FILE *fh, struct crystfelproject *proj)
}
+/* NB caller is responsible for applying default_project() to proj */
int load_project(struct crystfelproject *proj)
{
FILE *fh;
@@ -887,12 +888,6 @@ int load_project(struct crystfelproject *proj)
fh = fopen("crystfel.project", "r");
if ( fh == NULL ) return 1;
- if ( default_project(proj) ) {
- ERROR("Failed to make default project when loading.\n");
- fclose(fh);
- return 1;
- }
-
read_parameters(fh, proj);
read_results(fh, proj);
read_frames(fh, proj);
@@ -1226,12 +1221,11 @@ int default_project(struct crystfelproject *proj)
return 1;
}
- #ifdef HAVE_SLURM
- if ( make_slurm_backend(&proj->backends[proj->n_backends++]) ) {
- ERROR("SLURM backend setup failed\n");
- return 1;
+ if ( make_slurm_backend(&proj->backends[proj->n_backends]) == 0 ) {
+ proj->n_backends++;
+ } else {
+ STATUS("Slurm unavailable\n");
}
- #endif
/* Default parameter values */
proj->show_centre = 1;