Skip to content

Commit

Permalink
removed .cfg file and merged Lisandro's changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cekees committed Nov 6, 2012
2 parents 9264bc3 + 9d64f8d commit 3383ce1
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 15 deletions.
20 changes: 18 additions & 2 deletions src/MPI/Comm.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,6 @@ cdef class Comm:
def Mprobe(self, int source=0, int tag=0, Status status=None):
"""
Blocking test for a message
.. note:: This function blocks until the message arrives.
"""
cdef MPI_Message cmessage = MPI_MESSAGE_NULL
cdef MPI_Status *statusp = arg_Status(status)
Expand Down Expand Up @@ -1185,6 +1183,24 @@ cdef class Comm:
request.ob_buf = PyMPI_irecv(obj, dest, tag, comm, &request.ob_mpi)
return request
#
def mprobe(self, int source=0, int tag=0, Status status=None):
cdef MPI_Comm comm = self.ob_mpi
cdef MPI_Status *statusp = arg_Status(status)
cdef Message message = <Message>Message.__new__(Message)
message.ob_buf = PyMPI_mprobe(source, tag, comm,
&message.ob_mpi, statusp)
return message
#
def improbe(self, int source=0, int tag=0, Status status=None):
cdef int flag = 0
cdef MPI_Comm comm = self.ob_mpi
cdef MPI_Status *statusp = arg_Status(status)
cdef Message message = <Message>Message.__new__(Message)
message.ob_buf = PyMPI_improbe(source, tag, comm, &flag,
&message.ob_mpi, statusp)
if flag == 0: return None
return message
#
def barrier(self):
"Barrier"
cdef MPI_Comm comm = self.ob_mpi
Expand Down
61 changes: 53 additions & 8 deletions src/MPI/Message.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ cdef class Message:
int source=0, int tag=0, Status status=None):
"""
Blocking test for a message
.. note:: This function blocks until the message arrives.
"""
cdef MPI_Message cmessage = MPI_MESSAGE_NULL
cdef MPI_Status *statusp = arg_Status(status)
Expand Down Expand Up @@ -66,7 +64,7 @@ cdef class Message:
"""
Blocking receive of matched message
"""
cdef MPI_Message message = self.ob_mpi
cdef MPI_Message message = self.ob_mpi
cdef int source = MPI_ANY_SOURCE
if message == MPI_MESSAGE_NO_PROC:
source = MPI_PROC_NULL
Expand All @@ -77,12 +75,12 @@ cdef class Message:
&message, statusp) )
if self is not __MESSAGE_NO_PROC__:
self.ob_mpi = message

def Irecv(self, buf):
"""
Nonblocking receive of matched message
"""
cdef MPI_Message message = self.ob_mpi
cdef MPI_Message message = self.ob_mpi
cdef int source = MPI_ANY_SOURCE
if message == MPI_MESSAGE_NO_PROC:
source = MPI_PROC_NULL
Expand All @@ -95,7 +93,54 @@ cdef class Message:
self.ob_mpi = message
request.ob_buf = rmsg
return request


# Python Communication
# --------------------
#
@classmethod
def probe(cls, Comm comm not None,
int source=0, int tag=0, Status status=None):
cdef Message message = <Message>Message.__new__(cls)
cdef MPI_Status *statusp = arg_Status(status)
message.ob_buf = PyMPI_mprobe(source, tag, comm.ob_mpi,
&message.ob_mpi, statusp)
return message
#
@classmethod
def iprobe(cls, Comm comm not None,
int source=0, int tag=0, Status status=None):
cdef int flag = 0
cdef Message message = <Message>Message.__new__(cls)
cdef MPI_Status *statusp = arg_Status(status)
message.ob_buf = PyMPI_improbe(source, tag, comm.ob_mpi, &flag,
&message.ob_mpi, statusp)
if flag == 0: return None
return message
#
def recv(self, obj=None, Status status=None):
"""
Blocking receive of matched message
"""
cdef MPI_Message message = self.ob_mpi
cdef object rmsg = self.ob_buf # if obj is None else obj
cdef MPI_Status *statusp = arg_Status(status)
rmsg = PyMPI_mrecv(rmsg, &message, statusp)
if self is not __MESSAGE_NO_PROC__: self.ob_mpi = message
if self.ob_mpi == MPI_MESSAGE_NULL: self.ob_buf = None
return rmsg
#
def irecv(self, obj=None, Status status=None):
"""
Nonblocking receive of matched message
"""
cdef MPI_Message message = self.ob_mpi
cdef object rmsg = self.ob_buf # if obj is None else obj
cdef Request request = <Request>Request.__new__(Request)
request.ob_buf = PyMPI_imrecv(rmsg, &message, &request.ob_mpi)
if self is not __MESSAGE_NO_PROC__: self.ob_mpi = message
if self.ob_mpi == MPI_MESSAGE_NULL: self.ob_buf = None
return request

# Fortran Handle
# --------------

Expand All @@ -113,8 +158,8 @@ cdef class Message:
return message


cdef Message __MESSAGE_NULL__ = new_Message ( MPI_MESSAGE_NULL )
cdef Message __MESSAGE_NO_PROC__ = new_Message ( MPI_MESSAGE_NO_PROC )
cdef Message __MESSAGE_NULL__ = new_Message ( MPI_MESSAGE_NULL )
cdef Message __MESSAGE_NO_PROC__ = new_Message ( MPI_MESSAGE_NO_PROC )


