Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New features #134

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ PYTHON=python
all: build

add-submodules:
-git submodule add -b v0.8.0 https://github.com/alanxz/rabbitmq-c.git
git submodule add -b v0.9.0 https://github.com/alanxz/rabbitmq-c.git

submodules:
git submodule init
git submodule update

rabbitmq-c: submodules
(cd $(RABBIT_DIR); test -f configure || autoreconf -i)
(cd $(RABBIT_DIR); test -f Makefile || automake --add-missing)

(cd $(RABBIT_DIR); cmake .; cmake --build .)

rabbitmq-clean:
-(cd $(RABBIT_DIR) && make clean)
Expand Down Expand Up @@ -50,8 +48,7 @@ distclean: pyclean rabbitmq-distclean removepyc
-rm -f erl_crash.dump

$(RABBIT_TARGET):
(test -f config.h || cd $(RABBIT_DIR); ./configure --disable-tools --disable-docs)
(cd $(RABBIT_DIR); make)
(cd $(RABBIT_DIR); cmake .; cmake --build .;)


dist: rabbitmq-c $(RABBIT_TARGET)
Expand Down
51 changes: 42 additions & 9 deletions Modules/_librabbitmq/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_framing.h>

#include "connection.h"
#include "distmeta.h"
Expand Down Expand Up @@ -974,6 +977,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
"channel_max",
"frame_max",
"heartbeat",
"ssl",
"confirmed",
"client_properties",
NULL
};
Expand All @@ -985,12 +990,15 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
int channel_max = 0xffff;
int frame_max = 131072;
int heartbeat = 0;
int ssl = 0;
int confirmed = 0;
int port = 5672;
PyObject *client_properties = NULL;

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiiiO", kwlist,
&hostname, &userid, &password, &virtual_host, &port,
&channel_max, &frame_max, &heartbeat, &client_properties)) {
&channel_max, &frame_max, &heartbeat, &ssl, &confirmed,
&client_properties)) {
return -1;
}

Expand All @@ -1012,6 +1020,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
self->channel_max = channel_max;
self->frame_max = frame_max;
self->heartbeat = heartbeat;
self->ssl = ssl;
self->confirmed = confirmed;
self->weakreflist = NULL;
self->callbacks = PyDict_New();
if (self->callbacks == NULL) return -1;
Expand Down Expand Up @@ -1057,7 +1067,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
}
Py_BEGIN_ALLOW_THREADS;
self->conn = amqp_new_connection();
socket = amqp_tcp_socket_new(self->conn);
if (self->ssl == 1 ) {
socket = amqp_ssl_socket_new(self->conn);
amqp_ssl_socket_set_verify_peer(socket, 0);
amqp_ssl_socket_set_verify_hostname(socket, 0);
} else {
socket = amqp_tcp_socket_new(self->conn);
}
Py_END_ALLOW_THREADS;

if (!socket) {
Expand Down Expand Up @@ -1132,14 +1148,22 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self)
unsigned int
PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel)
{
amqp_rpc_reply_t reply;
amqp_rpc_reply_t replyopen;
amqp_rpc_reply_t replyconfirm;

Py_BEGIN_ALLOW_THREADS;
amqp_channel_open(self->conn, channel);
reply = amqp_get_rpc_reply(self->conn);
replyopen = amqp_get_rpc_reply(self->conn);
if (self->confirmed){
amqp_confirm_select(self->conn, (amqp_channel_t)channel);
replyconfirm = amqp_get_rpc_reply(self->conn);
}
Py_END_ALLOW_THREADS;

return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel");
if ((replyopen.reply_type != AMQP_RESPONSE_NORMAL) || !(self->confirmed)) {
return PyRabbitMQ_HandleAMQError(self, 0, replyopen, "Couldn't create channel");
} else {
return PyRabbitMQ_HandleAMQError(self, 0, replyconfirm, "Couldn't set confirm mode");
}
}


Expand Down Expand Up @@ -1811,13 +1835,16 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
PyObject *exchange = NULL;
PyObject *routing_key = NULL;
PyObject *propdict;
amqp_frame_t frame;

unsigned int channel = 0;
unsigned int mandatory = 0;
unsigned int immediate = 0;

char *body_buf = NULL;
Py_ssize_t body_size = 0;

int status = 0;
int ret = 0;
amqp_basic_properties_t props;
amqp_bytes_t bytes;
Expand Down Expand Up @@ -1852,21 +1879,27 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
(amqp_boolean_t)immediate,
&props,
bytes);
if (self->confirmed){
status = amqp_simple_wait_frame_on_channel(self->conn,channel,&frame);
}
amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;

