diff --git a/.gitignore b/.gitignore index 3f54611f..240b7dc0 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ .classpath pom.xml.versionsBackup */dist/* -/Bundle/target/ \ No newline at end of file +/Bundle/target/ +/Plugins-MQ/target/ diff --git a/Plugins-MQ/docs/PluginMQHowTo.md b/Plugins-MQ/docs/PluginMQHowTo.md new file mode 100644 index 00000000..f7470f18 --- /dev/null +++ b/Plugins-MQ/docs/PluginMQHowTo.md @@ -0,0 +1,38 @@ +# OpenAS2 MQ Plugin + +## Overview +This plugin allows OpenAS2 to interconnect with a Message Queue for message exchange +and message tracking events +## Installation +The plugin is enabled by adding the included Jars in the lib folder of your OpenAS2 +installation and appending the module configuration section in the config.xml file +## Mode of Operation +The Module operates by creating 2 Outbound Topic publishing destinations and one +input consuming queue: + + - Messages published into the input consuming queue will get wrapped into an AS2 message +and sent out to the corresponding partner based on the Partnership configuration file + + - AS2 Messages received including MDNs will be published in the Messages Output +publishing Topic + + - Any tracking events from the messages generated from either the AS2 Messages sent/received +will be posted as Event messages in the Event Output Publishing Topic + +### Adapters +The module includes 2 adapters to connect with MQ Brokers: + - A generic JMS implementation of the MQ Broker connection + - A RabbitMQ implementation using the Java Native RabbitMQ client library + +To use either, the corresponding Jar for the Driver library used on the adapter +must be also added to the server's Lib folder. + +Authentication parameters and other details needed by the driver should be added +as attributes on the module declaration on config.xml + +#### JMS Adapter +The JMS adapter uses the following parameters: + - jms_factory_jndi : The JNDI path of the Connection Factory used for JMS Connection +#### RMQ Adapter + - uri: AMQP Connection string to RabbitMQ, including Username/password, Host and Port + - virtualhost: Virtual host to attach diff --git a/Plugins-MQ/pom.xml b/Plugins-MQ/pom.xml new file mode 100644 index 00000000..e6b2cc6d --- /dev/null +++ b/Plugins-MQ/pom.xml @@ -0,0 +1,102 @@ + + + 4.0.0 + + net.sf.openas2 + OpenAS2 + 2.10.0 + + openas2-plugins-mq + jar + + OpenAS2 Plugins MQ + + MQ Plugin for OpenAS2 + + + + ${project.parent.artifactId}-plugin-mq-${project.version}.zip + UTF-8 + ${project.basedir}/lib + PluginMQHowTo.md + ${project.basedir}/docs/${help.filename} + ${project.basedir}/dist + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + default-cli + + run + + package + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + + + + javax.jms + javax.jms-api + 2.0.1 + jar + + + com.rabbitmq + amqp-client + 5.6.0 + jar + + + commons-io + commons-io + 2.6 + jar + + + commons-logging + commons-logging + 1.2 + jar + + + net.sf.openas2 + openas2-server + ${project.version} + + + + \ No newline at end of file diff --git a/Plugins-MQ/src/config/config_example.xml b/Plugins-MQ/src/config/config_example.xml new file mode 100644 index 00000000..a2fc94e6 --- /dev/null +++ b/Plugins-MQ/src/config/config_example.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java new file mode 100644 index 00000000..e84a8b3f --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java @@ -0,0 +1,12 @@ +package com.greicodex.openas2.plugins.mq; + +import java.io.InputStream; +import java.util.Map; + +/** + * + * @author javier + */ +public interface ConsumerCallback { + public void onMessage(Map params,InputStream inputData); +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java new file mode 100644 index 00000000..7435373a --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java @@ -0,0 +1,232 @@ +package com.greicodex.openas2.plugins.mq; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.mail.MessagingException; +import org.openas2.OpenAS2Exception; +import org.openas2.Session; +import org.openas2.message.AS2Message; +import org.openas2.message.Message; +import org.openas2.processor.BaseActiveModule; +import org.openas2.processor.msgtracking.TrackingModule; +import org.openas2.processor.resender.ResenderModule; +import org.openas2.processor.storage.StorageModule; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openas2.ComponentNotFoundException; +import org.openas2.message.MessageMDN; +import org.openas2.partner.Partnership; +import org.openas2.processor.sender.SenderModule; + +/** + * + * @author javier + */ +public class MQConnector extends BaseActiveModule implements ResenderModule, TrackingModule, StorageModule, ConsumerCallback { + + public static final String PARAM_MQ_ADAPTER = "adapter"; + public static final String PARAM_MQ_MSG_PRODUCER_TOPIC = "msg_topic"; + public static final String PARAM_MQ_MSG_CONSUMER_QUEUE = "msg_queue"; + public static final String PARAM_MQ_EVT_PRODUCER_TOPIC = "evt_topic"; + + protected String mqAdapterFactoryClass; + protected int resendRetries; + protected String messageReceivedTopic; + protected String messageSendQueue; + protected String eventTopic; + + MessageBrokerAdapter broker; + MessageBuilderModule builder; + + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + protected AS2Message createMessage() { + return new AS2Message(); + } + + @Override + public void handle(String action, Message msg, Map options) throws OpenAS2Exception { + Map headers = new HashMap<>(); + InputStream body = null; + if (msg != null) { + msg.getOptions().forEach((t, u) -> { + String name = "options."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + msg.getAttributes().forEach((t, u) -> { + String name = "attributes."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + } + + if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { + try { + updateHeaders(msg, headers); + body = msg.getData().getRawInputStream(); + } catch (MessagingException ex) { + logger.error(ex.getMessage(), ex); + } + broker.sendMessage(body, headers); + } else if (action.equalsIgnoreCase(StorageModule.DO_STOREMDN)) { + try { + updateHeaders(msg, headers); + if (msg.getMDN() != null) { + MessageMDN mdn = msg.getMDN(); + updateHeaders((Message) mdn, headers); + headers.put("CorrelationId", msg.getMessageID()); + + mdn.getAttributes().forEach((t, u) -> { + String name = "attributes."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + body = new ByteArrayInputStream(mdn.getText().getBytes()); + } + } catch (NullPointerException ex) { + logger.error(ex.getMessage(), ex); + } + broker.sendMessage(body, headers); + } else if (action.equalsIgnoreCase(TrackingModule.DO_TRACK_MSG)) { + body = new ByteArrayInputStream(options.toString().getBytes()); + broker.sendEvent(body, headers); + } + + if (action != null) { + logger.info(action); + } + if (msg != null) { + logger.info(msg); + } + if (options != null) { + logger.info(options); + } + } + + private void updateHeaders(Message msg, Map headers) { + headers.put("message.id",msg.getMessageID()); + headers.put("message.class", msg.getClass().getSimpleName()); + headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + } + @Override + public boolean canHandle(String action, Message msg, Map options) { + if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { + return true; + } else if (action.equalsIgnoreCase(StorageModule.DO_STOREMDN)) { + return true; + } else if (action.equalsIgnoreCase(ResenderModule.DO_RESEND)) { + return false; + } else if (action.equalsIgnoreCase(ResenderModule.DO_RESENDMDN)) { + return false; + } else if (action.equalsIgnoreCase(TrackingModule.DO_TRACK_MSG)) { + return true; + } else { + return super.canHandle(action, msg, options); + } + } + + @Override + public void destroy() throws Exception { + super.destroy(); + + } + + protected void initMQ(Map parameters) throws OpenAS2Exception { + + try { + mqAdapterFactoryClass = getParameter(MQConnector.PARAM_MQ_ADAPTER, true); + messageReceivedTopic = getParameter(MQConnector.PARAM_MQ_MSG_PRODUCER_TOPIC, true); + messageSendQueue = getParameter(MQConnector.PARAM_MQ_MSG_CONSUMER_QUEUE, true); + eventTopic = getParameter(MQConnector.PARAM_MQ_EVT_PRODUCER_TOPIC, true); + + broker = (MessageBrokerAdapter) Class.forName(mqAdapterFactoryClass).newInstance(); + broker.connect(parameters); + broker.createEventMQInterface(eventTopic); + broker.createMessageMQInterface(messageReceivedTopic, messageSendQueue, this); + } catch (Exception ex) { + logger.error("Unable to create message broker adapter: "+mqAdapterFactoryClass, ex); + throw new OpenAS2Exception(ex); + } + + } + + @Override + public void init(Session session, Map parameters) throws OpenAS2Exception { + + super.init(session, parameters); + logger.info("Initializing connector"); + initMQ(parameters); + + builder = new MessageBuilderModule() { + @Override + protected Message createMessage() { + return new AS2Message(); + } + + @Override + public void doStart() throws OpenAS2Exception { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void doStop() throws OpenAS2Exception { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public boolean healthcheck(List failures) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + }; + builder.init(session, parameters); + + } + + @Override + public void doStart() throws OpenAS2Exception { + try { + broker.start(); + } catch (Exception ex) { + logger.error("Error starting up MQ listener", ex); + throw new OpenAS2Exception(ex); + } + } + + @Override + public void doStop() throws OpenAS2Exception { + try { + broker.stop(); + } catch (Exception ex) { + logger.error("Error stopping up MQ listener", ex); + throw new OpenAS2Exception(ex); + } + } + + @Override + public boolean healthcheck(List failures) { + return true; + } + + @Override + public void onMessage(Map headers, InputStream inputData) { + Message message = null; + try { + message = builder.buildMessageMetadata(headers); + Map options = new HashMap<>(); + builder.buildMessageData(message, inputData); + + this.getSession().getProcessor().handle(SenderModule.DO_SEND, message, options); + } catch (ComponentNotFoundException ex) { + logger.error(ex.getMessage(), ex); + return; + } catch (OpenAS2Exception ex) { + //TODO implement trackmsg failures TrackingModule.DO_TRACK_MSG + logger.error(ex.getMessage(), ex); + return; + } + } + +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java new file mode 100644 index 00000000..1d60cdd7 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java @@ -0,0 +1,26 @@ +package com.greicodex.openas2.plugins.mq; + +import java.io.InputStream; +import java.util.Map; + +/** + * + * @author javier + */ +public interface MessageBrokerAdapter { + + void connect(Map parameters); + + void createEventMQInterface(String eventTopic) throws RuntimeException; + + void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException; + + void start() throws RuntimeException; + + void stop() throws RuntimeException; + + public void sendMessage(InputStream rawInputStream, Map headers); + + public void sendEvent(InputStream rawInputStream, Map headers); + +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java new file mode 100644 index 00000000..a4c689e5 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java @@ -0,0 +1,167 @@ +package com.greicodex.openas2.plugins.mq; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.activation.DataHandler; +import javax.mail.MessagingException; +import javax.mail.internet.MimeBodyPart; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openas2.OpenAS2Exception; +import org.openas2.Session; +import org.openas2.WrappedException; +import org.openas2.message.FileAttribute; +import org.openas2.message.Message; +import org.openas2.params.CompositeParameters; +import org.openas2.params.DateParameters; +import org.openas2.params.InvalidParameterException; +import org.openas2.params.MessageParameters; +import org.openas2.params.ParameterParser; +import org.openas2.params.RandomParameters; +import org.openas2.partner.Partnership; +import org.openas2.processor.receiver.BaseReceiverModule; +import org.openas2.util.AS2Util; + +public abstract class MessageBuilderModule extends BaseReceiverModule { + + public static final String PARAM_ERROR_DIRECTORY = "errordir"; + public static final String PARAM_ERROR_FILENAME = "stored_error_filename"; + public static final String PARAM_SENT_DIRECTORY = "sentdir"; + public static final String PARAM_SENT_FILENAME = "stored_sent_filename"; + + public static final String PARAM_FORMAT = "format"; + public static final String PARAM_DELIMITERS = "delimiters"; + public static final String PARAM_MERGE_EXTRA = "mergeextratokens"; + public static final String PARAM_DEFAULTS = "defaults"; + public static final String PARAM_MIMETYPE = "mimetype"; + public static final String PARAM_RESEND_MAX_RETRIES = "resend_max_retries"; + + private Log logger = LogFactory.getLog(MessageBuilderModule.class.getSimpleName()); + + public void init(Session session, Map options) throws OpenAS2Exception { + super.init(session, options); + } + + protected CompositeParameters createParser(Message msg) { + return new CompositeParameters(false).add("date", new DateParameters()).add("rand", new RandomParameters()).add("msg", new MessageParameters(msg)); + } + + protected abstract Message createMessage(); + + public Message buildMessageMetadata(Map headers) throws OpenAS2Exception { + Message msg = createMessage(); + msg.setAttribute(FileAttribute.MA_FILENAME, headers.getOrDefault(FileAttribute.MA_FILENAME, FileAttribute.MA_FILENAME)); + msg.setPayloadFilename(headers.getOrDefault(FileAttribute.MA_FILENAME, FileAttribute.MA_FILENAME)); + MessageParameters params = new MessageParameters(msg); + + // Get the parameter that should provide the link between the polled directory + // and an AS2 sender and recipient + String defaults = getParameter(PARAM_DEFAULTS, false); + // Link the file to an AS2 sender and recipient via the Message object + // associated with the file + if (defaults != null) { + params.setParameters(defaults); + } + + headers.forEach((t, u) -> { + try { + params.setParameter(t, u); + } catch (InvalidParameterException ex) { + Logger.getLogger(MessageBuilderModule.class.getName()).log(Level.SEVERE, null, ex); + } + }); + + // Should have sender/receiver now so update the message's partnership with any + // stored information based on the identified partner IDs + getSession().getPartnershipFactory().updatePartnership(msg, true); + msg.updateMessageID(); + // Set the sender and receiver in the Message object headers + msg.setHeader("AS2-To", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + msg.setHeader("AS2-From", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + // Now build the filename since it is by default dependent on having sender and + // receiver ID + String pendingFile = AS2Util.buildPendingFileName(msg, getSession().getProcessor(), "pendingmdn"); + msg.setAttribute(FileAttribute.MA_PENDINGFILE, pendingFile); + CompositeParameters parser = new CompositeParameters(false).add("date", new DateParameters()) + .add("msg", new MessageParameters(msg)).add("rand", new RandomParameters()); + msg.setAttribute(FileAttribute.MA_ERROR_DIR, ParameterParser.parse(getParameter(PARAM_ERROR_DIRECTORY, true), parser)); + msg.setAttribute(FileAttribute.MA_ERROR_FILENAME, getParameter(PARAM_ERROR_FILENAME, false)); + if (getParameter(PARAM_SENT_DIRECTORY, false) != null) { + msg.setAttribute(FileAttribute.MA_SENT_DIR, ParameterParser.parse(getParameter(PARAM_SENT_DIRECTORY, false), parser)); + msg.setAttribute(FileAttribute.MA_SENT_FILENAME, getParameter(PARAM_SENT_FILENAME, false)); + } + return msg; + + } + + public void buildMessageData(Message msg, InputStream inputData) throws OpenAS2Exception { + MessageParameters params = new MessageParameters(msg); + + try { + String contentType=msg.getAttribute(Partnership.PA_CONTENT_TYPE); + + // Allow Content-Type to be overridden at partnership level or as property + if(contentType == null || contentType.length() == 0 ) { + contentType = msg.getPartnership().getAttributeOrProperty(Partnership.PA_CONTENT_TYPE, null); + } + if (contentType == null){ + contentType = getParameter(PARAM_MIMETYPE, false); + } + if (contentType == null) { + contentType = "application/octet-stream"; + } else { + try { + contentType = ParameterParser.parse(contentType, params); + } catch (InvalidParameterException e) { + throw new OpenAS2Exception("Bad content-type" + contentType, e); + } + } + javax.mail.util.ByteArrayDataSource byteSource = new javax.mail.util.ByteArrayDataSource(inputData, contentType); + MimeBodyPart body = new MimeBodyPart(); + body.setDataHandler(new DataHandler(byteSource)); + + // below statement is not filename related, just want to make it + // consist with the parameter "mimetype="application/EDI-X12"" + // defined in config.xml 2007-06-01 + + body.setHeader("Content-Type", contentType); + + // add below statement will tell the receiver to save the filename + // as the one sent by sender. 2007-06-01 + String sendFileName = getParameter("sendfilename", false); + if (sendFileName != null && sendFileName.equals("true")) { + String contentDisposition = "Attachment; filename=\"" + msg.getAttribute(FileAttribute.MA_FILENAME) + + "\""; + body.setHeader("Content-Disposition", contentDisposition); + msg.setContentDisposition(contentDisposition); + } + + msg.setData(body); + } catch (MessagingException me) { + throw new WrappedException(me); + } catch (IOException ioe) { + throw new WrappedException(ioe); + } + + /* + * Not sure it should be set at this level as there is no encoding of the + * content at this point so make it configurable + */ + if (msg.getPartnership().isSetTransferEncodingOnInitialBodyPart()) { + String contentTxfrEncoding = msg.getPartnership().getAttribute(Partnership.PA_CONTENT_TRANSFER_ENCODING); + if (contentTxfrEncoding == null) + contentTxfrEncoding = Session.DEFAULT_CONTENT_TRANSFER_ENCODING; + try { + msg.getData().setHeader("Content-Transfer-Encoding", contentTxfrEncoding); + } catch (MessagingException e) { + throw new OpenAS2Exception("Failed to set content transfer encoding in created MimeBodyPart: " + + org.openas2.logging.Log.getExceptionMsg(e), e); + } + } + } +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java new file mode 100644 index 00000000..feab6459 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java @@ -0,0 +1,199 @@ +package com.greicodex.openas2.plugins.mq.adapters; + +import com.greicodex.openas2.plugins.mq.MQConnector; +import com.greicodex.openas2.plugins.mq.ConsumerCallback; +import com.greicodex.openas2.plugins.mq.MessageBrokerAdapter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @author javier + */ +public class JmsAdapter implements MessageListener, ExceptionListener, MessageBrokerAdapter { + + public final static String JNDI_FACTORY_PARAM ="jms_factory_jndi"; + ConnectionFactory jmsFactory; + Connection jmsConnection; + MessageConsumer jmsMessageConsumer; + MessageProducer jmsMessageProducer; + MessageProducer jmsEventProducer; + JMSContext jmsContext; + javax.jms.Session jmsSession; + java.lang.Class JmsClass; + ConsumerCallback messageCallback; + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + @Override + public void connect(Map parameters) { + javax.naming.Context ctx = null; + try { + ctx = new InitialContext(); + } catch (NamingException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException("Failed to initialize JNDI Context", ex); + } + try { + jmsFactory = (ConnectionFactory) ctx.lookup(parameters.get(JmsAdapter.JNDI_FACTORY_PARAM)); + } catch (NamingException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException("Unable to lookup JMS connection factory from JNDI: "+parameters.get(JmsAdapter.JNDI_FACTORY_PARAM), ex); + } + try { + //jmsContext = jmsFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE); + jmsConnection = jmsFactory.createConnection(); + jmsSession = jmsConnection.createSession(true, JMSContext.CLIENT_ACKNOWLEDGE); + jmsConnection.setExceptionListener(this); + } catch (JMSException ex) { + logger.error("Unable to create JMS connection ", ex); + throw new RuntimeException("Unable to create JMS connection factory", ex); + } + } + + @Override + public void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException { + try { + jmsMessageConsumer = (MessageConsumer) jmsSession.createConsumer(jmsSession.createQueue(messageSendQueue)); + jmsMessageProducer = (MessageProducer) jmsSession.createProducer(jmsSession.createTopic(messageReceivedTopic)); + + jmsMessageConsumer.setMessageListener(this); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void createEventMQInterface(String eventTopic) throws RuntimeException { + try { + jmsEventProducer = (MessageProducer) jmsSession.createProducer(jmsSession.createTopic(eventTopic)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + try { + jmsSession.commit(); + } finally { + logger.debug("Messages flushed"); + } + jmsEventProducer = null; + jmsMessageConsumer = null; + jmsMessageConsumer = null; + try { + jmsSession.close(); + } finally { + logger.debug("JMS Session closed"); + } + try { + jmsConnection.close(); + } finally { + logger.debug("JMS Connection closed"); + } + jmsFactory = null; + } + + @Override + public void onMessage(javax.jms.Message message) { + logger.debug(message); + InputStream inputData = new InputStream() { + @Override + public int read() throws IOException { + return 0; + } + } ; + Map params = new HashMap<>(); + messageCallback.onMessage(params, inputData); + + } + + @Override + public void start() throws RuntimeException { + try { + jmsConnection.start(); + javax.jms.BytesMessage msg = jmsSession.createBytesMessage(); + msg.writeBytes("this is a test".getBytes()); + jmsMessageProducer.send(msg); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void stop() throws RuntimeException { + try { + jmsConnection.stop(); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void onException(JMSException exception) { + logger.error("MQ Adapter Error", exception); + } + + @Override + public void sendMessage(InputStream rawInputStream, Map headers) { + try { + jmsMessageProducer.send(createJmsMessage(rawInputStream,headers)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + } + + @Override + public void sendEvent(InputStream rawInputStream, Map headers) { + try { + jmsEventProducer.send(createJmsMessage(rawInputStream,headers)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + } + + private Message createJmsMessage(InputStream rawInputStream, Map headers) throws JMSException { + BytesMessage msg = jmsSession.createBytesMessage(); + ByteArrayOutputStream result = new ByteArrayOutputStream(); + try { + IOUtils.copyLarge(rawInputStream, result); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + msg.writeBytes(result.toByteArray()); + headers.forEach((t, u) -> { + try { + msg.setStringProperty(t, u); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + }); + return msg; + + } +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java new file mode 100644 index 00000000..2f7ce5d6 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java @@ -0,0 +1,210 @@ +package com.greicodex.openas2.plugins.mq.adapters; + +import com.greicodex.openas2.plugins.mq.MQConnector; +import com.greicodex.openas2.plugins.mq.ConsumerCallback; +import com.greicodex.openas2.plugins.mq.MessageBrokerAdapter; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BasicProperties; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.MessageProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.jms.DeliveryMode; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @author javier + */ +public class RMQAdapter implements MessageBrokerAdapter { + final static String RMQ_URI_PARAM="uri"; + final static String RMQ_VHOST_PARAM="virtualhost"; + ConnectionFactory factory; + Connection connection; + String eventProducer; + Consumer messageConsumer; + String messageQueue; + String messageProducer; + Channel channel; + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + @Override + public void connect(Map parameters) { + if(!parameters.containsKey(RMQAdapter.RMQ_URI_PARAM)) { + throw new RuntimeException(RMQAdapter.class.getSimpleName() + " requires parameter: "+RMQAdapter.RMQ_URI_PARAM); + } + if(!parameters.containsKey(RMQAdapter.RMQ_VHOST_PARAM)) { + throw new RuntimeException(RMQAdapter.class.getSimpleName() + " requires parameter: "+RMQAdapter.RMQ_VHOST_PARAM); + } + factory = new ConnectionFactory(); + try { + factory.setUri(parameters.get(RMQAdapter.RMQ_URI_PARAM)); + factory.setVirtualHost(parameters.get(RMQAdapter.RMQ_VHOST_PARAM)); + factory.setRequestedHeartbeat(1); + factory.setConnectionTimeout(5000); + factory.setAutomaticRecoveryEnabled(true); + factory.setNetworkRecoveryInterval(5); + factory.setTopologyRecoveryEnabled(true); + } catch (Exception ex) { + logger.error("Error setting uri",ex); + throw new RuntimeException("Error setting uri",ex); + } + try { + logger.info("Creating connection to uri" + parameters.get(RMQAdapter.RMQ_URI_PARAM) + parameters.get(RMQAdapter.RMQ_VHOST_PARAM)); + connection=factory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException sse) { + logger.error("Connection shutdown",sse); + } + }); + } catch (Exception ex) { + logger.error("Error newConnection",ex); + throw new RuntimeException("Error newConnection",ex); + } + try { + channel=connection.createChannel(); + } catch (Exception ex) { + logger.error("Failed createChannel",ex); + throw new RuntimeException("Error createChannel",ex); + } + + } + + @Override + public void createEventMQInterface(String eventTopic) throws RuntimeException { + try { + channel.exchangeDeclare(eventTopic, BuiltinExchangeType.TOPIC,true); + eventProducer=eventTopic; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + } + + @Override + public void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException { + try { + channel.exchangeDeclare(messageReceivedTopic, BuiltinExchangeType.TOPIC, true); + messageProducer = messageReceivedTopic; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + try { + channel.queueDeclare(messageSendQueue, true, false, false, null); + messageQueue=messageSendQueue; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + messageConsumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + super.handleDelivery(consumerTag, envelope, properties, body); + logger.info("handleDelivery "+envelope.getDeliveryTag()); + Map params=new HashMap<>(); + properties.getHeaders().forEach((t, u) -> { + params.put(t, u.toString()); + }); + InputStream inputData = new ByteArrayInputStream(body); + try { + callback.onMessage(params, inputData); + channel.basicAck(envelope.getDeliveryTag(), true); + }catch(Throwable t) { + logger.error("Failed to process message", t); + channel.basicNack(envelope.getDeliveryTag(), true,false); + } + + } + }; + + } + + @Override + public void start() throws RuntimeException { + try { + channel.basicConsume(messageQueue, messageConsumer); + } catch (IOException ex) { + logger.error("Error starting", ex); + throw new RuntimeException(ex); + } + } + + @Override + public void stop() throws RuntimeException { + try { + channel.abort(); + } catch (IOException ex) { + logger.error("Aborting Channel error",ex); + throw new RuntimeException(ex); + } + connection.abort(); + } + + @Override + public void sendMessage(InputStream rawInputStream, Map headers) { + this.publishOnQueue(messageProducer, rawInputStream, headers); + } + + protected void publishOnQueue(String exchange,InputStream rawInputStream, Map headers) { + String routingKey=""; + String contentEncoding=null; + String contentType=null; + int deliveryMode=DeliveryMode.PERSISTENT; + int priority=0; + Date timestamp=null; + String correlationId=null; + String replyTo=null; + String expiration=null; + String messageId=null; + String type=null; + String userid=null; + String appid=null; + String clusterid=null; + Map amqHeaders = new HashMap<>(); + headers.forEach((t, u) -> { + amqHeaders.put(t, u); + }); + ByteArrayOutputStream result = new ByteArrayOutputStream(); + try { + IOUtils.copyLarge(rawInputStream, result); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + BasicProperties bp = new AMQP.BasicProperties(contentType, + contentEncoding, amqHeaders, deliveryMode, priority, + correlationId, replyTo, expiration, messageId, timestamp, type, userid, appid, clusterid); + try { + channel.basicPublish(exchange, routingKey, (AMQP.BasicProperties) bp, result.toByteArray()); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + } + + @Override + public void sendEvent(InputStream rawInputStream, Map headers) { + this.publishOnQueue(eventProducer, rawInputStream, headers); + } +} diff --git a/pom.xml b/pom.xml index 11c270cd..412e62f1 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ Remote Server Bundle + Plugins-MQ