aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Thomas White <taw@physics.org> 2016-08-16 17:09:09 +0200
committer Thomas White <taw@physics.org> 2016-08-16 17:09:09 +0200
commit f31e48d223e637f7ef155846f6e4b2cab2a7d4cc (patch)
tree d194b5b923da13e4a96831372001d7de0335ce0b
parent b450ebaf97cde7667e807613c9e35a50f1ed81c4 (diff)
indexamajig: Detect and kill hung worker processes
-rw-r--r-- src/im-sandbox.c 90
1 files changed, 61 insertions, 29 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 2f85978f..51cd5903 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -84,6 +84,9 @@ struct sandbox
char *tmpdir;
+ /* The last time each worker was heard from */
+ time_t *last_response;
+
/* Streams to read from */
int n_read;
FILE **fhs;
@@ -94,6 +97,52 @@ struct sandbox
};
+#ifdef HAVE_CLOCK_GETTIME
+
+static time_t get_monotonic_seconds() /* Whole seconds read from CLOCK_MONOTONIC */
+{
+ struct timespec tp;
+ clock_gettime(CLOCK_MONOTONIC, &tp); /* NOTE(review): return value is not checked */
+ return tp.tv_sec; /* The tv_nsec part is discarded */
+}
+
+#else
+
+/* Fallback version of the above. The time according to gettimeofday() is not
+ * monotonic, so measuring intervals based on it will go wrong if the system
+ * clock is adjusted (e.g. by NTP or by hand) while the program is running. */
+static time_t get_monotonic_seconds()
+{
+ struct timeval tp;
+ gettimeofday(&tp, NULL); /* NOTE(review): return value is not checked */
+ return tp.tv_sec; /* The tv_usec part is discarded */
+}
+
+#endif
+
+
+static void stamp_response(struct sandbox *sb, int n) /* Record "now" as the last response time of slot n */
+{
+ sb->last_response[n] = get_monotonic_seconds(); /* n is used as an index without a range check */
+}
+
+
+static void check_hung_workers(struct sandbox *sb) /* SIGKILL running workers silent for more than 240 s */
+{
+ int i;
+ time_t tnow = get_monotonic_seconds();
+ for ( i=0; i<sb->n_read; i++ ) { /* NOTE(review): arrays indexed below are sized by n_proc in create_sandbox, not n_read - confirm */
+ if ( !sb->running[i] ) continue; /* Skip slots not marked as running */
+ if ( tnow - sb->last_response[i] > 240 ) {
+ STATUS("Worker %i did not respond for 240 seconds - "
+ "sending it SIGKILL.\n", i);
+ kill(sb->pids[i], SIGKILL); /* NOTE(review): return value is not checked */
+ stamp_response(sb, i); /* Reset the timestamp, so this slot is not signalled again for another 240 s */
+ }
+ }
+}
+
+
static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
struct detector *det,
const char *prefix)
@@ -325,30 +374,6 @@ static void run_work(const struct index_args *iargs, Stream *st,
}
-#ifdef HAVE_CLOCK_GETTIME
-
-static time_t get_monotonic_seconds()
-{
- struct timespec tp;
- clock_gettime(CLOCK_MONOTONIC, &tp);
- return tp.tv_sec;
-}
-
-#else
-
-/* Fallback version of the above. The time according to gettimeofday() is not
- * monotonic, so measuring intervals based on it will screw up if there's a
- * timezone change (e.g. daylight savings) while the program is running. */
-static time_t get_monotonic_seconds()
-{
- struct timeval tp;
- gettimeofday(&tp, NULL);
- return tp.tv_sec;
-}
-
-#endif
-
-
static ssize_t lwrite(int fd, const char *a)
{
size_t l = strlen(a);
@@ -491,6 +516,7 @@ static void try_read(struct sandbox *sb, TimeAccounts *taccs)
/* If the chunk cannot be read, assume the connection
* is broken and that the process will die soon. */
time_accounts_set(taccs, TACC_STREAMREAD);
+ stamp_response(sb, i);
if ( pump_chunk(sb->fhs[i], ofd) ) {
remove_pipe(sb, i);
}
@@ -603,6 +629,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
* and the 'read' end of the result pipe. */
sb->pids[slot] = p;
sb->running[slot] = 1;
+ stamp_response(sb, slot);
add_pipe(sb, stream_pipe[0]);
close(stream_pipe[1]);
}
@@ -842,14 +869,13 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
- if ( sb->pids == NULL ) {
+ sb->last_response = calloc(n_proc, sizeof(time_t));
+ if ( (sb->pids == NULL) || (sb->running == NULL)
+ || (sb->last_response == NULL) )
+ {
ERROR("Couldn't allocate memory for PIDs.\n");
return;
}
- if ( sb->running == NULL ) {
- ERROR("Couldn't allocate memory for process flags.\n");
- return;
- }
if ( tempdir == NULL ) {
tempdir = "";
@@ -931,6 +957,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
time_accounts_set(taccs, TACC_SIGNALS);
check_signals(sb, semname_q, 1);
+ /* Check for hung workers */
+ check_hung_workers(sb);
+
/* Top up the queue if necessary */
time_accounts_set(taccs, TACC_QUEUETOPUP);
pthread_mutex_lock(&sb->shared->queue_lock);
@@ -978,6 +1007,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
time_accounts_set(taccs, TACC_SIGNALS);
check_signals(sb, semname_q, 0);
+ check_hung_workers(sb);
+
time_accounts_set(taccs, TACC_WAITPID);
}
/* If this worker died and got waited by the zombie handler,
@@ -995,6 +1026,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
free(sb->fhs);
free(sb->fds);
free(sb->running);
+ free(sb->last_response);
free(sb->pids);
free(sb->tmpdir);