aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2016-03-01 02:59:38 -0800
committerThomas White <taw@physics.org>2016-03-01 03:37:24 -0800
commitc780ec9c40325c257fff73c1d330c5f4d81baea9 (patch)
tree2c633bddd6479c59b595037c879ea3227d1bec93 /src/im-sandbox.c
parent0c1a3d2a45878b544c0f30135fc9d1240f88889a (diff)
indexamajig: Wall clock profiling
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c41
1 files changed, 35 insertions, 6 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 23d3d0a2..bb8edcac 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2016 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2015 Thomas White <taw@physics.org>
+ * 2010-2016 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
@@ -63,6 +63,7 @@
#include "im-sandbox.h"
#include "process_image.h"
+#include "time-accounts.h"
struct sandbox
@@ -244,6 +245,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
+ TimeAccounts *taccs = time_accounts_init();
while ( !allDone ) {
@@ -255,6 +257,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
int r;
/* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
if ( sem_wait(sb->queue_sem) != 0 ) {
ERROR("Failed to wait on queue semaphore: %s\n",
strerror(errno));
@@ -306,17 +309,20 @@ static void run_work(const struct index_args *iargs, Stream *st,
}
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared);
+ sb->shared, taccs);
free_filename_plus_event(pargs.filename_p_e);
}
+ time_accounts_set(taccs, TACC_FINALCLEANUP);
cleanup_indexing(iargs->indm, iargs->ipriv);
free_detector_geometry(iargs->det);
free(iargs->hdf5_peak_path);
free_copy_hdf5_field_list(iargs->copyme);
cell_free(iargs->cell);
+ if ( iargs->profile ) time_accounts_print(taccs);
+ time_accounts_free(taccs);
}
@@ -442,7 +448,7 @@ static void remove_pipe(struct sandbox *sb, int d)
}
-static void try_read(struct sandbox *sb)
+static void try_read(struct sandbox *sb, TimeAccounts *taccs)
{
int r, i;
struct timeval tv;
@@ -450,6 +456,8 @@ static void try_read(struct sandbox *sb)
int fdmax;
const int ofd = get_stream_fd(sb->stream);
+ time_accounts_set(taccs, TACC_SELECT);
+
tv.tv_sec = 0;
tv.tv_usec = 500000;
@@ -483,6 +491,7 @@ static void try_read(struct sandbox *sb)
/* If the chunk cannot be read, assume the connection
* is broken and that the process will die soon. */
+ time_accounts_set(taccs, TACC_STREAMREAD);
if ( pump_chunk(sb->fhs[i], ofd) ) {
remove_pipe(sb, i);
}
@@ -789,6 +798,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int r;
int no_more = 0;
int allDone = 0;
+ TimeAccounts *taccs;
if ( n_proc > MAX_NUM_WORKERS ) {
ERROR("Number of workers (%i) is too large. Using %i\n",
@@ -909,17 +919,21 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
+ taccs = time_accounts_init();
+
do {
time_t tNow;
/* Check for stream output from workers */
- try_read(sb);
+ try_read(sb, taccs);
/* Check for interrupt or zombies */
+ time_accounts_set(taccs, TACC_SIGNALS);
check_signals(sb, semname_q, 1);
/* Top up the queue if necessary */
+ time_accounts_set(taccs, TACC_QUEUETOPUP);
pthread_mutex_lock(&sb->shared->queue_lock);
if ( !no_more && (sb->shared->n_events < QUEUE_SIZE/2) ) {
if ( fill_queue(fh, config_basename, iargs->det,
@@ -928,20 +942,25 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
pthread_mutex_unlock(&sb->shared->queue_lock);
/* Update progress */
+ time_accounts_set(taccs, TACC_STATUS);
tNow = get_monotonic_seconds();
if ( tNow > sb->t_last_stats+5 ) try_status(sb, tNow);
/* Have all the events been swallowed? */
+ time_accounts_set(taccs, TACC_ENDCHECK);
pthread_mutex_lock(&sb->shared->queue_lock);
if ( no_more && (sb->shared->n_events == 0) ) allDone = 1;
pthread_mutex_unlock(&sb->shared->queue_lock);
} while ( !allDone );
+ if ( iargs->profile ) time_accounts_print(taccs);
+
fclose(fh);
/* Indicate to the workers that we are finished, and wake them up one
* last time */
+ time_accounts_set(taccs, TACC_WAKEUP);
STATUS("Waiting for the last patterns to be processed...\n");
pthread_mutex_lock(&sb->shared->queue_lock);
sb->shared->no_more = 1;
@@ -951,14 +970,24 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
for ( i=0; i<n_proc; i++ ) {
int status;
+ time_accounts_set(taccs, TACC_WAITPID);
while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) {
- try_read(sb);
+
+ time_accounts_set(taccs, TACC_STREAMREAD);
+ try_read(sb, taccs);
+
+ time_accounts_set(taccs, TACC_SIGNALS);
check_signals(sb, semname_q, 0);
+
+ time_accounts_set(taccs, TACC_WAITPID);
}
/* If this worker died and got waited by the zombie handler,
* waitpid() returns -1 and the loop still exits. */
}
+ if ( iargs->profile ) time_accounts_print(taccs);
+ time_accounts_free(taccs);
+
sem_unlink(semname_q);
for ( i=0; i<sb->n_read; i++ ) {