Logo Search packages:      
Sourcecode: yap version File versions  Download package

mpi.c

/*************************************************************************
*                                                      *
*      YAP Prolog                                            *
*                                                      *
*     Yap Prolog was developed at NCCUP - Universidade do Porto    *
*                                                      *
* Copyright S. Konstantopoulos and Universidade do Porto 2002-2003       *
*                                                      *
**************************************************************************
*                                                      *
* File:           mpi.c                                            *
* Last rev: $Date: 2003/07/03 15:01:18 $                     *
* mods:                                                      *
* comments: Interface to MPI libraries                               *
*                                                      *
*************************************************************************/

#ifndef lint
static char *rcsid = "$Header: /cvsroot/yap/library/mpi/mpi.c,v 1.20 2003/07/03 15:01:18 stasinos Exp $";
#endif

#include "Yap.h"

#if HAVE_MPI

#include "Yatom.h"
#include "yapio.h"

/* for AtomEof */
#include "Heap.h"

#include <stdlib.h>
#include <string.h>
#include <mpi.h>

Term    STD_PROTO(YAP_Read, (int (*)(void)));
void    STD_PROTO(YAP_Write, (Term, void (*)(int), int));

STATIC_PROTO (Int p_mpi_open, (void));
STATIC_PROTO (Int p_mpi_close, (void));
STATIC_PROTO (Int p_mpi_send, (void));
STATIC_PROTO (Int p_mpi_receive, (void));
STATIC_PROTO (Int p_mpi_bcast3, (void));
STATIC_PROTO (Int p_mpi_bcast2, (void));
STATIC_PROTO (Int p_mpi_barrier, (void));


/*
 * Auxiliary Data
 */

static Int rank, numprocs, namelen;
static char processor_name[MPI_MAX_PROCESSOR_NAME];

static Int mpi_argc;
static char **mpi_argv;

/* this should eventually be moved to config.h */
#define RECV_BUF_SIZE 1024*32


/* 
 * A simple stream
 */

static size_t bufsize, bufstrlen;
static char *buf;
static int bufptr;

static void
expand_buffer( int space )
{
#if MPI_AVOID_REALLOC
  /*
    realloc() has been SIGSEGV'ing on HP-UX 10.20, but there is
    no problem in HP-UX 11.0. We can remove this bit here as soon
    as Yap stops compiling on 10.20 anyway. If removed, also remove
    the MPI_AVOID_REALLOC bits from configure.in and config.h.in
  */

  char *tmp;

  tmp = malloc( bufsize + space );
  if( tmp == NULL ) {
    Yap_Error(SYSTEM_ERROR, TermNil, "out of memory" );
    Yap_exit( EXIT_FAILURE );
  }
  memcpy( tmp, buf, bufsize );
  free( buf );
  buf = tmp;
#else /* use realloc */
  buf = realloc( buf, bufsize + space );
  if( buf == NULL ) {
    Yap_Error(SYSTEM_ERROR, TermNil, "out of memory");
    Yap_exit( EXIT_FAILURE );
  }
#endif

  bufsize += space;
}

static void
mpi_putc(Int ch)
{
  if( ch > 0 ) {
    if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE );
    buf[bufptr++] = ch;
  }
}

static Int
mpi_getc(void)
{
  if( bufptr < bufsize ) return buf[bufptr++];
  else return -1;
}




/*
 * C Predicates
 */


static Int
p_mpi_open(void)         /* mpi_open(?rank, ?num_procs, ?proc_name) */
{
  Term t_rank = Deref(ARG1), t_numprocs = Deref(ARG2), t_procname = Deref(ARG3);
  Int retv;

/*
With MPICH MPI_Init() must be called during initialisation,
but with LAM it can be called from Prolog (mpi_open/3)

The symptoms match a known RedHat bug, see
http://email.osc.edu/pipermail/mpiexec/2002-July/000067.html
for a suggested workaround:
  Redhat have somehow broken their sem.h and ipc.h. If you use your own
  kernel, copy from ../src/kernel/include/asm & ../src/kernel/include/linux
  the file ipc.h and sem.h to /usr/include/sys, recompile your mpich and
  everything might start working.  (it did for us)
*/

/*
Note that if MPI_Init() fails, Yap/MPICH and Yap/LAM behave differently:
in Yap/MPICH we are still at the Yap initialisation phase, so we get
Yap exit(FAILURE), whereas in Yap/LAM mpi_open/3 simply fails.
*/

  retv = MPI_Init( &mpi_argc, &mpi_argv );
  if( retv ) {
    Term t;

    t = MkIntegerTerm(retv);
    Yap_Error( SYSTEM_ERROR, t, "MPI_Init() returned non-zero" );
    return FALSE;
  }
  MPI_Comm_size( MPI_COMM_WORLD, &numprocs );
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );
  MPI_Get_processor_name( processor_name, &namelen );

  retv = Yap_unify(t_rank, MkIntTerm(rank));
  retv = retv && Yap_unify(t_numprocs, MkIntTerm(numprocs));
  retv = retv && Yap_unify(t_procname, MkAtomTerm(Yap_LookupAtom(processor_name)));

  return retv;
}


