aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c45
-rw-r--r--src/im-sandbox.h2
-rw-r--r--src/indexamajig.c11
3 files changed, 54 insertions, 4 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 0602da26..e2f485b1 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -37,6 +37,12 @@
#include <config.h>
#endif
+#ifdef HAVE_SCHED_SETAFFINITY
+#define _GNU_SOURCE
+#include <sys/sysinfo.h>
+#include <sched.h>
+#endif
+
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
@@ -85,6 +91,7 @@ struct sandbox
time_t *last_response;
int last_ping[MAX_NUM_WORKERS];
int profile; /* Whether to do wall-clock time profiling */
+ int cpu_pin;
/* Streams to read from (NB not the same indices as the above) */
int n_read;
@@ -678,6 +685,20 @@ static void try_read(struct sandbox *sb)
}
+static void pin_to_cpu(int slot)
+{
+ #ifdef HAVE_SCHED_SETAFFINITY
+ cpu_set_t c;
+
+ CPU_ZERO(&c);
+ CPU_SET(slot, &c);
+ if ( sched_setaffinity(0, sizeof(cpu_set_t), &c) ) {
+ fprintf(stderr, "Failed to set CPU affinity for %i\n", slot);
+ }
+ #endif
+}
+
+
static void start_worker_process(struct sandbox *sb, int slot)
{
pid_t p;
@@ -712,6 +733,8 @@ static void start_worker_process(struct sandbox *sb, int slot)
size_t ll;
int i;
+ if ( sb->cpu_pin ) pin_to_cpu(slot);
+
/* First, disconnect the signal handlers */
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
@@ -1139,7 +1162,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
- int timeout, int profile)
+ int timeout, int profile, int cpu_pin)
{
int i;
struct sandbox *sb;
@@ -1148,6 +1171,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int r;
int allDone = 0;
struct get_pattern_ctx gpctx;
+ int n_cpus;
if ( n_proc > MAX_NUM_WORKERS ) {
ERROR("Number of workers (%i) is too large. Using %i\n",
@@ -1155,6 +1179,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
n_proc = MAX_NUM_WORKERS;
}
+ #ifdef HAVE_SCHED_SETAFFINITY
+ n_cpus = get_nprocs();
+ if ( n_proc > n_cpus ) {
+ ERROR("WARNING: Number of workers (%i) is larger than the "
+ "number of available CPUs (%i)\n", n_proc, n_cpus);
+ if ( cpu_pin ) {
+ ERROR("Try again with a smaller number of workers (-j) "
+ "or without --cpu-pin\n");
+ return 1;
+ }
+ }
+ #else
+ if ( cpu_pin ) {
+ ERROR("Option --cpu-pin not available on this system.");
+ return 1;
+ }
+ #endif
+
sb = calloc(1, sizeof(struct sandbox));
if ( sb == NULL ) {
ERROR("Couldn't allocate memory for sandbox.\n");
@@ -1169,6 +1211,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->tmpdir = tmpdir;
sb->profile = profile;
sb->timeout = timeout;
+ sb->cpu_pin = cpu_pin;
if ( zmq_params->addr != NULL ) {
sb->zmq_params = zmq_params;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index a6adddd5..4d8085ce 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -90,6 +90,6 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *tempdir, int serial_start,
struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
- int timeout, int profile);
+ int timeout, int profile, int cpu_pin);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 2f431636..fa153bf5 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -9,7 +9,7 @@
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2021 Thomas White <taw@physics.org>
+ * 2010-2022 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -97,6 +97,7 @@ struct indexamajig_arguments
char **copy_headers;
int n_copy_headers;
char *harvest_file;
+ int cpu_pin;
struct taketwo_options **taketwo_opts_ptr;
struct felix_options **felix_opts_ptr;
@@ -445,6 +446,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->asapo_params.wait_for_stream = 1;
break;
+ case 222 :
+ args->cpu_pin = 1;
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -879,6 +884,7 @@ int main(int argc, char *argv[])
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
args.asapo_params.wait_for_stream = 0;
+ args.cpu_pin = 0;
args.serial_start = 1;
args.if_peaks = 1;
args.if_multi = 0;
@@ -1000,6 +1006,7 @@ int main(int argc, char *argv[])
{"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE,
"Wait for ASAP::O stream to appear"},
+ {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1394,7 +1401,7 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start,
&args.zmq_params, &args.asapo_params,
- timeout, args.profile);
+ timeout, args.profile, args.cpu_pin);
if ( pf8_data != NULL ) free_pf8_private_data(pf8_data);
if ( detgeom != NULL) detgeom_free(detgeom);