I tried a few small hacks to make it work, without success. I think the data
structures will need to be changed in order to allow a more dynamic behaviour,
but I lack a global view of SimGrid and I don't know how to implement this
correctly. I also fear that keeping the data of finished jobs in memory would
be detrimental. Can you guide me on how this should be implemented?
I attached an example of what I would like to do within my simulator. My real
simulator contains many MSG processes (which do not all run at the same time,
have different lifespans and whose execution cannot be planned in advance).
Some processes dedicated to launching SMPI jobs will be run at random times
during my simulation.
#define _XOPEN_SOURCE 700
#define _GNU_SOURCE
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <simgrid/msg.h>
#include <xbt/sysdep.h>
#include <xbt/log.h>
#include <xbt/asserts.h>
#include <smpi/smpi.h>
/* Default XBT log category of this file (name: mwe, description: "mwe");
 * presumably the category the XBT_INFO calls below log to. */
XBT_LOG_NEW_DEFAULT_CATEGORY(mwe, "mwe");
/*
 * Process body used for every replayed rank: it is the function registered
 * with SMPI_app_instance_register() and started by msg_job_runner().
 *
 * It hands the process arguments over to smpi_replay_run() and, once that
 * call returns, unconditionally reports 0.
 */
int smpi_replay(int argc, char *argv[])
{
  smpi_replay_run(&argc, &argv);

  return 0;
}
/*
 * Arguments of one msg_job_runner process. An instance is attached to the
 * process as its MSG data and read back with MSG_process_get_data().
 */
typedef struct
{
double nb_seconds_to_sleep; /* duration given to MSG_process_sleep() before the job is launched */
char job_id_str[8]; /* job identifier (NUL-terminated); also used to name the rank processes and passed to them as an argument */
int job_size; /* number of rank processes to launch; also the number of lines read from tit_filename */
char tit_filename[64]; /* path of the file listing one trace filename per rank */
} msg_runner_args;
/**
 * MSG process that launches one SMPI replay job.
 *
 * The process data (see MSG_process_get_data) must point to a heap-allocated
 * msg_runner_args. This function takes ownership of it and releases it with
 * xbt_free() before returning.
 *
 * Steps:
 *  1. sleep for args->nb_seconds_to_sleep seconds;
 *  2. read args->job_size lines from args->tit_filename, each line being the
 *     trace filename of one rank;
 *  3. create one smpi_replay process per rank, rank i running on the i-th host
 *     of a fixed host list.
 *
 * The simulation is aborted (xbt_assert) when the arguments are missing, when
 * the job is larger than the fixed host list, when the job file cannot be
 * opened or holds fewer lines than args->job_size, or when a host is unknown.
 *
 * @param argc unused
 * @param argv unused
 * @return always 0
 */
int msg_job_runner(int argc, char *argv[])
{
  (void) argc;
  (void) argv;

  // Static mapping from MPI ranks to SimGrid hosts to avoid more complexity
  static const char *const host_names[] = {"Jupiter", "Fafard", "Ginette", "Bourassa"};
  enum { NB_HOSTS = sizeof host_names / sizeof host_names[0] };

  // Retrieve process arguments
  msg_runner_args *args = MSG_process_get_data(MSG_process_self());
  xbt_assert(args != NULL, "Job runner launched without arguments");

  // Rank i runs on host_names[i]: a bigger job would index past the host list
  xbt_assert(args->job_size >= 0 && args->job_size <= (int) NB_HOSTS,
             "Job %s has size %d but only sizes from 0 to %d are supported",
             args->job_id_str, args->job_size, (int) NB_HOSTS);

  XBT_INFO("Runner of job %s has been launched", args->job_id_str);

  // Sleep for the required number of seconds
  MSG_process_sleep(args->nb_seconds_to_sleep);

  // Open the file describing the job
  FILE *fp = fopen(args->tit_filename, "r");
  xbt_assert(fp != NULL, "Cannot read file '%s'", args->tit_filename);

  // Read the file, which contains other filenames (the time-independent-traces).
  // The line buffer is reused across iterations and released once at the end.
  char **filename_by_rank = xbt_new(char *, args->job_size);
  char *line = NULL;
  size_t line_capacity = 0;
  int lines_read = 0;

  while (lines_read < args->job_size && xbt_getline(&line, &line_capacity, fp) != -1)
  {
    *strchrnul(line, '\n') = '\0'; // Trimming
    filename_by_rank[lines_read] = xbt_strdup(line);
    lines_read++;
  }

  free(line);
  fclose(fp);

  // Every rank needs its trace: a short file would leave entries uninitialized
  xbt_assert(lines_read == args->job_size,
             "File '%s' contains %d trace filename(s) but job %s needs %d",
             args->tit_filename, lines_read, args->job_id_str, args->job_size);

  // Run the job
  for (int i = 0; i < args->job_size; ++i)
  {
    msg_host_t host = MSG_host_by_name(host_names[i]);
    xbt_assert(host != NULL, "Host '%s' does not exist in the platform", host_names[i]);

    // Both buffers are large enough for any int, so nothing can be truncated
    char str_pname[sizeof args->job_id_str + 16];
    snprintf(str_pname, sizeof str_pname, "%s_%d", args->job_id_str, i);

    char str_rank_id[16];
    snprintf(str_rank_id, sizeof str_rank_id, "%d", i);

    // The argument vector is heap-allocated and handed over to the new process
    char **sub_argv = xbt_new(char *, 5);
    sub_argv[0] = xbt_strdup("1");
    sub_argv[1] = xbt_strdup(args->job_id_str);
    sub_argv[2] = xbt_strdup(str_rank_id);
    sub_argv[3] = xbt_strdup(filename_by_rank[i]);
    sub_argv[4] = xbt_strdup("0");

    XBT_INFO("Job %s, rank %d launched", args->job_id_str, i);
    MSG_process_create_with_arguments(str_pname, smpi_replay, NULL, host, 5, sub_argv);
  }

  XBT_INFO("Job %s launched", args->job_id_str);

  // How could I wait here for job termination (= all SMPI processes finalized)?

  // Cleanup
  for (int i = 0; i < args->job_size; ++i)
  {
    xbt_free(filename_by_rank[i]);
  }
  xbt_free(filename_by_rank);
  xbt_free(args);

  return 0;
}
/**
 * Program entry point.
 *
 * Loads the platform file given on the command line, registers one SMPI
 * application instance per job, then creates one msg_job_runner process per
 * job on host "Jupiter" and runs the simulation.
 *
 * @param argc argument count; exactly one argument (the platform file) must
 *             remain once MSG_init() has processed the command line
 * @param argv argument vector
 * @return 0 when MSG_main() returns MSG_OK, 1 otherwise (including bad usage)
 */
