diff options
Diffstat (limited to 'src/gui_backend_slurm.c')
-rw-r--r-- | src/gui_backend_slurm.c | 372 |
1 files changed, 226 insertions, 146 deletions
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c index e02120a3..ce51e72e 100644 --- a/src/gui_backend_slurm.c +++ b/src/gui_backend_slurm.c @@ -28,7 +28,6 @@ #include <glib.h> #include <gtk/gtk.h> -#include <slurm/slurm.h> #include <gio/gio.h> #include <utils.h> @@ -86,83 +85,165 @@ struct slurm_job }; -static int job_alive(slurm_job_info_t *job) +static const char *get_str_val(const char *line, const char *key) { - switch ( job->job_state & JOB_STATE_BASE ) { + const char *pos = strstr(line, key); + if ( pos == NULL ) return NULL; - /* Only the following states are reasons to keep on watching - * the job */ - case JOB_PENDING : - case JOB_RUNNING : - case JOB_SUSPENDED : - return 1; + const char *eq = strchr(pos, '='); + if ( eq == NULL ) return NULL; - default : - return 0; - } + const char *sp = strchr(pos, ' '); + if ( sp == NULL ) return NULL; + + return strndup(eq+1, sp-eq-1); } -static int job_running(uint32_t job_id) +static int job_alive(const char *s) { - job_info_msg_t *job_info; - int running = 1; + if ( strcmp(s, "PENDING") == 0 ) return 1; + if ( strcmp(s, "RUNNING") == 0 ) return 1; + if ( strcmp(s, "SUSPENDED") == 0 ) return 1; + if ( strcmp(s, "COMPLETING") == 0 ) return 1; + return 0; +} - if ( slurm_load_job(&job_info, job_id, 0) ) { - STATUS("Couldn't get status: %i\n", - slurm_strerror(slurm_get_errno())); - return 0; - } - running = job_alive(&job_info->job_array[0]); - slurm_free_job_info_msg(job_info); - return running; +static char *g_bytes_to_terminated_array(GBytes *bytes) +{ + gpointer arr; + gsize size; + char *buf; + + arr = g_bytes_unref_to_data(bytes, &size); + + buf = malloc(size+1); + if ( buf == NULL ) return NULL; + + memcpy(buf, arr, size); + buf[size] = '\0'; + + g_free(arr); + + return buf; } -static double indexing_progress(struct slurm_job *job, int *running) +static int get_job_status(int job_id, int *running, + int *n_running, int *n_complete) { - job_info_msg_t *array_job_info; - int i; - int n_running; - int lowest_alive_task; + const gchar *args[6]; + GError *error = NULL; + GSubprocess *sp; + char job_id_str[64]; + char *line; + char *nl; + int array_task; + GBytes *stdout_buf; + GBytes *stderr_buf; + char *buf; + char *buf_stderr; + + snprintf(job_id_str, 63, "%i", job_id); + args[0] = "scontrol"; + args[1] = "-o"; + args[2] = "show"; + args[3] = "job"; + args[4] = job_id_str; + args[5] = NULL; + + sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE + | G_SUBPROCESS_FLAGS_STDERR_PIPE, &error); + if ( sp == NULL ) { + ERROR("Failed to invoke scontrol: %s\n", error->message); + g_error_free(error); + return 1; + } - if ( slurm_load_job(&array_job_info, job->job_id, 0) ) { - STATUS("Couldn't get status: %i\n", - slurm_strerror(slurm_get_errno())); + if ( !g_subprocess_communicate(sp, NULL, NULL, + &stdout_buf, &stderr_buf, &error) ) + { + ERROR("Error communicating with scontrol: %s\n", error->message); + g_error_free(error); + return 1; + } + + buf = g_bytes_to_terminated_array(stdout_buf); + buf_stderr = g_bytes_to_terminated_array(stderr_buf); + + if ( buf_stderr[0] != '\0' ) { + ERROR("scontrol error: %s\n", buf_stderr); + /* ... but carry on */ + } + free(buf_stderr); + + if ( strstr(buf, "ArrayTaskId") != NULL ) { + array_task = 1; *running = 0; - return 0.0; + } else { + array_task = 0; } - n_running = 0; - lowest_alive_task = job->n_blocks; - for ( i=0; i<array_job_info->record_count; i++ ) { + *n_running = 0; + *n_complete = 0; + + /* Parse output */ + line = &buf[0]; + nl = strchr(line, '\n'); + while ( nl != NULL ) { + + nl[0] = '\0'; + + if ( array_task ) { + + const char *state = get_str_val(line, "JobState"); + const char *array_task_str = get_str_val(line, "ArrayTaskId"); + + /* Ignore array job 'leader' */ + if ( strchr(array_task_str, '-') == NULL ) { - slurm_job_info_t *job_info = &array_job_info->job_array[i]; + if ( job_alive(state) ) { + (*n_running)++; + *running = 1; + } - /* Find the array_task_id of the lowest task which is still - * running, or which might still run. Exclude the main array - * job, identified by having job_id == array_job_id. */ - if ( job_alive(job_info) ) { - if ( (job_info->array_task_id < lowest_alive_task) - && (job_info->job_id != job_info->array_job_id) ) - { - lowest_alive_task = job_info->array_task_id; + if ( strcmp(state, "COMPLETED") == 0 ) { + (*n_complete)++; + } + + } else { + if ( job_alive(state) ) { + *running = 1; + } } - n_running++; + + } else { + + const char *state = get_str_val(line, "JobState"); + *running = job_alive(state); + } + + line = nl+1; + nl = strchr(line, '\n'); } - slurm_free_job_info_msg(array_job_info); - *running = (n_running > 0); + free(buf); + + return 0; +} + +static double indexing_progress(struct slurm_job *job, int *running, + int n_running, int n_complete) +{ /* If there are lots of blocks, just count running jobs instead of * reading loads of log files */ - if ( (job->n_blocks > 15) - && (lowest_alive_task < job->n_blocks) ) - { + if ( job->n_blocks > 15 ) { - return (double)lowest_alive_task / job->n_blocks; + return 0.1*(double)(n_running+n_complete) / job->n_blocks + + 0.9*(double)n_complete / job->n_blocks; } else { @@ -192,17 +273,23 @@ static int get_task_status(void *job_priv, float *frac_complete) { struct slurm_job *job = job_priv; + int n_running, n_complete; + + if ( get_job_status(job->job_id, running, &n_running, &n_complete) ) { + ERROR("Failed to get task status: %i\n", job->job_id); + return 1; + } switch ( job->type ) { case GUI_JOB_INDEXING : - *frac_complete = indexing_progress(job, running); + *frac_complete = indexing_progress(job, running, + n_running, n_complete); break; case GUI_JOB_AMBIGATOR : *frac_complete = read_ambigator_progress(job->stderr_filename, job->niter); - *running = job_running(job->job_id); break; case GUI_JOB_PROCESS_HKL : @@ -210,7 +297,6 @@ static int get_task_status(void *job_priv, case GUI_JOB_PARTIALATOR : *frac_complete = read_merge_progress(job->stderr_filename, job->type); - *running = job_running(job->job_id); break; } @@ -247,32 +333,6 @@ static void cancel_task(void *job_priv) } -static char **create_env(int *psize) -{ - char **env; - gchar **env_list; - int i, n_env; - - env_list = g_get_environ(); - n_env = 0; - while ( env_list[n_env] != NULL ) n_env++; - - /* Can't mix g_malloc/g_free with normal malloc/free, so we - * must do a deep copy */ - env = malloc(n_env*sizeof(char *)); - if ( env == NULL ) return NULL; - - for ( i=0; i<n_env; i++ ) { - env[i] = strdup(env_list[i]); - } - - g_strfreev(env_list); - - *psize = n_env; - return env; -} - - static void partition_activate_sig(GtkEntry *entry, gpointer data) { struct slurm_common_opts *opts = data; @@ -469,6 +529,14 @@ static void write_common_opts(FILE *fh, } +static int empty(const char *str) +{ + if ( str == NULL ) return 1; + if ( str[0] == '\0' ) return 1; + return 0; +} + + static struct slurm_job *start_slurm_job(enum gui_job_type type, const char *script_filename, const char *jobname, @@ -478,81 +546,93 @@ static struct slurm_job *start_slurm_job(enum gui_job_type type, const char *stderr_filename, struct slurm_common_opts *opts) { - char **env; - int n_env; - char *script; - struct slurm_job *job; - job_desc_msg_t job_desc_msg; - submit_response_msg_t *resp; - int r; - GFile *cwd_gfile; + char time_limit[64]; + const gchar *args[64]; + GError *error = NULL; + GSubprocess *sp; + char buf[256]; + gsize bytes_read; + int n = 0; - script = load_entire_file(script_filename); - if ( script == NULL ) return NULL; + snprintf(time_limit, 63, "%i", opts->time_limit); - job = malloc(sizeof(struct slurm_job)); - if ( job == NULL ) { - free(script); - return NULL; + args[n++] = "sbatch"; + if ( !empty(array_inx) ) { + args[n++] = "--array"; + args[n++] = array_inx; } - - job->type = type; - - cwd_gfile = g_file_new_for_path("."); - - env = create_env(&n_env); - if ( env == NULL ) return NULL; - - slurm_init_job_desc_msg(&job_desc_msg); - job_desc_msg.user_id = getuid(); - job_desc_msg.group_id = getgid(); - job_desc_msg.mail_user = safe_strdup(opts->email_address); - job_desc_msg.mail_type = MAIL_JOB_FAIL; - job_desc_msg.comment = "Submitted via CrystFEL GUI"; - job_desc_msg.shared = 0; - job_desc_msg.time_limit = opts->time_limit; /* minutes */ - job_desc_msg.partition = safe_strdup(opts->partition); - job_desc_msg.min_nodes = 1; - job_desc_msg.max_nodes = 1; - job_desc_msg.name = safe_strdup(jobname); - job_desc_msg.std_err = strdup(stderr_filename); - job_desc_msg.std_out = strdup(stdout_filename); - job_desc_msg.work_dir = g_file_get_path(cwd_gfile); - job_desc_msg.script = script; - job_desc_msg.environment = env; - job_desc_msg.env_size = n_env; - job_desc_msg.features = safe_strdup(opts->constraint); - job_desc_msg.account = safe_strdup(opts->account); - job_desc_msg.array_inx = safe_strdup(array_inx); - - g_object_unref(cwd_gfile); - - r = slurm_submit_batch_job(&job_desc_msg, &resp); - free(job_desc_msg.mail_user); - free(job_desc_msg.partition); - free(job_desc_msg.name); - free(job_desc_msg.work_dir); - free(job_desc_msg.std_err); - free(job_desc_msg.std_out); - free(job_desc_msg.features); - free(job_desc_msg.account); - free(job_desc_msg.script); - if ( r ) { - ERROR("Couldn't submit job: %s\n", - slurm_strerror(slurm_get_errno())); - free(job); + args[n++] = "--job-name"; + args[n++] = jobname; + args[n++] = "--output"; + args[n++] = stdout_filename; + args[n++] = "--error"; + args[n++] = stderr_filename; + args[n++] = "--time"; + args[n++] = time_limit; + if ( !empty(opts->email_address) ) { + args[n++] = "--mail-user"; + args[n++] = opts->email_address; + } + if ( !empty(opts->partition) ) { + args[n++] = "--partition"; + args[n++] = opts->partition; + } + if ( !empty(opts->constraint) ) { + args[n++] = "--constraint"; + args[n++] = opts->constraint; + } + if ( !empty(opts->account) ) { + args[n++] = "--account"; + args[n++] = opts->account; + } + args[n++] = "--nodes=1"; + args[n++] = "--mail-type=FAIL"; + args[n++] = "--comment"; + args[n++] = "Submitted via CrystFEL GUI"; + args[n++] = script_filename; + args[n++] = NULL; + + sp = g_subprocess_newv(args, G_SUBPROCESS_FLAGS_STDOUT_PIPE + | G_SUBPROCESS_FLAGS_STDERR_MERGE, &error); + if ( sp == NULL ) { + ERROR("Failed to invoke sbatch: %s\n", error->message); + g_error_free(error); return NULL; } - STATUS("Submitted SLURM job ID %i\n", resp->job_id); + g_subprocess_wait(sp, NULL, &error); - job->job_id = resp->job_id; - slurm_free_submit_response_response_msg(resp); + bytes_read = g_input_stream_read(g_subprocess_get_stdout_pipe(sp), + buf, 256, NULL, &error); + buf[bytes_read] = '\0'; + chomp(buf); - job->stderr_filename = strdup(stderr_filename); - job->workdir = g_file_dup(workdir); + if ( strncmp(buf, "Submitted batch job ", 20) == 0 ) { - return job; + struct slurm_job *job; + int job_id; + + if ( convert_int(buf+20, &job_id) ) { + ERROR("Didn't get batch job ID from '%s'\n", buf); + return NULL; + } + + job = malloc(sizeof(struct slurm_job)); + if ( job == NULL ) return NULL; + + job->type = type; + job->job_id = job_id; + job->stderr_filename = strdup(stderr_filename); + job->workdir = g_file_dup(workdir); + + STATUS("Submitted batch job ID %i\n", job_id); + + return job; + + } else { + ERROR("Didn't understand sbatch reply: '%s'\n", buf); + return NULL; + } } |