author    Thomas White <taw@physics.org>  2022-03-09 16:31:01 +0100
committer Thomas White <taw@physics.org>  2022-03-18 16:46:30 +0100
commit    a4c786d52613063d338ee010f4415e8a6aef0513 (patch)
tree      31479897a5099c9872d20dce9d41d133426ea5ab /src/gui_backend_slurm.c
parent    66c1f2a616ae5082ed73d4cd2cc339b65f0d6f61 (diff)
GUI: Use sbatch/scancel/scontrol instead of Slurm API
This commit strips out all references to the Slurm API, instead making subprocess calls to sbatch and scontrol.

Attempting to use the Slurm API seems to have been a mis-step. First, nowhere seems to have the Slurm headers pre-installed: literally none of the facilities with known CrystFEL deployments have them, and in a significant fraction of cases getting them installed is difficult, slow or impossible. In addition, the API doesn't seem to work in all cases, so we already shell out to 'scancel' to abort jobs - see d76fc3495. Finally, there are some tricky implications for submitting Slurm jobs from inside a container via the API. The Slurm REST API offers a solution, but is very new and not widely available, whereas calls to the Slurm executables are much easier to 'tunnel' out of a container.

This isn't a great solution. It's a net increase of only about 40 lines of source code, but it incurs some unpleasant string handling and will probably be less reliable overall. In particular, it relies completely on Slurm's messages not being internationalised. If Slurm's messages start getting translated, we will be in trouble.
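The fragile string handling mentioned above is mostly the parsing of scontrol's one-record-per-line KEY=VALUE output. As a rough standalone illustration (this sketch is not part of the commit), extracting a single value looks like the following; unlike the commit's get_str_val(), it uses strcspn() so that the last key on a line, which has no trailing space, is still handled:

#define _POSIX_C_SOURCE 200809L   /* for strndup() */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Sketch (not from the commit) of parsing one line of
 * 'scontrol -o show job' output, e.g.
 *   JobId=1234 ArrayTaskId=7 JobState=RUNNING ...
 * Assumes Slurm's English, space-separated KEY=VALUE format. */

/* Return a newly allocated copy of the value for 'key', or NULL */
static char *get_val(const char *line, const char *key)
{
	const char *pos = strstr(line, key);
	if ( pos == NULL ) return NULL;

	const char *eq = strchr(pos, '=');
	if ( eq == NULL ) return NULL;

	/* Value runs to the next space or to the end of the line */
	return strndup(eq+1, strcspn(eq+1, " \n"));
}

int main(void)
{
	char *state = get_val("JobId=1234 JobState=RUNNING", "JobState");
	if ( state != NULL ) {
		printf("state = %s\n", state);   /* state = RUNNING */
		free(state);
	}
	return 0;
}

Note that strstr() matching can be fooled by a key which occurs as a substring of another key (e.g. 'JobId' inside 'ArrayJobId'), so the choice of lookup keys matters.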
Diffstat (limited to 'src/gui_backend_slurm.c')
-rw-r--r--  src/gui_backend_slurm.c  372
1 file changed, 226 insertions(+), 146 deletions(-)
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c
index e02120a3..ce51e72e 100644
--- a/src/gui_backend_slurm.c
+++ b/src/gui_backend_slurm.c
@@ -28,7 +28,6 @@
#include <glib.h>
#include <gtk/gtk.h>
-#include <slurm/slurm.h>
#include <gio/gio.h>
#include <utils.h>
@@ -86,83 +85,165 @@ struct slurm_job
};
-static int job_alive(slurm_job_info_t *job)
+static const char *get_str_val(const char *line, const char *key)
{
- switch ( job->job_state & JOB_STATE_BASE ) {
+ const char *pos = strstr(line, key);
+ if ( pos == NULL ) return NULL;
- /* Only the following states are reasons to keep on watching
- * the job */
- case JOB_PENDING :
- case JOB_RUNNING :
- case JOB_SUSPENDED :
- return 1;
+ const char *eq = strchr(pos, '=');
+ if ( eq == NULL ) return NULL;
- default :
- return 0;
- }
+ const char *sp = strchr(pos, ' ');
+ if ( sp == NULL ) return NULL;
+
+ return strndup(eq+1, sp-eq-1);
}
-static int job_running(uint32_t job_id)
+static int job_alive(const char *s)
{
- job_info_msg_t *job_info;
- int running = 1;
+ if ( strcmp(s, "PENDING") == 0 ) return 1;
+ if ( strcmp(s, "RUNNING") == 0 ) return 1;
+ if ( strcmp(s, "SUSPENDED") == 0 ) return 1;
+ if ( strcmp(s, "COMPLETING") == 0 ) return 1;
+ return 0;
+}
- if ( slurm_load_job(&job_info, job_id, 0) ) {
- STATUS("Couldn't get status: %i\n",
- slurm_strerror(slurm_get_errno()));
- return 0;
- }
- running = job_alive(&job_info->job_array[0]);
- slurm_free_job_info_msg(job_info);
- return running;
+static char *g_bytes_to_terminated_array(GBytes *bytes)
+{
+ gpointer arr;
+ gsize size;
+ char *buf;
+
+ arr = g_bytes_unref_to_data(bytes, &size);
+
+ buf = malloc(size+1);
+ if ( buf == NULL ) return NULL;
+
+ memcpy(buf, arr, size);
+ buf[size] = '\0';
+
+ g_free(arr);
+
+ return buf;
}
-static double indexing_progress(struct slurm_job *job, int *running)
+static int get_job_status(int job_id, int *running,
+ int *n_running, int *n_complete)
{
- job_info_msg_t *array_job_info;
- int i;
- int n_running;
- int lowest_alive_task;
+ const gchar *args[6];
+ GError *error = NULL;
+ GSubprocess *sp;
+ char job_id_str[64];
+ char *line;
+ char *nl;
+ int array_task;
+ GBytes *stdout_buf;
+ GBytes *stderr_buf;
+ char *buf;
+ char *buf_stderr;
+
+ snprintf(job_id_str, 63, "%i", job_id);
+ args[0] = "scontrol";
+ args[1] = "-o";
+ args[2] = "show";
+ args[3] = "job";
+ args[4] = job_id_str;
+ args[5] = NULL;
+
+ sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE
+ | G_SUBPROCESS_FLAGS_STDERR_PIPE, &error);
+ if ( sp == NULL ) {
+ ERROR("Failed to invoke scontrol: %s\n", error->message);
+ g_error_free(error);
+ return 1;
+ }
- if ( slurm_load_job(&array_job_info, job->job_id, 0) ) {
- STATUS("Couldn't get status: %i\n",
- slurm_strerror(slurm_get_errno()));
+ if ( !g_subprocess_communicate(sp, NULL, NULL,
+ &stdout_buf, &stderr_buf, &error) )
+ {
+ ERROR("Error communicating with scontrol: %s\n", error->message);
+ g_error_free(error);
+ return 1;
+ }
+
+ buf = g_bytes_to_terminated_array(stdout_buf);
+ buf_stderr = g_bytes_to_terminated_array(stderr_buf);
+
+ if ( buf_stderr[0] != '\0' ) {
+ ERROR("scontrol error: %s\n", buf_stderr);
+ /* ... but carry on */
+ }
+ free(buf_stderr);
+
+ if ( strstr(buf, "ArrayTaskId") != NULL ) {
+ array_task = 1;
*running = 0;
- return 0.0;
+ } else {
+ array_task = 0;
}
- n_running = 0;
- lowest_alive_task = job->n_blocks;
- for ( i=0; i<array_job_info->record_count; i++ ) {
+ *n_running = 0;
+ *n_complete = 0;
+
+ /* Parse output */
+ line = &buf[0];
+ nl = strchr(line, '\n');
+ while ( nl != NULL ) {
+
+ nl[0] = '\0';
+
+ if ( array_task ) {
+
+ const char *state = get_str_val(line, "JobState");
+ const char *array_task_str = get_str_val(line, "ArrayTaskId");
+
+ /* Ignore array job 'leader' */
+ if ( strchr(array_task_str, '-') == NULL ) {
- slurm_job_info_t *job_info = &array_job_info->job_array[i];
+ if ( job_alive(state) ) {
+ (*n_running)++;
+ *running = 1;
+ }
- /* Find the array_task_id of the lowest task which is still
- * running, or which might still run. Exclude the main array
- * job, identified by having job_id == array_job_id. */
- if ( job_alive(job_info) ) {
- if ( (job_info->array_task_id < lowest_alive_task)
- && (job_info->job_id != job_info->array_job_id) )
- {
- lowest_alive_task = job_info->array_task_id;
+ if ( strcmp(state, "COMPLETED") == 0 ) {
+ (*n_complete)++;
+ }
+
+ } else {
+ if ( job_alive(state) ) {
+ *running = 1;
+ }
}
- n_running++;
+
+ } else {
+
+ const char *state = get_str_val(line, "JobState");
+ *running = job_alive(state);
+
}
+
+ line = nl+1;
+ nl = strchr(line, '\n');
}
- slurm_free_job_info_msg(array_job_info);
- *running = (n_running > 0);
+ free(buf);
+
+ return 0;
+}
+
+static double indexing_progress(struct slurm_job *job, int *running,
+ int n_running, int n_complete)
+{
/* If there are lots of blocks, just count running jobs instead of
* reading loads of log files */
- if ( (job->n_blocks > 15)
- && (lowest_alive_task < job->n_blocks) )
- {
+ if ( job->n_blocks > 15 ) {
- return (double)lowest_alive_task / job->n_blocks;
+ return 0.1*(double)(n_running+n_complete) / job->n_blocks
+ + 0.9*(double)n_complete / job->n_blocks;
} else {
@@ -192,17 +273,23 @@ static int get_task_status(void *job_priv,
float *frac_complete)
{
struct slurm_job *job = job_priv;
+ int n_running, n_complete;
+
+ if ( get_job_status(job->job_id, running, &n_running, &n_complete) ) {
+ ERROR("Failed to get task status: %i\n", job->job_id);
+ return 1;
+ }
switch ( job->type ) {
case GUI_JOB_INDEXING :
- *frac_complete = indexing_progress(job, running);
+ *frac_complete = indexing_progress(job, running,
+ n_running, n_complete);
break;
case GUI_JOB_AMBIGATOR :
*frac_complete = read_ambigator_progress(job->stderr_filename,
job->niter);
- *running = job_running(job->job_id);
break;
case GUI_JOB_PROCESS_HKL :
@@ -210,7 +297,6 @@ static int get_task_status(void *job_priv,
case GUI_JOB_PARTIALATOR :
*frac_complete = read_merge_progress(job->stderr_filename,
job->type);
- *running = job_running(job->job_id);
break;
}
@@ -247,32 +333,6 @@ static void cancel_task(void *job_priv)
}
-static char **create_env(int *psize)
-{
- char **env;
- gchar **env_list;
- int i, n_env;
-
- env_list = g_get_environ();
- n_env = 0;
- while ( env_list[n_env] != NULL ) n_env++;
-
- /* Can't mix g_malloc/g_free with normal malloc/free, so we
- * must do a deep copy */
- env = malloc(n_env*sizeof(char *));
- if ( env == NULL ) return NULL;
-
- for ( i=0; i<n_env; i++ ) {
- env[i] = strdup(env_list[i]);
- }
-
- g_strfreev(env_list);
-
- *psize = n_env;
- return env;
-}
-
-
static void partition_activate_sig(GtkEntry *entry, gpointer data)
{
struct slurm_common_opts *opts = data;
@@ -469,6 +529,14 @@ static void write_common_opts(FILE *fh,
}
+static int empty(const char *str)
+{
+ if ( str == NULL ) return 1;
+ if ( str[0] == '\0' ) return 1;
+ return 0;
+}
+
+
static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *script_filename,
const char *jobname,
@@ -478,81 +546,93 @@ static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *stderr_filename,
struct slurm_common_opts *opts)
{
- char **env;
- int n_env;
- char *script;
- struct slurm_job *job;
- job_desc_msg_t job_desc_msg;
- submit_response_msg_t *resp;
- int r;
- GFile *cwd_gfile;
+ char time_limit[64];
+ const gchar *args[64];
+ GError *error = NULL;
+ GSubprocess *sp;
+ char buf[256];
+ gsize bytes_read;
+ int n = 0;
- script = load_entire_file(script_filename);
- if ( script == NULL ) return NULL;
+ snprintf(time_limit, 63, "%i", opts->time_limit);
- job = malloc(sizeof(struct slurm_job));
- if ( job == NULL ) {
- free(script);
- return NULL;
+ args[n++] = "sbatch";
+ if ( !empty(array_inx) ) {
+ args[n++] = "--array";
+ args[n++] = array_inx;
}
-
- job->type = type;
-
- cwd_gfile = g_file_new_for_path(".");
-
- env = create_env(&n_env);
- if ( env == NULL ) return NULL;
-
- slurm_init_job_desc_msg(&job_desc_msg);
- job_desc_msg.user_id = getuid();
- job_desc_msg.group_id = getgid();
- job_desc_msg.mail_user = safe_strdup(opts->email_address);
- job_desc_msg.mail_type = MAIL_JOB_FAIL;
- job_desc_msg.comment = "Submitted via CrystFEL GUI";
- job_desc_msg.shared = 0;
- job_desc_msg.time_limit = opts->time_limit; /* minutes */
- job_desc_msg.partition = safe_strdup(opts->partition);
- job_desc_msg.min_nodes = 1;
- job_desc_msg.max_nodes = 1;
- job_desc_msg.name = safe_strdup(jobname);
- job_desc_msg.std_err = strdup(stderr_filename);
- job_desc_msg.std_out = strdup(stdout_filename);
- job_desc_msg.work_dir = g_file_get_path(cwd_gfile);
- job_desc_msg.script = script;
- job_desc_msg.environment = env;
- job_desc_msg.env_size = n_env;
- job_desc_msg.features = safe_strdup(opts->constraint);
- job_desc_msg.account = safe_strdup(opts->account);
- job_desc_msg.array_inx = safe_strdup(array_inx);
-
- g_object_unref(cwd_gfile);
-
- r = slurm_submit_batch_job(&job_desc_msg, &resp);
- free(job_desc_msg.mail_user);
- free(job_desc_msg.partition);
- free(job_desc_msg.name);
- free(job_desc_msg.work_dir);
- free(job_desc_msg.std_err);
- free(job_desc_msg.std_out);
- free(job_desc_msg.features);
- free(job_desc_msg.account);
- free(job_desc_msg.script);
- if ( r ) {
- ERROR("Couldn't submit job: %s\n",
- slurm_strerror(slurm_get_errno()));
- free(job);
+ args[n++] = "--job-name";
+ args[n++] = jobname;
+ args[n++] = "--output";
+ args[n++] = stdout_filename;
+ args[n++] = "--error";
+ args[n++] = stderr_filename;
+ args[n++] = "--time";
+ args[n++] = time_limit;
+ if ( !empty(opts->email_address) ) {
+ args[n++] = "--mail-user";
+ args[n++] = opts->email_address;
+ }
+ if ( !empty(opts->partition) ) {
+ args[n++] = "--partition";
+ args[n++] = opts->partition;
+ }
+ if ( !empty(opts->constraint) ) {
+ args[n++] = "--constraint";
+ args[n++] = opts->constraint;
+ }
+ if ( !empty(opts->account) ) {
+ args[n++] = "--account";
+ args[n++] = opts->account;
+ }
+ args[n++] = "--nodes=1";
+ args[n++] = "--mail-type=FAIL";
+ args[n++] = "--comment";
+ args[n++] = "Submitted via CrystFEL GUI";
+ args[n++] = script_filename;
+ args[n++] = NULL;
+
+ sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE
+ | G_SUBPROCESS_FLAGS_STDERR_MERGE, &error);
+ if ( sp == NULL ) {
+ ERROR("Failed to invoke sbatch: %s\n", error->message);
+ g_error_free(error);
return NULL;
}
- STATUS("Submitted SLURM job ID %i\n", resp->job_id);
+ g_subprocess_wait(sp, NULL, &error);
- job->job_id = resp->job_id;
- slurm_free_submit_response_response_msg(resp);
+ bytes_read = g_input_stream_read(g_subprocess_get_stdout_pipe(sp),
+ buf, 255, NULL, &error);
+ buf[bytes_read] = '\0';
+ chomp(buf);
- job->stderr_filename = strdup(stderr_filename);
- job->workdir = g_file_dup(workdir);
+ if ( strncmp(buf, "Submitted batch job ", 20) == 0 ) {
- return job;
+ struct slurm_job *job;
+ int job_id;
+
+ if ( convert_int(buf+20, &job_id) ) {
+ ERROR("Didn't get batch job ID from '%s'\n", buf);
+ return NULL;
+ }
+
+ job = malloc(sizeof(struct slurm_job));
+ if ( job == NULL ) return NULL;
+
+ job->type = type;
+ job->job_id = job_id;
+ job->stderr_filename = strdup(stderr_filename);
+ job->workdir = g_file_dup(workdir);
+
+ STATUS("Submitted batch job ID %i\n", job_id);
+
+ return job;
+
+ } else {
+ ERROR("Didn't understand sbatch reply: '%s'\n", buf);
+ return NULL;
+ }
}
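For reference, the reply-parsing step at the end of start_slurm_job() amounts to something like the following standalone sketch (not part of the commit; parse_sbatch_reply is a hypothetical helper, with strtol() standing in for CrystFEL's convert_int()). It relies on sbatch's non-internationalised English reply "Submitted batch job <id>":

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: extract the job ID from sbatch's reply,
 * returning 0 on success */
static int parse_sbatch_reply(const char *buf, int *job_id)
{
	const char *prefix = "Submitted batch job ";
	size_t n = strlen(prefix);
	char *end;

	if ( strncmp(buf, prefix, n) != 0 ) return 1;

	long id = strtol(buf+n, &end, 10);
	if ( end == buf+n ) return 1;   /* no digits after the prefix */

	*job_id = (int)id;
	return 0;
}

int main(void)
{
	int job_id;
	if ( parse_sbatch_reply("Submitted batch job 123456", &job_id) == 0 ) {
		printf("job ID = %i\n", job_id);   /* job ID = 123456 */
	}
	return 0;
}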