diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 45 | ||||
-rw-r--r-- | src/im-sandbox.h | 2 | ||||
-rw-r--r-- | src/indexamajig.c | 11 |
3 files changed, 54 insertions, 4 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 0602da26..e2f485b1 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -37,6 +37,12 @@ #include <config.h> #endif +#ifdef HAVE_SCHED_SETAFFINITY +#define _GNU_SOURCE +#include <sys/sysinfo.h> +#include <sched.h> +#endif + #include <stdarg.h> #include <stdlib.h> #include <stdio.h> @@ -85,6 +91,7 @@ struct sandbox time_t *last_response; int last_ping[MAX_NUM_WORKERS]; int profile; /* Whether to do wall-clock time profiling */ + int cpu_pin; /* Streams to read from (NB not the same indices as the above) */ int n_read; @@ -678,6 +685,20 @@ static void try_read(struct sandbox *sb) } +static void pin_to_cpu(int slot) +{ + #ifdef HAVE_SCHED_SETAFFINITY + cpu_set_t c; + + CPU_ZERO(&c); + CPU_SET(slot, &c); + if ( sched_setaffinity(0, sizeof(cpu_set_t), &c) ) { + fprintf(stderr, "Failed to set CPU affinity for %i\n", slot); + } + #endif +} + + static void start_worker_process(struct sandbox *sb, int slot) { pid_t p; @@ -712,6 +733,8 @@ static void start_worker_process(struct sandbox *sb, int slot) size_t ll; int i; + if ( sb->cpu_pin ) pin_to_cpu(slot); + /* First, disconnect the signal handlers */ sa.sa_flags = 0; sigemptyset(&sa.sa_mask); @@ -1139,7 +1162,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, - int timeout, int profile) + int timeout, int profile, int cpu_pin) { int i; struct sandbox *sb; @@ -1148,6 +1171,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int r; int allDone = 0; struct get_pattern_ctx gpctx; + int n_cpus; if ( n_proc > MAX_NUM_WORKERS ) { ERROR("Number of workers (%i) is too large. Using %i\n", @@ -1155,6 +1179,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, n_proc = MAX_NUM_WORKERS; } + #ifdef HAVE_SCHED_SETAFFINITY + n_cpus = get_nprocs(); + if ( n_proc > n_cpus ) { + ERROR("WARNING: Number of workers (%i) is larger than the " + "number of available CPUs (%i)\n", n_proc, n_cpus); + if ( cpu_pin ) { + ERROR("Try again with a smaller number of workers (-j) " + "or without --cpu-pin\n"); + return 1; + } + } + #else + if ( cpu_pin ) { + ERROR("Option --cpu-pin not available on this system."); + return 1; + } + #endif + sb = calloc(1, sizeof(struct sandbox)); if ( sb == NULL ) { ERROR("Couldn't allocate memory for sandbox.\n"); @@ -1169,6 +1211,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->tmpdir = tmpdir; sb->profile = profile; sb->timeout = timeout; + sb->cpu_pin = cpu_pin; if ( zmq_params->addr != NULL ) { sb->zmq_params = zmq_params; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index a6adddd5..4d8085ce 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -90,6 +90,6 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *tempdir, int serial_start, struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, - int timeout, int profile); + int timeout, int profile, int cpu_pin); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index 2f431636..fa153bf5 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -9,7 +9,7 @@ * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2021 Thomas White <taw@physics.org> + * 2010-2022 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -97,6 +97,7 @@ struct indexamajig_arguments char **copy_headers; int n_copy_headers; char *harvest_file; + int cpu_pin; struct taketwo_options **taketwo_opts_ptr; struct felix_options **felix_opts_ptr; @@ -445,6 +446,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_params.wait_for_stream = 1; break; + case 222 : + args->cpu_pin = 1; + break; + /* ---------- Peak search ---------- */ case 't' : @@ -879,6 +884,7 @@ int main(int argc, char *argv[]) args.asapo_params.source = NULL; args.asapo_params.stream = NULL; args.asapo_params.wait_for_stream = 0; + args.cpu_pin = 0; args.serial_start = 1; args.if_peaks = 1; args.if_multi = 0; @@ -1000,6 +1006,7 @@ int main(int argc, char *argv[]) {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE, "Wait for ASAP::O stream to appear"}, + {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1394,7 +1401,7 @@ int main(int argc, char *argv[]) r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, fh, st, tmpdir, args.serial_start, &args.zmq_params, &args.asapo_params, - timeout, args.profile); + timeout, args.profile, args.cpu_pin); if ( pf8_data != NULL ) free_pf8_private_data(pf8_data); if ( detgeom != NULL) detgeom_free(detgeom); |