Unix Parallel Process Tree Scheduler

Suppose we have a tree showing process dependency (that is, tree nodes represent processes to be run). A process represented by a child node should not be started before that represented by the parent  node has finished, but the processes represented by sibling nodes can be run in parallel.

This can be implemented in C by using system calls (wait, exec and fork).

But writing such code is a laborious process, as the if then else structure corresponds to the tree structure and quickly gets complex.

The repository linked below implements a .NET front end that takes the tree as the input and outputs the C code that can implement the process scheduler that respects the dependencies given in the process tree.

https://github.com/gvsh/unix_parallel_process_tree

The following is the sample code outputted by the utility that implements such a process tree. In this example, there are 11 processes at level 1 (names 01**) that can be run in parallel, 2 in the next level (names 02**) and 1 in the last level (0301).

The header file used by the C code is also listed below.

#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <stdarg.h>
#include "ourhdr.h"
char *pname = NULL; /* caller can set this from argv[0] */
void pr_exit(int);
static void err_doit(int, const char *, va_list);
int main(void)
{
char *lev0101 = "1CDMFile.sas";
char *lev0102 = "1LoanFiles.sas";
char *lev0103 = "1COGDMFile.sas";
char *lev0104 = "1IDSExtraction.sas";
char *lev0105 = "1ModelCoreExtraction.sas";
char *lev0106 = "1NicksVariablesPart1.sas";
char *lev0107 = "1NicksVariablesPart2.sas";
char *lev0108 = "1NicksVariablesPart3.sas";
char *lev0109 = "1NicksVariablesPart4.sas";
char *lev0110 = "1NicksVariablesPart5.sas";
char *lev0111 = "1NicksVariablesPart6.sas";
char *lev0201 = "2MergeSubprimeLORandBusDays.sas";
char *lev0202 = "2NicksVariablesLastStep.sas";
char *lev0301 = "3PayInfoIDSModelCoreHist12AcctR.sas";
/*
char *lev0105 = "OneA2.sas";
char *lev0106 = "OneA2.sas";
char *lev0107 = "OneA2.sas";
*/
char *cmd0101[] = { "sas", lev0101, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0102[] = { "sas", lev0102, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0103[] = { "sas", lev0103, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0104[] = { "sas", lev0104, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0105[] = { "sas", lev0105, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0106[] = { "sas", lev0106, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0107[] = { "sas", lev0107, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0108[] = { "sas", lev0108, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0109[] = { "sas", lev0109, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0110[] = { "sas", lev0110, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0111[] = { "sas", lev0111, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0201[] = { "sas", lev0201, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0202[] = { "sas", lev0202, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
char *cmd0301[] = { "sas", lev0301, "-fullstimer", "-mprint", "-memsize", "2G", "-realmemsize", "2G", "-sortsize", "2G", "-threads", (char *)0 };
pid_t pid0101, pid0102, pid0103, pid0104, pid0105, pid0106, pid0107, pid0108, pid0109, pid0110, pid0111, pid0201, pid0202, pid0301;
int sts0101, sts0102, sts0103, sts0104, sts0105, sts0106, sts0107, sts0108, sts0109, sts0110, sts0111, sts0201, sts0202, sts0301, Level1Err, Level2Err, Level3Err;
Level1Err = 0;
Level2Err = 0;
Level3Err = 0;
if ( (pid0101 = fork()) < 0)
err_sys("fork error");
else if (pid0101 == 0) { /* child */
if (execvp ("sas", cmd0101) < 0)
err_sys("execvp error\n");
}
else if (pid0101 > 0) {
if ( (pid0102 = fork()) < 0)
err_sys("fork error");
else if (pid0102 == 0) { /* child */
if (execvp ("sas", cmd0102) < 0)
err_sys("execvp error\n");
}
else if (pid0102 > 0) {
if ( (pid0103 = fork()) < 0)
err_sys("fork error");
else if (pid0103 == 0) { /* child */
if (execvp ("sas", cmd0103) < 0)
err_sys("execvp error\n");
}
else if (pid0103 > 0) {
if ( (pid0104 = fork()) < 0)
err_sys("fork error");
else if (pid0104 == 0) { /* child */
if (execvp ("sas", cmd0104) < 0)
err_sys("execvp error\n");
}
else if (pid0104 > 0) {
if ( (pid0105 = fork()) < 0)
err_sys("fork error");
else if (pid0105 == 0) { /* child */
if (execvp ("sas", cmd0105) < 0)
err_sys("execvp error\n");
}
else if (pid0105 > 0) {
if ( (pid0106 = fork()) < 0)
err_sys("fork error");
else if (pid0106 == 0) { /* child */
if (execvp ("sas", cmd0106) < 0)
err_sys("execvp error\n");
}
else if (pid0106 > 0) {
if ( (pid0107 = fork()) < 0)
err_sys("fork error");
else if (pid0107 == 0) { /* child */
if (execvp ("sas", cmd0107) < 0)
err_sys("execvp error\n");
}
else if (pid0107 > 0) {
if ( (pid0108 = fork()) < 0)
err_sys("fork error");
else if (pid0108 == 0) { /* child */
if (execvp ("sas", cmd0108) < 0)
err_sys("execvp error\n");
}
else if (pid0108 > 0) {
if ( (pid0109 = fork()) < 0)
err_sys("fork error");
else if (pid0109 == 0) { /* child */
if (execvp ("sas", cmd0109) < 0)
err_sys("execvp error\n");
}
else if (pid0109 > 0) {
if ( (pid0110 = fork()) < 0)
err_sys("fork error");
else if (pid0110 == 0) { /* child */
if (execvp ("sas", cmd0110) < 0)
err_sys("execvp error\n");
}
else if (pid0110 > 0) {
if ( (pid0111 = fork()) < 0)
err_sys("fork error");
else if (pid0111 == 0) { /* child */
if (execvp ("sas", cmd0111) < 0)
err_sys("execvp error\n");
}
else if (pid0111 > 0) {
if (waitpid(pid0101, &sts0101, 0) != pid0101) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0102, &sts0102, 0) != pid0102) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0103, &sts0103, 0) != pid0103) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0104, &sts0104, 0) != pid0104) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0105, &sts0105, 0) != pid0105) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0106, &sts0106, 0) != pid0106) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0107, &sts0107, 0) != pid0107) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0108, &sts0108, 0) != pid0108) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0109, &sts0109, 0) != pid0109) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0110, &sts0110, 0) != pid0110) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0111, &sts0111, 0) != pid0111) {
err_sys("waitpid error\n");
}
else {
if (sts0101 > 4) {
pr_exit(sts0101);
printf("Error in Job %s\n", lev0101);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0101);
}
if (sts0102 > 4) {
pr_exit(sts0102);
printf("Error in Job %s\n", lev0102);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0102);
}
if (sts0103 > 4) {
pr_exit(sts0103);
printf("Error in Job %s\n", lev0103);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0103);
}
if (sts0104 > 4) {
pr_exit(sts0104);
printf("Error in Job %s\n", lev0104);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0104);
}
if (sts0105 > 4) {
pr_exit(sts0105);
printf("Error in Job %s\n", lev0105);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0105);
}
if (sts0106 > 4) {
pr_exit(sts0106);
printf("Error in Job %s\n", lev0106);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0106);
}
if (sts0107 > 4) {
pr_exit(sts0107);
printf("Error in Job %s\n", lev0107);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0107);
}
if (sts0108 > 4) {
pr_exit(sts0108);
printf("Error in Job %s\n", lev0108);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0108);
}
if (sts0109 > 4) {
pr_exit(sts0109);
printf("Error in Job %s\n", lev0109);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0109);
}
if (sts0110 > 4) {
pr_exit(sts0110);
printf("Error in Job %s\n", lev0110);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0110);
}
if (sts0111 > 4) {
pr_exit(sts0111);
printf("Error in Job %s\n", lev0111);
Level1Err = 1;
} else {
printf("%s completed successfully \n", lev0111);
}
if (Level1Err == 0) {
printf("Level 1 jobs completed successfully\n");
} else {
err_sys("Level 1 could not be completed\n");
}
if ( (pid0201 = fork()) < 0)
err_sys("fork error");
else if (pid0201 == 0) { /* child */
if (execvp ("sas", cmd0201) < 0)
err_sys("execvp error\n");
}
else if (pid0201 > 0) {
if ( (pid0202 = fork()) < 0)
err_sys("fork error");
else if (pid0202 == 0) { /* child */
if (execvp ("sas", cmd0202) < 0)
err_sys("execvp error\n");
}
else if (pid0202 > 0) {
if (waitpid(pid0201, &sts0201, 0) != pid0201) {
err_sys("waitpid error\n");
}
else if (waitpid(pid0202, &sts0202, 0) != pid0202) {
err_sys("waitpid error\n");
} else {
if (sts0201 > 4) {
pr_exit(sts0201);
printf("Error in Job %s\n", lev0201);
Level2Err = 1;
} else {
printf("%s completed successfully \n", lev0201);
}
if (sts0202 > 4) {
pr_exit(sts0202);
printf("Error in Job %s\n", lev0202);
Level2Err = 1;
} else {
printf("%s completed successfully \n", lev0202);
}
if (Level2Err == 0) {
printf("Level 2 jobs completed successfully\n");
} else {
err_sys("Level 2 could not be completed\n");
}
if ( (pid0301 = fork()) < 0)
err_sys("fork error");
else if (pid0301 == 0) { /* child */
if (execvp ("sas", cmd0301) < 0)
err_sys("execvp error\n");
}
else if (pid0301 > 0) {
if (waitpid(pid0301, &sts0301, 0) != pid0301) {
err_sys("waitpid error\n");
} else {
if (sts0301 > 4) {
pr_exit(sts0301);
printf("Error in Job %s\n", lev0301);
Level3Err = 1;
} else {
printf("%s completed successfully \n", lev0301);
}
if (Level3Err == 0) {
printf("Level 3 jobs completed successfully\n");
} else {
err_sys("Level 3 could not be completed\n");
}
}
}
}
}
}}
}}}} }}}} }}}
exit(0);
}
void pr_exit(int status)
{
if (WIFEXITED(status))
printf("Exit status = %d\n",
WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("abnormal termination, signal number = %d%s\n",
WTERMSIG(status),
#ifdef WCOREDUMP
WCOREDUMP(status) ? " (core file generated)" : "");
#else
"");
#endif
else if (WIFSTOPPED(status))
printf("child stopped, signal number = %d\n",
WSTOPSIG(status));
}
/* Fatal error related to a system call.
* Print a message and terminate. */
void err_sys(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
err_doit(1, fmt, ap);
va_end(ap);
exit(1);
}
/* Print a message and return to caller.
* Caller specifies "errnoflag". */
static void err_doit(int errnoflag, const char *fmt, va_list ap)
{
int errno_save;
char buf[MAXLINE];
errno_save = errno; /* value caller might want printed */
vsprintf(buf, fmt, ap);
if (errnoflag)
sprintf(buf+strlen(buf), ": %s", strerror(errno_save));
strcat(buf, "\n");
fflush(stdout); /* in case stdout and stderr are the same */
fputs(buf, stderr);
fflush(NULL); /* flushes all stdio output streams */
return;
}

 