int main(int argc, char *argv[])
{
  /* The jobs run in this example are always the same.
     Hence, we can guess what SMPI apps should be registered.
     One table feeds both the SMPI registration and the runner arguments, so
     that the two job sizes cannot drift apart. */
  static const struct
  {
    const char *runner_name;
    const char *job_id;
    int job_size;
    double nb_seconds_to_sleep;
    const char *tit_filename;
  } jobs[] = {
    {"runner_1", "1", 2, 0, "app1/app.tit"},
    {"runner_2", "2", 4, 2, "app2/app.tit"},
    {"runner_3", "3", 2, 4, "app1/app.tit"},
    {"runner_4", "4", 4, 26, "app2/app.tit"},
  };
  enum { NB_JOBS = sizeof jobs / sizeof jobs[0] };

  /* MSG_init() goes first so that the usage check below sees the command line
     after SimGrid has handled its own options (e.g. --cfg, --log). */
  MSG_init(&argc, argv);

  if (argc != 2)
  {
    printf("Usage: %s platform_file\n", argv[0]);
    printf("example: %s msg_platform.xml\n", argv[0]);
    return 1;
  }
  const char *platform_file = argv[1];

  /* Simulation setting */
  MSG_create_environment(platform_file);

  for (int i = 0; i < (int) NB_JOBS; ++i)
  {
    SMPI_app_instance_register(jobs[i].job_id, smpi_replay, jobs[i].job_size);
  }
  SMPI_init();

  // create args and run processes
  msg_host_t host = MSG_host_by_name("Jupiter");
  xbt_assert(host != NULL, "Host '%s' does not exist in the platform", "Jupiter");

  for (int i = 0; i < (int) NB_JOBS; ++i)
  {
    // Ownership of args goes to the runner process, which frees it when done
    msg_runner_args *args = xbt_new(msg_runner_args, 1);
    args->nb_seconds_to_sleep = jobs[i].nb_seconds_to_sleep;
    args->job_size = jobs[i].job_size;

    // Bounded copies: abort instead of overflowing the fixed-size fields
    int id_len = snprintf(args->job_id_str, sizeof args->job_id_str, "%s", jobs[i].job_id);
    xbt_assert(id_len >= 0 && (size_t) id_len < sizeof args->job_id_str,
               "Job id '%s' is too long", jobs[i].job_id);

    int path_len = snprintf(args->tit_filename, sizeof args->tit_filename, "%s", jobs[i].tit_filename);
    xbt_assert(path_len >= 0 && (size_t) path_len < sizeof args->tit_filename,
               "Filename '%s' is too long", jobs[i].tit_filename);

    MSG_process_create(jobs[i].runner_name, msg_job_runner, args, host);
  }

  msg_error_t res = MSG_main();
  XBT_INFO("Simulation time %g", MSG_get_clock());
  //SMPI_finalize();

  return (res == MSG_OK) ? 0 : 1;
}