static Int               /* mpi_close */
p_mpi_close()
{
  MPI_Finalize();
  return TRUE;
}


static Int
p_mpi_send()             /* mpi_send(+data, +destination, +tag) */
{
  Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3);
  int tag, dest, retv;

  /* The first argument (data) must be bound */
  if (IsVarTerm(t_data)) {
    Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_send");
    return (FALSE);
  }

  /* The second and third args must be bount to integers */
  if (IsVarTerm(t_dest)) {
    Yap_Error(INSTANTIATION_ERROR, t_dest, "mpi_send");
    return (FALSE);
  } else if( !IsIntegerTerm(t_dest) ) {
    Yap_Error(TYPE_ERROR_INTEGER, t_dest, "mpi_send");
    return (FALSE);
  } else {
    dest = IntOfTerm( t_dest );
  }
  if (IsVarTerm(t_tag)) {
    Yap_Error(INSTANTIATION_ERROR, t_tag, "mpi_send");
    return (FALSE);
  } else if( !IsIntegerTerm(t_tag) ) {
    Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_send");
    return (FALSE);
  } else {
    tag  = IntOfTerm( t_tag );
  }

  /* Turn the term into its ASCII representation */
  bufptr = 0;
  YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );

  /* The buf is not NULL-terminated and does not have the
     trailing ". " required by the parser */
  mpi_putc( '.' );
  mpi_putc( ' ' );
  mpi_putc( 0 );
  bufstrlen = strlen(buf);

  /* send the data */
  bufptr = 0;
  retv = MPI_Send( &buf[bufptr], bufstrlen, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
  if( retv != MPI_SUCCESS ) return FALSE;

  return TRUE;
}


static Int
p_mpi_receive()          /* mpi_receive(-data, ?orig, ?tag) */
{
  Term t, t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3);
  int tag, orig, retv;
  MPI_Status status;

  /* The first argument (data) must be unbound */
  if(!IsVarTerm(t_data)) {
    Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
    return FALSE;
  }

  /* The second argument (source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (IsVarTerm(t_orig)) {
    orig = MPI_ANY_SOURCE;
  } else if( !IsIntegerTerm(t_orig) ) {
    Yap_Error(TYPE_ERROR_INTEGER, t_orig, "mpi_receive");
    return (FALSE);
  } else {
    orig = IntOfTerm( t_orig );
  }

  /* The third argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (IsVarTerm(t_tag)) {
    tag = MPI_ANY_TAG;
  } else if( !IsIntegerTerm(t_tag) ) {
    Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_receive");
    return (FALSE);
  } else
    tag  = IntOfTerm( t_tag );

  /* probe for the size of the term */
  retv = MPI_Probe( orig, tag, MPI_COMM_WORLD, &status );
  if( retv != MPI_SUCCESS ) {
    return FALSE;
  }
  MPI_Get_count( &status, MPI_CHAR, &bufstrlen );

  /* adjust the buffer */
  if( bufsize < bufstrlen ) expand_buffer(bufstrlen-bufsize);

  /* Already know the source from MPI_Probe() */
  if( orig == MPI_ANY_SOURCE ) {
    orig = status.MPI_SOURCE;
    retv = Yap_unify(t_orig, MkIntTerm(orig));
    if( retv == FALSE ) {
      printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
    }
  }

  /* Already know the tag from MPI_Probe() */
  if( tag == MPI_ANY_TAG ) {
    tag = status.MPI_TAG;
    retv = Yap_unify(t_tag, MkIntTerm(status.MPI_TAG));
    if( retv == FALSE ) {
      printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
    } 
  }

  /* Receive the message as a C string */
  retv = MPI_Recv( buf, bufstrlen, MPI_CHAR, orig, tag,
               MPI_COMM_WORLD, &status );
  if( retv != MPI_SUCCESS ) {
    /* Getting in here would be weird; it means the first package
       (size) was sent properly, but there was a glitch with
       the actual content! */
    return FALSE;
  }

  /* parse received string into a Prolog term */

  bufptr = 0;
  t = YAP_Read( mpi_getc );

  if( t == TermNil ) {
    retv = FALSE;
  }
  else {
    retv = Yap_unify(t, t_data);
  }

  return retv;
}