/**
Header file as given in the book - Advanced Programming in the UNIX Environment
by W. Richard Stevens & Stephen A. Rago
*/
/* Our own header, to be included *after* all standard system headers */
#ifndef __ourhdr_h
#define __ourhdr_h
#include <sys/types.h> /* required for some of our prototypes */
#include <stdio.h> /* for convenience */
#include <stdlib.h> /* for convenience */
#include <string.h> /* for convenience */
#include <unistd.h> /* for convenience */
#define MAXLINE 4096 /* max line length */
#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
/* default file access permissions for new files */
#define DIR_MODE (FILE_MODE | S_IXUSR | S_IXGRP | S_IXOTH)
/* default permissions for new directories */
typedef void Sigfunc(int); /* for signal handlers */
/* 4.3BSD Reno <signal.h> doesn't define SIG_ERR */
#if defined(SIG_IGN) && !defined(SIG_ERR)
#define SIG_ERR ((Sigfunc *)-1)
#endif
#define min(a,b) ((a) < (b) ? (a) : (b))
#define max(a,b) ((a) > (b) ? (a) : (b))
/* prototypes for our own functions */
char *path_alloc(int *); /* {Prog pathalloc} */
int open_max(void); /* {Prog openmax} */
void clr_fl(int, int); /* {Prog setfl} */
void set_fl(int, int); /* {Prog setfl} */
void pr_exit(int); /* {Prog prexit} */
void pr_mask(const char *); /* {Prog prmask} */
Sigfunc *signal_intr(int, Sigfunc *);/* {Prog signal_intr_function} */
int tty_cbreak(int); /* {Prog raw} */
int tty_raw(int); /* {Prog raw} */
int tty_reset(int); /* {Prog raw} */
void tty_atexit(void); /* {Prog raw} */
#ifdef ECHO /* only if <termios.h> has been included */
struct termios *tty_termios(void); /* {Prog raw} */
#endif
void sleep_us(unsigned int); /* {Ex sleepus} */
ssize_t readn(int, void *, size_t);/* {Prog readn} */
ssize_t writen(int, const void *, size_t);/* {Prog writen} */
int daemon_init(void); /* {Prog daemoninit} */
int s_pipe(int *); /* {Progs svr4_spipe bsd_spipe} */
int recv_fd(int, ssize_t (*func)(int, const void *, size_t));
/* {Progs recvfd_svr4 recvfd_43bsd} */
int send_fd(int, int); /* {Progs sendfd_svr4 sendfd_43bsd} */
int send_err(int, int, const char *);/* {Prog senderr} */
int serv_listen(const char *); /* {Progs servlisten_svr4 servlisten_44bsd} */
int serv_accept(int, uid_t *); /* {Progs servaccept_svr4 servaccept_44bsd} */
int cli_conn(const char *); /* {Progs cliconn_svr4 cliconn_44bsd} */
int buf_args(char *, int (*func)(int, char **));
/* {Prog bufargs} */
int ptym_open(char *); /* {Progs ptyopen_svr4 ptyopen_44bsd} */
int ptys_open(int, char *); /* {Progs ptyopen_svr4 ptyopen_44bsd} */
#ifdef TIOCGWINSZ
pid_t pty_fork(int *, char *, const struct termios *,
const struct winsize *); /* {Prog ptyfork} */
#endif
int lock_reg(int, int, int, off_t, int, off_t);
/* {Prog lockreg} */
#define read_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_RDLCK, offset, whence, len)
#define readw_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLKW, F_RDLCK, offset, whence, len)
#define write_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_WRLCK, offset, whence, len)
#define writew_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLKW, F_WRLCK, offset, whence, len)
#define un_lock(fd, offset, whence, len) \
lock_reg(fd, F_SETLK, F_UNLCK, offset, whence, len)
pid_t lock_test(int, int, off_t, int, off_t);
/* {Prog locktest} */
#define is_readlock(fd, offset, whence, len) \
lock_test(fd, F_RDLCK, offset, whence, len)
#define is_writelock(fd, offset, whence, len) \
lock_test(fd, F_WRLCK, offset, whence, len)
void err_dump(const char *, ...); /* {App misc_source} */
void err_msg(const char *, ...);
void err_quit(const char *, ...);
void err_ret(const char *, ...);
void err_sys(const char *, ...);
void log_msg(const char *, ...); /* {App misc_source} */
void log_open(const char *, int, int);
void log_quit(const char *, ...);
void log_ret(const char *, ...);
void log_sys(const char *, ...);
void TELL_WAIT(void); /* parent/child from {Sec race_conditions} */
void TELL_PARENT(pid_t);
void TELL_CHILD(pid_t);
void WAIT_PARENT(void);
void WAIT_CHILD(void);
#endif /* __ourhdr_h */
view raw ourhdr.h hosted with ❤ by GitHub