aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2020-12-03 18:07:19 +0100
committerThomas White <taw@physics.org>2020-12-03 18:07:19 +0100
commitb3892119c0879181fe74b108e1f9e089192cfa21 (patch)
treee16bf767ed3301fb953eb09ebd94a989254bbada /src
parent2d00bfeca27c416b1f14bccf5b1e7c09a9755765 (diff)
SLURM BE: Submit merge job
Diffstat (limited to 'src')
-rw-r--r--src/gui_backend_slurm.c154
1 files changed, 128 insertions, 26 deletions
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c
index 7c19f426..f6f2e8e2 100644
--- a/src/gui_backend_slurm.c
+++ b/src/gui_backend_slurm.c
@@ -38,6 +38,7 @@
#include "gui_project.h"
#include "gui_index.h"
+#include "gui_merge.h"
#include "crystfel_gui.h"
@@ -56,14 +57,25 @@ struct slurm_merging_opts
char *email_address;
};
+enum slurm_job_type
+{
+ SLURM_JOB_INDEXING,
+ SLURM_JOB_MERGING,
+ SLURM_JOB_AMBIGATOR,
+};
struct slurm_job
{
- double frac_complete;
+ enum slurm_job_type type;
+
+ /* For indexing job (don't worry - will be replaced soon!) */
int n_frames;
int n_blocks;
uint32_t *job_ids;
char **stderr_filenames;
+
+ /* For merging/ambigator job */
+ uint32_t job_id;
};
@@ -135,21 +147,28 @@ static int get_task_status(void *job_priv,
int n_proc = 0;
int all_complete = 1;
- for ( i=0; i<job->n_blocks; i++ ) {
+ if ( job->type == SLURM_JOB_INDEXING ) {
+ for ( i=0; i<job->n_blocks; i++ ) {
- n_proc += read_number_processed(job->stderr_filenames[i]);
+ n_proc += read_number_processed(job->stderr_filenames[i]);
- if ( job->job_ids[i] == 0 ) continue;
+ if ( job->job_ids[i] == 0 ) continue;
- if ( !job_running(job->job_ids[i]) ) {
- job->job_ids[i] = 0;
- } else {
- all_complete = 0;
+ if ( !job_running(job->job_ids[i]) ) {
+ job->job_ids[i] = 0;
+ } else {
+ all_complete = 0;
+ }
}
+
+ *frac_complete = (double)n_proc / job->n_frames;
+ *running = 1 - all_complete;
+
+ } else {
+ *frac_complete = 0.5;
+ *running = 1;
}
- *frac_complete = (double)n_proc / job->n_frames;
- *running = 1 - all_complete;
return 0;
}
@@ -324,13 +343,9 @@ static void write_partial_file_list(GFile *workdir,
}
-static void *run_indexing(const char *job_title,
- const char *job_notes,
- struct crystfelproject *proj,
- void *opts_priv)
+static char *make_workdir(const char *job_title,
+ const char *job_notes)
{
- struct slurm_indexing_opts *opts = opts_priv;
- struct slurm_job *job;
char *workdir;
struct stat s;
GFile *cwd_file;
@@ -338,11 +353,6 @@ static void *run_indexing(const char *job_title,
GFile *workdir_file;
char *notes_path;
FILE *fh;
- char **env;
- int n_env;
- int i;
- int fail = 0;
- char **streams;
workdir = strdup(job_title);
if ( workdir == NULL ) return NULL;
@@ -372,12 +382,35 @@ static void *run_indexing(const char *job_title,
g_object_unref(notes_file);
workdir = g_file_get_path(workdir_file);
+ return workdir;
+}
+
+
+static void *run_indexing(const char *job_title,
+ const char *job_notes,
+ struct crystfelproject *proj,
+ void *opts_priv)
+{
+ struct slurm_indexing_opts *opts = opts_priv;
+ struct slurm_job *job;
+ char **env;
+ int n_env;
+ int i;
+ int fail = 0;
+ char **streams;
+ char *workdir;
+ GFile *workdir_gfile;
+
+ workdir = make_workdir(job_title, job_notes);
+ if ( workdir == NULL ) return NULL;
+ workdir_gfile = g_file_new_for_path(workdir);
env = create_env(&n_env, opts->path_add);
job = malloc(sizeof(struct slurm_job));
if ( job == NULL ) return 0;
+ job->type = SLURM_JOB_INDEXING;
job->n_frames = proj->n_frames;
job->n_blocks = proj->n_frames / opts->block_size;
if ( proj->n_frames % opts->block_size ) job->n_blocks++;
@@ -411,7 +444,7 @@ static void *run_indexing(const char *job_title,
snprintf(stderr_file, 127, "stderr-%i.log", i);
snprintf(stdout_file, 127, "stdout-%i.log", i);
- write_partial_file_list(workdir_file, file_list,
+ write_partial_file_list(workdir_gfile, file_list,
i, opts->block_size,
proj->filenames,
proj->events,
@@ -438,12 +471,12 @@ static void *run_indexing(const char *job_title,
job->job_ids[i] = job_id;
- stderr_gfile = g_file_get_child(workdir_file,
+ stderr_gfile = g_file_get_child(workdir_gfile,
stderr_file);
job->stderr_filenames[i] = g_file_get_path(stderr_gfile);
g_object_unref(stderr_gfile);
- stream_gfile = g_file_get_child(workdir_file,
+ stream_gfile = g_file_get_child(workdir_gfile,
stream_filename);
streams[i] = g_file_get_path(stream_gfile);
g_object_unref(stream_gfile);
@@ -454,7 +487,7 @@ static void *run_indexing(const char *job_title,
for ( i=0; i<n_env; i++ ) free(env[i]);
free(env);
free(workdir);
- g_object_unref(workdir_file);
+ g_object_unref(workdir_gfile);
if ( fail ) {
free(job->job_ids);
@@ -697,7 +730,76 @@ static void *run_merging(const char *job_title,
struct gui_result *input,
void *opts_priv)
{
- return NULL;
+ struct slurm_job *job;
+ job_desc_msg_t job_desc_msg;
+ submit_response_msg_t *resp;
+ char **cmdline;
+ char *cmdline_all;
+ char *script;
+ char **env;
+ int n_env;
+ struct slurm_merging_opts *opts = opts_priv;
+ char *workdir;
+ int r;
+
+ workdir = make_workdir(job_title, job_notes);
+ if ( workdir == NULL ) return NULL;
+
+ job = malloc(sizeof(struct slurm_job));
+ if ( job == NULL ) return NULL;
+
+ cmdline = merging_command_line("`nproc`",
+ input,
+ &proj->merging_params);
+
+ cmdline_all = g_strjoinv(" ", cmdline);
+
+ script = malloc(strlen(cmdline_all)+16);
+ if ( script == NULL ) return 0;
+
+ strcpy(script, "#!/bin/sh\n");
+ strcat(script, cmdline_all);
+ g_free(cmdline_all);
+
+ env = create_env(&n_env, NULL);
+
+ slurm_init_job_desc_msg(&job_desc_msg);
+ job_desc_msg.user_id = getuid();
+ job_desc_msg.group_id = getgid();
+ job_desc_msg.mail_user = safe_strdup(opts->email_address);
+ job_desc_msg.mail_type = MAIL_JOB_FAIL;
+ job_desc_msg.comment = "Submitted via CrystFEL GUI";
+ job_desc_msg.shared = 0;
+ job_desc_msg.time_limit = 60;
+ job_desc_msg.partition = safe_strdup(opts->partition);
+ job_desc_msg.min_nodes = 1;
+ job_desc_msg.max_nodes = 1;
+ job_desc_msg.name = safe_strdup(job_title);
+ job_desc_msg.std_err = strdup("stderr.log");
+ job_desc_msg.std_out = strdup("stdout.log");
+ job_desc_msg.work_dir = strdup(workdir);
+ job_desc_msg.script = script;
+ job_desc_msg.environment = env;
+ job_desc_msg.env_size = n_env;
+
+ r = slurm_submit_batch_job(&job_desc_msg, &resp);
+ if ( r ) {
+ ERROR("Couldn't submit job: %i\n", errno);
+ return 0;
+ }
+
+ free(job_desc_msg.mail_user);
+ free(job_desc_msg.partition);
+ free(job_desc_msg.name);
+ free(job_desc_msg.work_dir);
+ free(job_desc_msg.std_err);
+ free(job_desc_msg.std_out);
+
+ job->job_id = resp->job_id;
+ job->type = SLURM_JOB_MERGING;
+ slurm_free_submit_response_response_msg(resp);
+
+ return job;
}