static Int
p_mpi_bcast3()           /* mpi_bcast( ?data, +root, +max_size ) */
{
  Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3);
  int root, retv, max_size;

  /* The second argument must be bound to an integer (the rank of
     root processor */
  if (IsVarTerm(t_root)) {
    Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
    return FALSE;
  }
  root = IntOfTerm( t_root );

  /*  If this is the root processor, then the first argument must
      be bound to the term to be sent. */
  if( root == rank ) {
    if( IsVarTerm(t_data) ) {
      Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
      return FALSE;
    }
    /* Turn the term into its ASCII representation */
    bufptr = 0;
    YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
    /* NULL-terminate the string and add the ". " termination
       required by the parser. */
    mpi_putc( '.' );
    mpi_putc( ' ' );
    mpi_putc( 0 );
    bufstrlen = strlen(buf);
  }

  /* The third argument must be bound to an integer (the maximum length
     of the broadcast term's ASCII representation */
  if (IsVarTerm(t_max_size)) {
    Yap_Error(INSTANTIATION_ERROR, t_max_size, "mpi_bcast");
    return FALSE;
  }
  /* allow for the ". " bit and the NULL at the end */
  max_size = IntOfTerm( t_max_size ) + 3;

  if( max_size < bufstrlen ) {
    /* issue a warning? explode? bcast s'thing unparsable? */
    printf( "MAYDAY: max_size == %d, bufstrlen == %d\n ", max_size, bufstrlen);
    return FALSE;
  }

  /* adjust the buffer size, if necessary */
  if( max_size > bufsize ) {
    expand_buffer( max_size-bufsize );
  }

  retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD );
  if( retv != MPI_SUCCESS ) {
    printf( "OOOPS! MPI_Bcast() returned %d.\n", retv );
    return FALSE;
  }

  if( root == rank ) return TRUE;
  else {
    /* ARG1 must be unbound so that it can receive data */
    if( !IsVarTerm(t_data) ) {
      Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
      return FALSE;
    }

    bufstrlen = strlen(buf);
    bufptr = 0;

    /* parse received string into a Prolog term */
    return Yap_unify( YAP_Read(mpi_getc), ARG1 );
  }    
}


/*
  This is the same as above, but for dynamic data size.
  It is implemented as two broadcasts, the first being the size
  and the second the actual data.
*/

static Int
p_mpi_bcast2()           /* mpi_bcast( ?data, +root ) */
{
  Term t_data = Deref(ARG1), t_root = Deref(ARG2);
  int root, retv;

  /* The second argument must be bound to an integer (the rank of
     root processor */
  if (IsVarTerm(t_root)) {
    Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
    return FALSE;
  }
  root = IntOfTerm( t_root );


  /*  If this is the root processor, then the first argument must
      be bound to the term to be sent. */
  if( root == rank ) {
    if( IsVarTerm(t_data) ) {
      Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
      return FALSE;
    }
    bufptr = 0;
    /* Turn the term into its ASCII representation */
    YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
    /* NULL-terminate the string and add the ". " termination
       required by the parser. */
    buf[bufptr] = 0;
    strcat( buf, ". " );
    bufstrlen = bufptr + 2;
  }
  /* Otherwise, it must a variable */
  else {
    if( !IsVarTerm(t_data) ) {
      Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
      return FALSE;
    }
  }


  /* Broadcast the data size */
  retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD );
  if( retv != MPI_SUCCESS ) {
    printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
    return FALSE;
  }

  /* adjust the buffer size, if necessary */
  if( bufstrlen > bufsize ) {
    expand_buffer( bufstrlen - bufsize );
  }
  /* Broadcast the data */
  retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD );
  if( retv != MPI_SUCCESS ) {
    printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
    return FALSE;
  }

  if( root == rank ) return TRUE;
  else {
    /* ARG1 must be unbound so that it can receive data */
    if( !IsVarTerm(t_data) ) {
      Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
      return FALSE;
    }

    bufstrlen = strlen(buf);
    bufptr = 0;

    return Yap_unify(YAP_Read( mpi_getc ), ARG1);
  }
}


static Int
p_mpi_barrier()            /* mpi_barrier/0 */
{
  int retv;

  retv = MPI_Barrier( MPI_COMM_WORLD );

  return (retv == 0);
}



/*
 * Init
 */


void
Yap_InitMPI(void)
{
  int i,j;

  mpi_argv = malloc( Yap_argc * sizeof(char *) );
  mpi_argv[0] = strdup( Yap_argv[0] );

  bufsize = RECV_BUF_SIZE;
  buf = malloc(bufsize * sizeof(char));

  for( i=1; i<Yap_argc; ++i ) {
    if( !strcmp(Yap_argv[i], "--") ) { ++i; break; }
  }
  for( j=1; i<Yap_argc; ++i, ++j ) {
    mpi_argv[j] = strdup( Yap_argv[i] );
  }
  mpi_argc = j;

  mpi_argv[0] = strdup( Yap_argv[0] );

  Yap_InitCPred( "mpi_open", 3, p_mpi_open, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SafePredFlag|SyncPredFlag );
  Yap_InitCPred( "mpi_barrier", 0, p_mpi_barrier, SafePredFlag|SyncPredFlag );
}

#endif /* HAVE_MPI */

Generated by  Doxygen 1.6.0   Back to index