if (!PyRabbitMQ_HandleError(ret, "basic.publish")) {
goto error;
}
if ((self->confirmed) && (status != AMQP_STATUS_OK) &&
(frame.frame_type != AMQP_FRAME_METHOD) &&
(frame.payload.method.id != AMQP_BASIC_ACK_METHOD )){
goto error;
}
Py_RETURN_NONE;

error:
PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}


/*
* Connection._basic_ack
*/
Expand Down
6 changes: 6 additions & 0 deletions Modules/_librabbitmq/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ typedef struct {
int frame_max;
int channel_max;
int heartbeat;
int ssl;
int confirmed;

int sockfd;
int connected;
Expand Down Expand Up @@ -271,6 +273,10 @@ static PyMemberDef PyRabbitMQ_ConnectionType_members[] = {
offsetof(PyRabbitMQ_Connection, port), READONLY, NULL},
{"heartbeat", T_INT,
offsetof(PyRabbitMQ_Connection, heartbeat), READONLY, NULL},
{"ssl", T_INT,
offsetof(PyRabbitMQ_Connection, ssl), READONLY, NULL},
{"confirmed", T_INT,
offsetof(PyRabbitMQ_Connection, confirmed), READONLY, NULL},
{"server_properties", T_OBJECT_EX,
offsetof(PyRabbitMQ_Connection, server_properties), READONLY, NULL},
{"connected", T_INT,
Expand Down
7 changes: 5 additions & 2 deletions librabbitmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,17 @@ class Connection(_librabbitmq.Connection):

def __init__(self, host='localhost', userid='guest', password='guest',
virtual_host='/', port=5672, channel_max=0xffff,
frame_max=131072, heartbeat=0, lazy=False,
frame_max=131072, heartbeat=0, ssl=False, confirmed=False, lazy=False,
client_properties=None, **kwargs):
if ':' in host:
host, port = host.split(':')
if ssl:
ssl = True
confirmed = confirmed if confirmed else kwargs.pop("confirm_publish",False)
super(Connection, self).__init__(
hostname=host, port=int(port), userid=userid, password=password,
virtual_host=virtual_host, channel_max=channel_max,
frame_max=frame_max, heartbeat=heartbeat,
frame_max=frame_max, heartbeat=heartbeat, ssl=int(ssl),confirmed=int(confirmed),
client_properties=client_properties,
)
self.channels = {}
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq-c
Submodule rabbitmq-c updated 104 files
36 changes: 18 additions & 18 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def append_env(L, e):
'amqp_socket.c',
'amqp_table.c',
'amqp_tcp_socket.c',
'amqp_openssl_hostname_validation.c',
'amqp_openssl.c',
'amqp_openssl_bio.c',
'amqp_time.c',
'amqp_url.c',
])
Expand All @@ -72,7 +75,7 @@ def append_env(L, e):

if is_linux: # Issue #42
libs.append('rt') # -lrt for clock_gettime

libs.append('ssl')
librabbitmq_ext = Extension(
'_librabbitmq',
sources=list(PyC_files) + list(librabbit_files),
Expand All @@ -96,6 +99,7 @@ def append_env(L, e):
class build(_build):
stdcflags = [
'-DHAVE_CONFIG_H',
'-DENABLE_SSL_SUPPORT=ON',
]
if os.environ.get('PEDANTIC'):
# Python.h breaks -pedantic, so can only use it while developing.
Expand Down Expand Up @@ -123,26 +127,22 @@ def run(self):
)

try:

if not os.path.isdir(os.path.join(LRMQDIST(), '.git')):
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'v0.8.0',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'v0.9.0',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))

os.chdir(LRMQDIST())

if not os.path.isfile('configure'):
print('- autoreconf')
os.system('autoreconf -i')

if not os.path.isfile('config.h'):
print('- configure rabbitmq-c...')
if os.system('/bin/sh configure --disable-tools \
--disable-docs --disable-dependency-tracking'):
return
print('- cmake')
os.system('cmake .')
dhananjaysathe marked this conversation as resolved.
Show resolved Hide resolved
print(' -build')
os.system('cmake --build .')
finally:
os.environ.update(restore)
finally:
Expand All @@ -157,7 +157,7 @@ def run(self):
return librabbitmq_ext, build


def find_make(alt=('gmake', 'gnumake', 'make', 'nmake')):
def find_make(alt=('gmake', 'gnumake', 'make', 'nmake','cmake')):
for path in os.environ['PATH'].split(':'):
for make in (os.path.join(path, m) for m in alt):
if os.path.isfile(make):
Expand Down