# Predefined message handles
Expand Down
69 changes: 68 additions & 1 deletion src/MPI/msgpickle.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ cdef object PyMPI_recv(object obj, int source, int tag,
if obj is None:
with nogil:
if USE_MATCHED_RECV:
CHKERR( MPI_Mprobe(source, tag,comm, &match, &rsts) )
CHKERR( MPI_Mprobe(source, tag, comm, &match, &rsts) )
else:
CHKERR( MPI_Probe(source, tag, comm, &rsts) )
CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
Expand Down Expand Up @@ -545,6 +545,73 @@ cdef object PyMPI_testall(requests, int *flag, statuses):

# -----------------------------------------------------------------------------

cdef object PyMPI_mprobe(int source, int tag, MPI_Comm comm,
MPI_Message *message, MPI_Status *status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef void* rbuf = NULL
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
cdef MPI_Status rsts
if (status == MPI_STATUS_IGNORE): status = &rsts
with nogil: CHKERR( MPI_Mprobe(source, tag, comm, message, status) )
if message[0] == MPI_MESSAGE_NO_PROC: return None
CHKERR( MPI_Get_count(status, rtype, &rcount) )
cdef object rmsg = pickle.alloc(&rbuf, rcount)
return rmsg

cdef object PyMPI_improbe(int source, int tag, MPI_Comm comm, int *flag,
MPI_Message *message, MPI_Status *status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef void* rbuf = NULL
cdef int rcount = 0
cdef MPI_Datatype rtype = MPI_BYTE
cdef MPI_Status rsts
if (status == MPI_STATUS_IGNORE): status = &rsts
with nogil: CHKERR( MPI_Improbe(source, tag, comm, flag, message, status) )
if flag[0] == 0 or message[0] == MPI_MESSAGE_NO_PROC: return None
CHKERR( MPI_Get_count(status, rtype, &rcount) )
cdef object rmsg = pickle.alloc(&rbuf, rcount)
return rmsg

cdef object PyMPI_mrecv(object rmsg,
MPI_Message *message, MPI_Status *status):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef void* rbuf = NULL
cdef MPI_Aint rlen = 0
cdef MPI_Datatype rtype = MPI_BYTE
if message[0] == MPI_MESSAGE_NO_PROC:
rmsg = None
elif rmsg is None:
pass
elif PyBytes_CheckExact(rmsg):
getbuffer_r(rmsg, &rbuf, &rlen)
else:
rmsg = getbuffer_w(rmsg, &rbuf, &rlen)
cdef int rcount = <int> rlen # XXX overflow?
with nogil: CHKERR( MPI_Mrecv(rbuf, rcount, rtype, message, status) )
rmsg = pickle.load(rmsg)
return rmsg

cdef object PyMPI_imrecv(object rmsg,
MPI_Message *message, MPI_Request *request):
cdef _p_Pickle pickle = PyMPI_pickle()
cdef void* rbuf = NULL
cdef MPI_Aint rlen = 0
cdef MPI_Datatype rtype = MPI_BYTE
if message[0] == MPI_MESSAGE_NO_PROC:
rmsg = None
elif rmsg is None:
pass
elif PyBytes_CheckExact(rmsg):
rmsg = getbuffer_r(rmsg, &rbuf, &rlen)
else:
rmsg = getbuffer_w(rmsg, &rbuf, &rlen)
cdef int rcount = <int> rlen # XXX overflow?
with nogil: CHKERR( MPI_Imrecv(rbuf, rcount, rtype, message, request) )
return rmsg

# -----------------------------------------------------------------------------

cdef object PyMPI_barrier(MPI_Comm comm):
with nogil: CHKERR( MPI_Barrier(comm) )
return None
Expand Down
1 change: 1 addition & 0 deletions src/include/mpi4py/MPI.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ ctypedef public api class Message [
]:
cdef MPI_Message ob_mpi
cdef int flags
cdef object ob_buf

ctypedef public api class Op [
type PyMPIOp_Type,
Expand Down
8 changes: 4 additions & 4 deletions test/test_p2p_matched.py → test/test_p2p_buf_matched.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def testProbeRecv(self):
m = comm.Mprobe(0, 0)
self.assertTrue(isinstance(m, MPI.Message))
self.assertTrue(m)
n = comm.Improbe(1, 0)
n = comm.Improbe(0, 0)
self.assertEqual(n, None)
rr = m.Irecv(rbuf.as_raw())
self.assertFalse(m)
Expand All @@ -99,7 +99,7 @@ def testProbeRecv(self):
comm.Send(sbuf.as_mpi(), 1, 0)
m = comm.Mprobe(1, 0)
self.assertTrue(m)
n = comm.Improbe(1, 0)
n = comm.Improbe(0, 0)
self.assertEqual(n, None)
m.Recv(rbuf.as_raw())
self.assertFalse(m)
Expand All @@ -123,9 +123,9 @@ def testProbeRecv(self):
m = None
while not m:
m = comm.Improbe(0, 1)
m.Irecv(rbuf).Wait()
m.Irecv(rbuf.as_mpi()).Wait()
comm.Send(sbuf.as_mpi(), 0, 1)
n = comm.Improbe(0, 0)
n = comm.Improbe(0, 1)
self.assertEqual(n, None)
else:
rbuf = sbuf
Expand Down
Loading

0 comments on commit 3383ce1

Please sign in to comment.