Improvement: Create new message handler for inbound/outbound messages to support RabbitMQ/others #88

Open
justanu opened this issue Jun 4, 2018 · 20 comments

@justanu

justanu commented Jun 4, 2018

This is a new feature, not a bug.
I would like to create a new handler for both inbound and outbound messages:

  • outbound messages would be retrieved from RabbitMQ/AMQP (or any other messaging system) instead of the file system;
  • inbound messages would be sent to a dedicated queue in RabbitMQ/AMQP (or any other messaging system) instead of the file system.

Two queues would be created, one for inbound and one for outbound. Each MQ message would carry the sender and receiver (e.g. the AS2 identifiers) at the header level.
In config.xml, I would need to be able to specify the handler implementation for both inbound and outbound messages.
I am willing to work on it once I get the workspace set up.
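
A minimal sketch of the message shape described above, assuming the AS2 identifiers travel as headers and the document as the body. The names As2Envelope and TwoQueueSketch are hypothetical, and the in-memory queues only stand in for the two broker queues; none of this is OpenAS2 or RabbitMQ client API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical envelope: AS2 identifiers travel as headers, the document as the body.
final class As2Envelope {
    final Map<String, String> headers;
    final byte[] payload;

    As2Envelope(String as2From, String as2To, byte[] payload) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("AS2-From", as2From);
        h.put("AS2-To", as2To);
        this.headers = Collections.unmodifiableMap(h);
        this.payload = payload.clone(); // defensive copy
    }
}

final class TwoQueueSketch {
    // In-memory stand-ins for the two broker queues (assumption: a real
    // implementation would talk to the broker through an AMQP client).
    static final BlockingQueue<As2Envelope> OUTBOUND = new LinkedBlockingQueue<>();
    static final BlockingQueue<As2Envelope> INBOUND = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        byte[] doc = "sample document".getBytes(StandardCharsets.UTF_8);

        // A back-end application queues a document to be sent via AS2.
        OUTBOUND.add(new As2Envelope("OpenAS2A_OID", "OpenAS2B_OID", doc));

        // The outbound handler would take it from the queue and send it.
        As2Envelope toSend = OUTBOUND.poll();
        System.out.println("sending " + toSend.headers);

        // The inbound handler would publish each received message for the back end.
        INBOUND.add(new As2Envelope("OpenAS2B_OID", "OpenAS2A_OID", doc));
        System.out.println("inbound queue size: " + INBOUND.size());
    }
}
```

Keeping the identifiers in headers means a consumer can route on sender and receiver without parsing the payload.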
@jsmucr
Contributor

jsmucr commented Jun 5, 2018

Good that you mention this. I've already developed modules that communicate with Apache ActiveMQ Artemis like this. The problem is that OpenAS2 isn't quite ready for this, so the solution is a bit hacky. But we're planning to put it into production. One day, soon.

@uhurusurfa
Contributor

uhurusurfa commented Jun 5, 2018

@jsmucr Please feel free to suggest how OpenAS2 can be adapted to make it easier to create a message queue module

@justanu You can follow the suggestions on how to add the inbound and outbound handlers as discussed here: https://sourceforge.net/p/openas2/discussion/265567/thread/75b5def5/
Essentially, for inbound messages, implement a class that replaces org.openas2.processor.storage.MessageFileModule and passes received messages into a queue. Declare the module in the same way as the MessageFileModule is declared.

Similarly, for outbound messages, implement a class that replaces org.openas2.processor.receiver.AS2DirectoryPollingModule and receives the messages to be sent via AS2 from the message queue.

You should be able to copy the 2 classes listed above and then replace the relevant code in the class with code that reads/writes message queues.
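
The swap can be pictured with a small, self-contained sketch. It deliberately does not use the real OpenAS2 module API: InboundStore, FileStore and QueueStore are hypothetical names chosen for illustration, and an in-memory queue stands in for the message broker.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical storage contract; the real OpenAS2 module interface differs.
interface InboundStore {
    void store(String name, byte[] payload) throws IOException;
}

// File-based variant: writes each received payload into a directory.
final class FileStore implements InboundStore {
    private final Path dir;

    FileStore(Path dir) {
        this.dir = dir;
    }

    @Override
    public void store(String name, byte[] payload) throws IOException {
        Files.write(dir.resolve(name), payload);
    }
}

// Queue-based variant: same contract, but the payload goes to a queue.
final class QueueStore implements InboundStore {
    final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    @Override
    public void store(String name, byte[] payload) {
        queue.add(payload.clone());
    }
}

final class StoreSwapSketch {
    public static void main(String[] args) throws IOException {
        byte[] body = "received document".getBytes(StandardCharsets.UTF_8);
        Path dir = Files.createTempDirectory("inbox");
        QueueStore queueStore = new QueueStore();

        // The caller only sees InboundStore, so either variant can be plugged in.
        InboundStore[] stores = { new FileStore(dir), queueStore };
        for (InboundStore store : stores) {
            store.store("msg-1", body);
        }
        System.out.println("file written: " + Files.exists(dir.resolve("msg-1")));
        System.out.println("queued messages: " + queueStore.queue.size());
    }
}
```

In OpenAS2 itself the equivalent switch would happen through the module declaration in config.xml rather than in calling code.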

@justanu
Author

justanu commented Jun 5, 2018

@uhurusurfa will follow the instructions and update the ticket. Thanks everyone.

@jsmucr
Contributor

jsmucr commented Jun 7, 2018

@uhurusurfa Sure. I didn't mean to be rude. I appreciate the software very much and will gladly help with its development (as far as my boss allows me to :-)).

This is an example of my code (Groovy). There's a nasty thing I had to use to be able to create an AS2Message instance with partnership details preset (since there's no input file path to extract these things from):

import cz.aimtec.mq.MapUtils
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.openas2.OpenAS2Exception
import org.openas2.Session
import org.openas2.message.AS2Message
import org.openas2.message.Message

class MessageSourceModule extends ClouEDIMQSourceModule {

    private ThreadLocal<Map> messageProperties = new ThreadLocal<>()

    @Override
    void init(Session session, Map<String, String> parameters) throws OpenAS2Exception {
        super.init(session, parameters)
        consumer.receiveMessageAsync(this.&receive)
    }

    @Override
    protected Message createMessage() {
        final msg = new AS2Message()
        final props = messageProperties.get()
        msg.partnership.senderIDs.name = props.message.sender.toString()
        msg.partnership.receiverIDs.name = props.message.receiver.toString()
        return msg
    }

    @Override
    void doStart() throws OpenAS2Exception {
        consumer.start()
    }

    @Override
    void doStop() throws OpenAS2Exception {
        consumer.stop()
    }

    @Override
    boolean healthcheck(List<String> failures) {
        return true
    }

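    protected boolean receive(ClientMessage msg) {
        // HACK: I need to propagate certain received params to the createMessage() method
        messageProperties.set(MapUtils.explodeKeys(msg.toPropertyMap(), '.' as char))
        try {
            processDocument(msg.bodyInputStream, msg.getStringProperty('message.file.name'))
        } finally {
            // Always clear the thread-local so stale properties never leak into the next message
            messageProperties.remove()
        }
        // Assumption: true tells the consumer that the message was handled
        return true
    }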
}

@justanu
Author

justanu commented Jun 14, 2018

Is there an easy way to debug the server?
I've built MessageAMQPModule to send AS2 messages to an AMQP queue instead of the file system, and registered the new module in config.xml.
I would now like to start the server in debug mode. Can someone briefly explain how to do that? In the meantime, I will try on my own.

@jsmucr
Contributor

jsmucr commented Jun 14, 2018

Try adding the server itself as a Maven dependency to your module. The only downside is that the latest version available there is OpenAS2 2.4.1.

@uhurusurfa
Contributor

I would suggest your first step should be a unit test, but you can also use the unit test that does a full system test (/src/test/java/org/openas2/app/OpenAS2ServerTest.java) and run it in debug mode in your IDE.
This is easily accomplished in Eclipse, and I can provide more details if you are using Eclipse.

@justanu
Author

justanu commented Jun 15, 2018

I got the inbound working nicely:

  • defined a new module, e.g.:
    <module classname="org.openas2.processor.storage.MessageAMQPModule"
            mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol"
            mq_exchange="as2"
            mq_routingkey="as2"/>
  • MQ shows the message with its headers set to the AS2 message headers and the content body in the MQ payload.

Header:
AS2-From: OpenAS2A_OID
AS2-To: OpenAS2B_OID
AS2-Version: 1.1
....
Payload: Cg8wh2Qu.....

Will work on outbound as well.

@justanu
Author

justanu commented Jun 17, 2018

Got the outbound working as well. However, there are a few other things to consider: the retry and the MDN.

So far here is an outbound configuration module:
<module classname="org.openas2.processor.receiver.AS2AMQPPollingModule" interval="20" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Outbound" format="sender.as2_id, receiver.as2_id, attributes.fileid" errordir="%home%/../data/toOpenAS2B/error"/>

The interval is irrelevant since, once the module starts, it listens for messages on the AMQP queue. This is good since it acts immediately on receiving a message.

The MQ message must have AS2-From and AS2-To set to match the AS2 IDs as defined in the partnership.xml configuration file.

I still need to see how I can replace the "errordir" with a different mechanism, and probably create a retry module that works from a queue instead of the file system.
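
One possible shape for such a mechanism, as a rough sketch only: check the two AS2 headers and park anything that does not match on an error queue rather than in an error directory. OutboundRouterSketch and knownAs2Ids are hypothetical, the set of IDs is assumed to have been loaded from partnership.xml elsewhere, and in-memory queues stand in for the AMQP queues.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class OutboundRouterSketch {
    // Assumption: the AS2 IDs configured in partnership.xml, loaded elsewhere.
    private final Set<String> knownAs2Ids;

    // In-memory stand-ins for the send path and the error queue.
    final BlockingQueue<Map<String, String>> toSend = new LinkedBlockingQueue<>();
    final BlockingQueue<Map<String, String>> errors = new LinkedBlockingQueue<>();

    OutboundRouterSketch(Set<String> knownAs2Ids) {
        this.knownAs2Ids = knownAs2Ids;
    }

    // Accepts the message only when both AS2 headers name a known ID;
    // anything else is parked on the error queue instead of an error directory.
    boolean route(Map<String, String> headers) {
        String from = headers.get("AS2-From");
        String to = headers.get("AS2-To");
        boolean valid = from != null && to != null
                && knownAs2Ids.contains(from) && knownAs2Ids.contains(to);
        (valid ? toSend : errors).add(headers);
        return valid;
    }

    public static void main(String[] args) {
        OutboundRouterSketch router =
                new OutboundRouterSketch(Set.of("OpenAS2A_OID", "OpenAS2B_OID"));

        router.route(Map.of("AS2-From", "OpenAS2A_OID", "AS2-To", "OpenAS2B_OID"));
        router.route(Map.of("AS2-From", "OpenAS2A_OID")); // AS2-To is missing

        System.out.println("to send: " + router.toSend.size());
        System.out.println("errors: " + router.errors.size());
    }
}
```

A real check would match the sender and receiver as a pair against a partnership, not just test each ID for membership.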

@justanu
Author

justanu commented Jun 18, 2018

Changed it slightly, as follows:

INBOUND
<module classname="org.openas2.processor.storage.MessageAMQPModule" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Inbound"/>

OUTBOUND
<module classname="org.openas2.processor.receiver.AS2AMQPPollingModule" interval="9999" mq_connection_string="amqp://user1:user1@ubuntudev:5672/soasol" mq_queue="AS2_Outbound" mq_error_queue="AS2_Error" format="sender.as2_id, receiver.as2_id, attributes.fileid" errordir=""/>

The interval and errordir are not used. Will build test cases and provide the code.
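
For readers unfamiliar with the mq_connection_string format, here is a small sketch of how such a value could be split into its parts with java.net.URI. The class AmqpUriSketch is hypothetical and is not how the module itself necessarily parses the attribute; treating the path segment as the virtual host follows the usual RabbitMQ amqp:// URI convention.

```java
import java.net.URI;

// Hypothetical holder for the parts of an amqp:// connection string.
final class AmqpUriSketch {
    final String user;
    final String password;
    final String host;
    final int port;
    final String virtualHost;

    AmqpUriSketch(String connectionString) {
        URI uri = URI.create(connectionString);

        // User info is "user:password", or null when no credentials are given.
        String userInfo = uri.getUserInfo();
        String[] creds = userInfo == null ? new String[0] : userInfo.split(":", 2);
        this.user = creds.length > 0 ? creds[0] : null;
        this.password = creds.length > 1 ? creds[1] : null;

        this.host = uri.getHost();
        // getPort() returns -1 when no port is given; 5672 is the usual AMQP port.
        this.port = uri.getPort() == -1 ? 5672 : uri.getPort();

        // The path segment after the leading slash names the virtual host.
        String path = uri.getPath();
        this.virtualHost = path.isEmpty() ? null : path.substring(1);
    }

    public static void main(String[] args) {
        AmqpUriSketch parts =
                new AmqpUriSketch("amqp://user1:user1@ubuntudev:5672/soasol");
        System.out.println(parts.user + "@" + parts.host + ":" + parts.port
                + " vhost=" + parts.virtualHost);
    }
}
```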

@uhurusurfa
Contributor

Make sure you test with the latest version in the "dev" branch. There are significant changes to the base code for handling HTTP transport, as well as fixes for the resend functionality, but no change to the module interface API, so it should work the same.

@uhurusurfa
Contributor

@justanu Have you managed to complete this feature to include in the next release?

@uhurusurfa
Contributor

@justanu Are you still going to provide the solution back to the project or shall I close this issue?

@igwtech
Member

igwtech commented Oct 24, 2019

If there is no progress I'll pick up on this.

@igwtech
Member

igwtech commented Nov 4, 2019

I'm picking up on this. After some work, I'll have to do some refactoring and code reorganization of the packages. Is that possible?

@igwtech
Member

igwtech commented Nov 4, 2019

Basically, I want to extract the DirectoryPollingModule.java Interface and relocate it into its own package to simplify the creation of similar extensions as external libraries to the project.

@uhurusurfa
Contributor

I am probably missing something, but I am not sure what exactly you want to do in terms of refactoring.
The DirectoryPollingModule is already a pluggable module. A RabbitMQ implementation would simply use the same interface as the DirectoryPollingModule to pass files into OpenAS2.

i.e., you simply extend MessageBuilderModule.

Any module you plug in would then pass files it has to the AS2SenderModule via the MessageBuilder implementation.

@igwtech igwtech self-assigned this Nov 26, 2019
@mstroming

mstroming commented Nov 21, 2024

@igwtech @justanu @uhurusurfa @jsmucr Somewhat related to this topic, are you aware of an existing module that allows inboxdir and outboxdir in polling configurations to be directories on an FTP server?

@uhurusurfa
Contributor

I am not sure what your use case is or what you want OpenAS2 to do.
If you want to use an FTP server to send files to OpenAS2, then you would configure the FTP server to use the outboxdir in OpenAS2 as its target directory; nothing needs to be done in OpenAS2.
For using FTP to send files received over OpenAS2 to a downstream destination, you would need a poller that invokes the FTP server. That could be done by a module in OpenAS2 that invokes a configured FTP server when a file arrives (this could be on a per-partnership basis).

However, it is unclear why you would not simply run OpenAS2 on a server that is the ultimate destination of the file since OpenAS2 is in effect an FTP server with added features for security.

@mstroming

@uhurusurfa Thanks very much for your feedback. To your point quoted below, that's what I'm going with. My use case is that I have an existing FTP server that runs on a different box than OpenAS2, but I'm just going to spin up FTP on the same box that I run OpenAS2 on.

"However, it is unclear why you would not simply run OpenAS2 on a server that is the ultimate destination of the file since OpenAS2 is in effect an FTP server with added features for security."
