From 0f3f020751381d3fd76ae16ae752720b3c28ed2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Mun=CC=83oz?= Date: Mon, 11 Nov 2019 23:19:56 -0400 Subject: [PATCH 1/5] Message Queue plugin as Source and Destination of Messages and Tracking --- .gitignore | 3 +- Plugins-MQ/pom.xml | 103 ++++++++ .../openas2/plugins/mq/ConsumerCallback.java | 17 ++ .../openas2/plugins/mq/MQConnector.java | 241 ++++++++++++++++++ .../plugins/mq/MessageBrokerAdapter.java | 31 +++ .../plugins/mq/MessageBuilderModule.java | 167 ++++++++++++ .../plugins/mq/adapters/JmsAdapter.java | 200 +++++++++++++++ .../plugins/mq/adapters/RMQAdapter.java | 206 +++++++++++++++ pom.xml | 1 + 9 files changed, 968 insertions(+), 1 deletion(-) create mode 100644 Plugins-MQ/pom.xml create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java create mode 100644 Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java diff --git a/.gitignore b/.gitignore index 3f54611f..240b7dc0 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ .classpath pom.xml.versionsBackup */dist/* -/Bundle/target/ \ No newline at end of file +/Bundle/target/ +/Plugins-MQ/target/ diff --git a/Plugins-MQ/pom.xml b/Plugins-MQ/pom.xml new file mode 100644 index 00000000..c46f6820 --- /dev/null +++ b/Plugins-MQ/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + net.sf.openas2 + OpenAS2 + 2.10.0 + + openas2-plugins-mq + jar + + OpenAS2 Plugins MQ + + MQ Plugin for OpenAS2 + + + + ${project.parent.artifactId}-plugin-mq-${project.version}.zip + UTF-8 + ${project.basedir}/lib + OpenAS2HowTo.pdf + ../docs/${help.filename} + ${project.basedir}/dist + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + default-cli + + run + + package + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + + + + javax.jms + javax.jms-api + 2.0.1 + jar + + + com.rabbitmq + amqp-client + 5.6.0 + jar + + + commons-io + commons-io + 2.6 + jar + + + commons-logging + commons-logging + 1.2 + jar + + + net.sf.openas2 + openas2-server + ${project.version} + jar + + + + \ No newline at end of file diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java new file mode 100644 index 00000000..1740fd06 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java @@ -0,0 +1,17 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.greicodex.openas2.plugins.mq; + +import java.io.InputStream; +import java.util.Map; + +/** + * + * @author javier + */ +public interface ConsumerCallback { + public void onMessage(Map params,InputStream inputData); +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java new file mode 100644 index 00000000..a71be89a --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java @@ -0,0 +1,241 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.greicodex.openas2.plugins.mq; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.mail.MessagingException; +import org.openas2.OpenAS2Exception; +import org.openas2.Session; +import org.openas2.message.AS2Message; +import org.openas2.message.Message; +import org.openas2.processor.BaseActiveModule; +import org.openas2.processor.msgtracking.TrackingModule; +import org.openas2.processor.resender.ResenderModule; +import org.openas2.processor.storage.StorageModule; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openas2.ComponentNotFoundException; +import org.openas2.message.MessageMDN; +import org.openas2.partner.Partnership; +import org.openas2.processor.sender.SenderModule; + +/** + * + * @author javier + */ +public class MQConnector extends BaseActiveModule implements ResenderModule, TrackingModule, StorageModule, ConsumerCallback { + + public static final String PARAM_JMS_DRIVER = "jms_driver"; + public static final String PARAM_JMS_PARAMS = "jms_parameters"; + public static final String PARAM_JMS_MSG_PRODUCER_TOPIC = "msg_topic"; + public static final String PARAM_JMS_MSG_CONSUMER_QUEUE = "msg_queue"; + public static final String PARAM_JMS_EVT_PRODUCER_TOPIC = "evt_topic"; + + protected String mqBrokerUri; + protected String mqFactoryClass; + protected int resendRetries; + protected String messageReceivedTopic; + protected String messageSendQueue; + protected String eventTopic; + + MessageBrokerAdapter broker; + MessageBuilderModule builder; + + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + protected AS2Message createMessage() { + return new AS2Message(); + } + + @Override + public void handle(String action, Message msg, Map options) throws OpenAS2Exception { + Map headers = new HashMap<>(); + InputStream body = null; + if (msg != null) { + msg.getOptions().forEach((t, u) -> { + String name = "options."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + msg.getAttributes().forEach((t, u) -> { + String name = "attributes."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + } + + if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { + try { + headers.put("message.id",msg.getMessageID()); + headers.put("message.class", msg.getClass().getSimpleName()); + headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + body = msg.getData().getRawInputStream(); + } catch (MessagingException ex) { + logger.error(ex.getMessage(), ex); + } + broker.sendMessage(body, headers); + } else if (action.equalsIgnoreCase(StorageModule.DO_STOREMDN)) { + try { + headers.put("message.id",msg.getMessageID()); + headers.put("message.class", msg.getClass().getSimpleName()); + headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + headers.put("CorrelationId", msg.getMessageID()); + if (msg.getMDN() != null) { + MessageMDN mdn = msg.getMDN(); + mdn.getAttributes().forEach((t, u) -> { + String name = "attributes."+ ((String)t).toLowerCase(); + headers.put(name,(String)u); + }); + headers.put("message.id",mdn.getMessageID()); + headers.put("message.class", mdn.getClass().getSimpleName()); + headers.put("sender.as2_id", mdn.getPartnership().getSenderID(Partnership.PID_AS2)); + headers.put("receiver.as2_id", mdn.getPartnership().getReceiverID(Partnership.PID_AS2)); + body = new ByteArrayInputStream(mdn.getText().getBytes()); + } + } catch (NullPointerException ex) { + logger.error(ex.getMessage(), ex); + } + broker.sendMessage(body, headers); + } else if (action.equalsIgnoreCase(TrackingModule.DO_TRACK_MSG)) { + body = new ByteArrayInputStream(options.toString().getBytes()); + broker.sendEvent(body, headers); + } + + if (action != null) { + logger.info(action); + } + if (msg != null) { + logger.info(msg); + } + if (options != null) { + logger.info(options); + } + } + + @Override + public boolean canHandle(String action, Message msg, Map options) { + if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { + return true; + } else if (action.equalsIgnoreCase(StorageModule.DO_STOREMDN)) { + return true; + } else if (action.equalsIgnoreCase(ResenderModule.DO_RESEND)) { + return false; + } else if (action.equalsIgnoreCase(ResenderModule.DO_RESENDMDN)) { + return false; + } else if (action.equalsIgnoreCase(TrackingModule.DO_TRACK_MSG)) { + return true; + } else { + return super.canHandle(action, msg, options); + } + } + + @Override + public void destroy() throws Exception { + super.destroy(); + + } + + protected void initMQ(Map parameters) throws OpenAS2Exception { + + try { + mqFactoryClass = getParameter(MQConnector.PARAM_JMS_DRIVER, true); + messageReceivedTopic = getParameter(MQConnector.PARAM_JMS_MSG_PRODUCER_TOPIC, true); + messageSendQueue = getParameter(MQConnector.PARAM_JMS_MSG_CONSUMER_QUEUE, true); + eventTopic = getParameter(MQConnector.PARAM_JMS_EVT_PRODUCER_TOPIC, true); + + broker = (MessageBrokerAdapter) Class.forName(mqFactoryClass).newInstance(); + broker.connect(parameters); + broker.createEventMQInterface(eventTopic); + broker.createMessageMQInterface(messageReceivedTopic, messageSendQueue, this); + } catch (Exception ex) { + logger.error("Unable to create broker interface", ex); + throw new OpenAS2Exception(ex); + } + + } + + @Override + public void init(Session session, Map parameters) throws OpenAS2Exception { + + super.init(session, parameters); + logger.info("Initializing connector"); + initMQ(parameters); + + builder = new MessageBuilderModule() { + @Override + protected Message createMessage() { + return new AS2Message(); + } + + @Override + public void doStart() throws OpenAS2Exception { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void doStop() throws OpenAS2Exception { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public boolean healthcheck(List failures) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + }; + builder.init(session, parameters); + + } + + @Override + public void doStart() throws OpenAS2Exception { + try { + broker.start(); + } catch (Exception ex) { + logger.error("Error starting up JMS listener", ex); + throw new OpenAS2Exception(ex); + } + } + + @Override + public void doStop() throws OpenAS2Exception { + try { + broker.stop(); + } catch (Exception ex) { + logger.error("Error stopping up JMS listener", ex); + throw new OpenAS2Exception(ex); + } + } + + @Override + public boolean healthcheck(List failures) { + return true; + } + + @Override + public void onMessage(Map headers, InputStream inputData) { + Message message = null; + try { + message = builder.buildMessageMetadata(headers); + Map options = new HashMap<>(); + builder.buildMessageData(message, inputData); + + this.getSession().getProcessor().handle(SenderModule.DO_SEND, message, options); + } catch (ComponentNotFoundException ex) { + logger.error(ex.getMessage(), ex); + return; + } catch (OpenAS2Exception ex) { + //TODO implement trackmsg failures TrackingModule.DO_TRACK_MSG + logger.error(ex.getMessage(), ex); + return; + } + } + +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java new file mode 100644 index 00000000..1918809e --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java @@ -0,0 +1,31 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.greicodex.openas2.plugins.mq; + +import java.io.InputStream; +import java.util.Map; + +/** + * + * @author javier + */ +public interface MessageBrokerAdapter { + + void connect(Map parameters); + + void createEventMQInterface(String eventTopic) throws RuntimeException; + + void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException; + + void start() throws RuntimeException; + + void stop() throws RuntimeException; + + public void sendMessage(InputStream rawInputStream, Map headers); + + public void sendEvent(InputStream rawInputStream, Map headers); + +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java new file mode 100644 index 00000000..a4c689e5 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBuilderModule.java @@ -0,0 +1,167 @@ +package com.greicodex.openas2.plugins.mq; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.activation.DataHandler; +import javax.mail.MessagingException; +import javax.mail.internet.MimeBodyPart; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openas2.OpenAS2Exception; +import org.openas2.Session; +import org.openas2.WrappedException; +import org.openas2.message.FileAttribute; +import org.openas2.message.Message; +import org.openas2.params.CompositeParameters; +import org.openas2.params.DateParameters; +import org.openas2.params.InvalidParameterException; +import org.openas2.params.MessageParameters; +import org.openas2.params.ParameterParser; +import org.openas2.params.RandomParameters; +import org.openas2.partner.Partnership; +import org.openas2.processor.receiver.BaseReceiverModule; +import org.openas2.util.AS2Util; + +public abstract class MessageBuilderModule extends BaseReceiverModule { + + public static final String PARAM_ERROR_DIRECTORY = "errordir"; + public static final String PARAM_ERROR_FILENAME = "stored_error_filename"; + public static final String PARAM_SENT_DIRECTORY = "sentdir"; + public static final String PARAM_SENT_FILENAME = "stored_sent_filename"; + + public static final String PARAM_FORMAT = "format"; + public static final String PARAM_DELIMITERS = "delimiters"; + public static final String PARAM_MERGE_EXTRA = "mergeextratokens"; + public static final String PARAM_DEFAULTS = "defaults"; + public static final String PARAM_MIMETYPE = "mimetype"; + public static final String PARAM_RESEND_MAX_RETRIES = "resend_max_retries"; + + private Log logger = LogFactory.getLog(MessageBuilderModule.class.getSimpleName()); + + public void init(Session session, Map options) throws OpenAS2Exception { + super.init(session, options); + } + + protected CompositeParameters createParser(Message msg) { + return new CompositeParameters(false).add("date", new DateParameters()).add("rand", new RandomParameters()).add("msg", new MessageParameters(msg)); + } + + protected abstract Message createMessage(); + + public Message buildMessageMetadata(Map headers) throws OpenAS2Exception { + Message msg = createMessage(); + msg.setAttribute(FileAttribute.MA_FILENAME, headers.getOrDefault(FileAttribute.MA_FILENAME, FileAttribute.MA_FILENAME)); + msg.setPayloadFilename(headers.getOrDefault(FileAttribute.MA_FILENAME, FileAttribute.MA_FILENAME)); + MessageParameters params = new MessageParameters(msg); + + // Get the parameter that should provide the link between the polled directory + // and an AS2 sender and recipient + String defaults = getParameter(PARAM_DEFAULTS, false); + // Link the file to an AS2 sender and recipient via the Message object + // associated with the file + if (defaults != null) { + params.setParameters(defaults); + } + + headers.forEach((t, u) -> { + try { + params.setParameter(t, u); + } catch (InvalidParameterException ex) { + Logger.getLogger(MessageBuilderModule.class.getName()).log(Level.SEVERE, null, ex); + } + }); + + // Should have sender/receiver now so update the message's partnership with any + // stored information based on the identified partner IDs + getSession().getPartnershipFactory().updatePartnership(msg, true); + msg.updateMessageID(); + // Set the sender and receiver in the Message object headers + msg.setHeader("AS2-To", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + msg.setHeader("AS2-From", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + // Now build the filename since it is by default dependent on having sender and + // receiver ID + String pendingFile = AS2Util.buildPendingFileName(msg, getSession().getProcessor(), "pendingmdn"); + msg.setAttribute(FileAttribute.MA_PENDINGFILE, pendingFile); + CompositeParameters parser = new CompositeParameters(false).add("date", new DateParameters()) + .add("msg", new MessageParameters(msg)).add("rand", new RandomParameters()); + msg.setAttribute(FileAttribute.MA_ERROR_DIR, ParameterParser.parse(getParameter(PARAM_ERROR_DIRECTORY, true), parser)); + msg.setAttribute(FileAttribute.MA_ERROR_FILENAME, getParameter(PARAM_ERROR_FILENAME, false)); + if (getParameter(PARAM_SENT_DIRECTORY, false) != null) { + msg.setAttribute(FileAttribute.MA_SENT_DIR, ParameterParser.parse(getParameter(PARAM_SENT_DIRECTORY, false), parser)); + msg.setAttribute(FileAttribute.MA_SENT_FILENAME, getParameter(PARAM_SENT_FILENAME, false)); + } + return msg; + + } + + public void buildMessageData(Message msg, InputStream inputData) throws OpenAS2Exception { + MessageParameters params = new MessageParameters(msg); + + try { + String contentType=msg.getAttribute(Partnership.PA_CONTENT_TYPE); + + // Allow Content-Type to be overridden at partnership level or as property + if(contentType == null || contentType.length() == 0 ) { + contentType = msg.getPartnership().getAttributeOrProperty(Partnership.PA_CONTENT_TYPE, null); + } + if (contentType == null){ + contentType = getParameter(PARAM_MIMETYPE, false); + } + if (contentType == null) { + contentType = "application/octet-stream"; + } else { + try { + contentType = ParameterParser.parse(contentType, params); + } catch (InvalidParameterException e) { + throw new OpenAS2Exception("Bad content-type" + contentType, e); + } + } + javax.mail.util.ByteArrayDataSource byteSource = new javax.mail.util.ByteArrayDataSource(inputData, contentType); + MimeBodyPart body = new MimeBodyPart(); + body.setDataHandler(new DataHandler(byteSource)); + + // below statement is not filename related, just want to make it + // consist with the parameter "mimetype="application/EDI-X12"" + // defined in config.xml 2007-06-01 + + body.setHeader("Content-Type", contentType); + + // add below statement will tell the receiver to save the filename + // as the one sent by sender. 2007-06-01 + String sendFileName = getParameter("sendfilename", false); + if (sendFileName != null && sendFileName.equals("true")) { + String contentDisposition = "Attachment; filename=\"" + msg.getAttribute(FileAttribute.MA_FILENAME) + + "\""; + body.setHeader("Content-Disposition", contentDisposition); + msg.setContentDisposition(contentDisposition); + } + + msg.setData(body); + } catch (MessagingException me) { + throw new WrappedException(me); + } catch (IOException ioe) { + throw new WrappedException(ioe); + } + + /* + * Not sure it should be set at this level as there is no encoding of the + * content at this point so make it configurable + */ + if (msg.getPartnership().isSetTransferEncodingOnInitialBodyPart()) { + String contentTxfrEncoding = msg.getPartnership().getAttribute(Partnership.PA_CONTENT_TRANSFER_ENCODING); + if (contentTxfrEncoding == null) + contentTxfrEncoding = Session.DEFAULT_CONTENT_TRANSFER_ENCODING; + try { + msg.getData().setHeader("Content-Transfer-Encoding", contentTxfrEncoding); + } catch (MessagingException e) { + throw new OpenAS2Exception("Failed to set content transfer encoding in created MimeBodyPart: " + + org.openas2.logging.Log.getExceptionMsg(e), e); + } + } + } +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java new file mode 100644 index 00000000..35817df8 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java @@ -0,0 +1,200 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.greicodex.openas2.plugins.mq.adapters; + +import com.greicodex.openas2.plugins.mq.MQConnector; +import com.greicodex.openas2.plugins.mq.ConsumerCallback; +import com.greicodex.openas2.plugins.mq.MessageBrokerAdapter; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @author javier + */ +public class JmsAdapter implements MessageListener, ExceptionListener, MessageBrokerAdapter { + + ConnectionFactory jmsFactory; + Connection jmsConnection; + MessageConsumer jmsMessageConsumer; + MessageProducer jmsMessageProducer; + MessageProducer jmsEventProducer; + JMSContext jmsContext; + javax.jms.Session jmsSession; + java.lang.Class JmsClass; + ConsumerCallback messageCallback; + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + public JmsAdapter(String jmsFactoryClass) throws ClassNotFoundException { + try { + + JmsClass = Class.forName(jmsFactoryClass); + + } catch (ClassNotFoundException | IllegalArgumentException ex) { + logger.error("Unable to create JMS connection factory", ex); + throw ex; + } + + } + + @Override + public void connect(Map parameters) { + java.lang.reflect.Method[] list = JmsClass.getDeclaredMethods(); + logger.info("Creating instance of " + JmsClass.getName()); + try { + jmsFactory = (ConnectionFactory) JmsClass.newInstance(); + } catch (InstantiationException ex) { + logger.error("Error creating JMS instance", ex); + throw new RuntimeException("Error creating JMS instance", ex); + } catch (IllegalAccessException ex) { + logger.error("Error creating JMS instance", ex); + throw new RuntimeException("Error creating JMS instance", ex); + } + logger.info("Configuring instance " + JmsClass.getName()); + for (java.lang.reflect.Method m : list) { + if (!(m.getName().substring(0, 3).equalsIgnoreCase("set") + && m.getParameterCount() == 1 + && m.getParameterTypes()[0].equals(String.class))) { + continue; + } + String paramName = m.getName().substring(3).toLowerCase(); + logger.info("Checking config " + paramName); + if (!parameters.containsKey(paramName)) { + continue; + } + String paramValue = parameters.get(paramName); + logger.info("Setting config " + paramName + "=" + paramValue); + try { + m.invoke(jmsFactory, paramValue); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { + logger.error("Error configuring JMS instance", ex); + } + } + try { + //jmsContext = jmsFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE); + jmsConnection = jmsFactory.createConnection(); + jmsSession = jmsConnection.createSession(true, JMSContext.CLIENT_ACKNOWLEDGE); + jmsConnection.setExceptionListener(this); + } catch (JMSException ex) { + logger.error("Unable to create JMS connection factory", ex); + throw new RuntimeException("Unable to create JMS connection factory", ex); + } + } + + @Override + public void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException { + try { + jmsMessageConsumer = (MessageConsumer) jmsSession.createConsumer(jmsSession.createQueue(messageSendQueue)); + jmsMessageProducer = (MessageProducer) jmsSession.createProducer(jmsSession.createTopic(messageReceivedTopic)); + + jmsMessageConsumer.setMessageListener(this); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void createEventMQInterface(String eventTopic) throws RuntimeException { + try { + jmsEventProducer = (MessageProducer) jmsSession.createProducer(jmsSession.createTopic(eventTopic)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + try { + jmsSession.commit(); + } finally { + logger.debug("Messages flushed"); + } + jmsEventProducer = null; + jmsMessageConsumer = null; + jmsMessageConsumer = null; + try { + jmsSession.close(); + } finally { + logger.debug("JMS Session closed"); + } + try { + jmsConnection.close(); + } finally { + logger.debug("JMS Connection closed"); + } + jmsFactory = null; + } + + @Override + public void onMessage(javax.jms.Message message) { + logger.debug(message); + InputStream inputData = new InputStream() { + @Override + public int read() throws IOException { + return 0; + } + } ; + Map params = new HashMap<>(); + messageCallback.onMessage(params, inputData); + + } + + @Override + public void start() throws RuntimeException { + try { + jmsConnection.start(); + javax.jms.BytesMessage msg = jmsSession.createBytesMessage(); + msg.writeBytes("this is a test".getBytes()); + jmsMessageProducer.send(msg); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void stop() throws RuntimeException { + try { + jmsConnection.stop(); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + } + + @Override + public void onException(JMSException exception) { + logger.error("MQ Adapter Error", exception); + } + + @Override + public void sendMessage(InputStream rawInputStream, Map headers) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void sendEvent(InputStream rawInputStream, Map headers) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } +} diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java new file mode 100644 index 00000000..694f7861 --- /dev/null +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java @@ -0,0 +1,206 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.greicodex.openas2.plugins.mq.adapters; + +import com.greicodex.openas2.plugins.mq.MQConnector; +import com.greicodex.openas2.plugins.mq.ConsumerCallback; +import com.greicodex.openas2.plugins.mq.MessageBrokerAdapter; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BasicProperties; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.MessageProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.jms.DeliveryMode; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @author javier + */ +public class RMQAdapter implements MessageBrokerAdapter { + ConnectionFactory factory; + Connection connection; + String eventProducer; + Consumer messageConsumer; + String messageQueue; + String messageProducer; + Channel channel; + private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); + + @Override + public void connect(Map parameters) { + factory = new ConnectionFactory(); + try { + factory.setUri(parameters.get("uri")); + factory.setVirtualHost("/"); + factory.setRequestedHeartbeat(1); + factory.setConnectionTimeout(5000); + factory.setAutomaticRecoveryEnabled(true); + factory.setNetworkRecoveryInterval(5); + factory.setTopologyRecoveryEnabled(true); + } catch (Exception ex) { + logger.error("Error setting uri",ex); + throw new RuntimeException("Error setting uri",ex); + } + try { + connection=factory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException sse) { + logger.error("Connection shutdown",sse); + } + }); + } catch (Exception ex) { + logger.error("Error newConnection",ex); + throw new RuntimeException("Error newConnection",ex); + } + try { + channel=connection.createChannel(); + } catch (Exception ex) { + logger.error("Failed createChannel",ex); + throw new RuntimeException("Error createChannel",ex); + } + + } + + @Override + public void createEventMQInterface(String eventTopic) throws RuntimeException { + try { + channel.exchangeDeclare(eventTopic, BuiltinExchangeType.TOPIC,true); + eventProducer=eventTopic; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + } + + @Override + public void createMessageMQInterface(String messageReceivedTopic, String messageSendQueue, ConsumerCallback callback) throws RuntimeException { + try { + channel.exchangeDeclare(messageReceivedTopic, BuiltinExchangeType.TOPIC, true); + messageProducer = messageReceivedTopic; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + try { + channel.queueDeclare(messageSendQueue, true, false, false, null); + messageQueue=messageSendQueue; + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException(ex); + } + + messageConsumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + super.handleDelivery(consumerTag, envelope, properties, body); + logger.info("handleDelivery "+envelope.getDeliveryTag()); + Map params=new HashMap<>(); + properties.getHeaders().forEach((t, u) -> { + params.put(t, u.toString()); + }); + InputStream inputData = new ByteArrayInputStream(body); + try { + callback.onMessage(params, inputData); + channel.basicAck(envelope.getDeliveryTag(), true); + }catch(Throwable t) { + logger.error("Failed to process message", t); + channel.basicNack(envelope.getDeliveryTag(), true,false); + } + + } + }; + + } + + @Override + public void start() throws RuntimeException { + try { + channel.basicConsume(messageQueue, messageConsumer); + } catch (IOException ex) { + logger.error("Error starting", ex); + throw new RuntimeException(ex); + } + } + + @Override + public void stop() throws RuntimeException { + try { + channel.abort(); + } catch (IOException ex) { + logger.error("Aborting Channel error",ex); + throw new RuntimeException(ex); + } + connection.abort(); + } + + @Override + public void sendMessage(InputStream rawInputStream, Map headers) { + this.publishOnQueue(messageProducer, rawInputStream, headers); + } + + protected void publishOnQueue(String exchange,InputStream rawInputStream, Map headers) { + String routingKey=""; + String contentEncoding=null; + String contentType=null; + int deliveryMode=DeliveryMode.PERSISTENT; + int priority=0; + Date timestamp=null; + String correlationId=null; + String replyTo=null; + String expiration=null; + String messageId=null; + String type=null; + String userid=null; + String appid=null; + String clusterid=null; + Map amqHeaders = new HashMap<>(); + headers.forEach((t, u) -> { + amqHeaders.put(t, u); + }); + ByteArrayOutputStream result = new ByteArrayOutputStream(); + try { + IOUtils.copyLarge(rawInputStream, result); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + BasicProperties bp = new AMQP.BasicProperties(contentType, + contentEncoding, amqHeaders, deliveryMode, priority, + correlationId, replyTo, expiration, messageId, timestamp, type, userid, appid, clusterid); + try { + channel.basicPublish(exchange, routingKey, (AMQP.BasicProperties) bp, result.toByteArray()); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + } + + @Override + public void sendEvent(InputStream rawInputStream, Map headers) { + this.publishOnQueue(eventProducer, rawInputStream, headers); + } +} diff --git a/pom.xml b/pom.xml index 11c270cd..412e62f1 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ Remote Server Bundle + Plugins-MQ From 6ce7695facf563f454d70accf56728df0497b8b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Mun=CC=83oz?= Date: Thu, 14 Nov 2019 16:09:22 -0400 Subject: [PATCH 2/5] JMS clean up, configuration added --- Plugins-MQ/docs/PluginMQHowTo.md | 1 + Plugins-MQ/pom.xml | 5 +- Plugins-MQ/src/config/config_example.xml | 32 +++++++ .../openas2/plugins/mq/MQConnector.java | 30 +++--- .../plugins/mq/adapters/JmsAdapter.java | 91 ++++++++++--------- .../plugins/mq/adapters/RMQAdapter.java | 13 ++- 6 files changed, 107 insertions(+), 65 deletions(-) create mode 100644 Plugins-MQ/docs/PluginMQHowTo.md create mode 100644 Plugins-MQ/src/config/config_example.xml diff --git a/Plugins-MQ/docs/PluginMQHowTo.md b/Plugins-MQ/docs/PluginMQHowTo.md new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/Plugins-MQ/docs/PluginMQHowTo.md @@ -0,0 +1 @@ + diff --git a/Plugins-MQ/pom.xml b/Plugins-MQ/pom.xml index c46f6820..e6b2cc6d 100644 --- a/Plugins-MQ/pom.xml +++ b/Plugins-MQ/pom.xml @@ -18,8 +18,8 @@ ${project.parent.artifactId}-plugin-mq-${project.version}.zip UTF-8 ${project.basedir}/lib - OpenAS2HowTo.pdf - ../docs/${help.filename} + PluginMQHowTo.md + ${project.basedir}/docs/${help.filename} ${project.basedir}/dist @@ -96,7 +96,6 @@ net.sf.openas2 openas2-server ${project.version} - jar diff --git a/Plugins-MQ/src/config/config_example.xml b/Plugins-MQ/src/config/config_example.xml new file mode 100644 index 00000000..a2fc94e6 --- /dev/null +++ b/Plugins-MQ/src/config/config_example.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java index a71be89a..50f2c6d5 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java @@ -33,14 +33,12 @@ */ public class MQConnector extends BaseActiveModule implements ResenderModule, TrackingModule, StorageModule, ConsumerCallback { - public static final String PARAM_JMS_DRIVER = "jms_driver"; - public static final String PARAM_JMS_PARAMS = "jms_parameters"; - public static final String PARAM_JMS_MSG_PRODUCER_TOPIC = "msg_topic"; - public static final String PARAM_JMS_MSG_CONSUMER_QUEUE = "msg_queue"; - public static final String PARAM_JMS_EVT_PRODUCER_TOPIC = "evt_topic"; - - protected String mqBrokerUri; - protected String mqFactoryClass; + public static final String PARAM_MQ_ADAPTER = "adapter"; + public static final String PARAM_MQ_MSG_PRODUCER_TOPIC = "msg_topic"; + public static final String PARAM_MQ_MSG_CONSUMER_QUEUE = "msg_queue"; + public static final String PARAM_MQ_EVT_PRODUCER_TOPIC = "evt_topic"; + + protected String mqAdapterFactoryClass; protected int resendRetries; protected String messageReceivedTopic; protected String messageSendQueue; @@ -146,17 +144,17 @@ public void destroy() throws Exception { protected void initMQ(Map parameters) throws OpenAS2Exception { try { - mqFactoryClass = getParameter(MQConnector.PARAM_JMS_DRIVER, true); - messageReceivedTopic = getParameter(MQConnector.PARAM_JMS_MSG_PRODUCER_TOPIC, true); - messageSendQueue = getParameter(MQConnector.PARAM_JMS_MSG_CONSUMER_QUEUE, true); - eventTopic = getParameter(MQConnector.PARAM_JMS_EVT_PRODUCER_TOPIC, true); + mqAdapterFactoryClass = getParameter(MQConnector.PARAM_MQ_ADAPTER, true); + messageReceivedTopic = getParameter(MQConnector.PARAM_MQ_MSG_PRODUCER_TOPIC, true); + messageSendQueue = getParameter(MQConnector.PARAM_MQ_MSG_CONSUMER_QUEUE, true); + eventTopic = getParameter(MQConnector.PARAM_MQ_EVT_PRODUCER_TOPIC, true); - broker = (MessageBrokerAdapter) Class.forName(mqFactoryClass).newInstance(); + broker = (MessageBrokerAdapter) Class.forName(mqAdapterFactoryClass).newInstance(); broker.connect(parameters); broker.createEventMQInterface(eventTopic); broker.createMessageMQInterface(messageReceivedTopic, messageSendQueue, this); } catch (Exception ex) { - logger.error("Unable to create broker interface", ex); + logger.error("Unable to create message broker adapter: "+mqAdapterFactoryClass, ex); throw new OpenAS2Exception(ex); } @@ -199,7 +197,7 @@ public void doStart() throws OpenAS2Exception { try { broker.start(); } catch (Exception ex) { - logger.error("Error starting up JMS listener", ex); + logger.error("Error starting up MQ listener", ex); throw new OpenAS2Exception(ex); } } @@ -209,7 +207,7 @@ public void doStop() throws OpenAS2Exception { try { broker.stop(); } catch (Exception ex) { - logger.error("Error stopping up JMS listener", ex); + logger.error("Error stopping up MQ listener", ex); throw new OpenAS2Exception(ex); } } diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java index 35817df8..a6f7d9d9 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java @@ -8,21 +8,26 @@ import com.greicodex.openas2.plugins.mq.MQConnector; import com.greicodex.openas2.plugins.mq.ConsumerCallback; import com.greicodex.openas2.plugins.mq.MessageBrokerAdapter; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.ExceptionListener; import javax.jms.JMSContext; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,50 +48,20 @@ public class JmsAdapter implements MessageListener, ExceptionListener, MessageBr ConsumerCallback messageCallback; private Log logger = LogFactory.getLog(MQConnector.class.getSimpleName()); - public JmsAdapter(String jmsFactoryClass) throws ClassNotFoundException { - try { - - JmsClass = Class.forName(jmsFactoryClass); - - } catch (ClassNotFoundException | IllegalArgumentException ex) { - logger.error("Unable to create JMS connection factory", ex); - throw ex; - } - - } - @Override public void connect(Map parameters) { - java.lang.reflect.Method[] list = JmsClass.getDeclaredMethods(); - logger.info("Creating instance of " + JmsClass.getName()); + javax.naming.Context ctx = null; try { - jmsFactory = (ConnectionFactory) JmsClass.newInstance(); - } catch (InstantiationException ex) { - logger.error("Error creating JMS instance", ex); - throw new RuntimeException("Error creating JMS instance", ex); - } catch (IllegalAccessException ex) { - logger.error("Error creating JMS instance", ex); - throw new RuntimeException("Error creating JMS instance", ex); + ctx = new InitialContext(); + } catch (NamingException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException("Failed to initialize JNDI Context", ex); } - logger.info("Configuring instance " + JmsClass.getName()); - for (java.lang.reflect.Method m : list) { - if (!(m.getName().substring(0, 3).equalsIgnoreCase("set") - && m.getParameterCount() == 1 - && m.getParameterTypes()[0].equals(String.class))) { - continue; - } - String paramName = m.getName().substring(3).toLowerCase(); - logger.info("Checking config " + paramName); - if (!parameters.containsKey(paramName)) { - continue; - } - String paramValue = parameters.get(paramName); - logger.info("Setting config " + paramName + "=" + paramValue); - try { - m.invoke(jmsFactory, paramValue); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { - logger.error("Error configuring JMS instance", ex); - } + try { + jmsFactory = (ConnectionFactory) ctx.lookup(parameters.get("jms_factory_jndi")); + } catch (NamingException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + throw new RuntimeException("Unable to lookup JMS connection factory from JNDI: "+parameters.get("jms_factory_jndi"), ex); } try { //jmsContext = jmsFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE); @@ -94,7 +69,7 @@ public void connect(Map parameters) { jmsSession = jmsConnection.createSession(true, JMSContext.CLIENT_ACKNOWLEDGE); jmsConnection.setExceptionListener(this); } catch (JMSException ex) { - logger.error("Unable to create JMS connection factory", ex); + logger.error("Unable to create JMS connection ", ex); throw new RuntimeException("Unable to create JMS connection factory", ex); } } @@ -190,11 +165,39 @@ public void onException(JMSException exception) { @Override public void sendMessage(InputStream rawInputStream, Map headers) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + try { + jmsMessageProducer.send(createJmsMessage(rawInputStream,headers)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } } @Override public void sendEvent(InputStream rawInputStream, Map headers) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + try { + jmsEventProducer.send(createJmsMessage(rawInputStream,headers)); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + } + + private Message createJmsMessage(InputStream rawInputStream, Map headers) throws JMSException { + BytesMessage msg = jmsSession.createBytesMessage(); + ByteArrayOutputStream result = new ByteArrayOutputStream(); + try { + IOUtils.copyLarge(rawInputStream, result); + } catch (IOException ex) { + Logger.getLogger(RMQAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + msg.writeBytes(result.toByteArray()); + headers.forEach((t, u) -> { + try { + msg.setStringProperty(t, u); + } catch (JMSException ex) { + Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); + } + }); + return msg; + } } diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java index 694f7861..68ab445c 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java @@ -40,6 +40,8 @@ * @author javier */ public class RMQAdapter implements MessageBrokerAdapter { + final static String RMQ_URI_PARAM="uri"; + final static String RMQ_VHOST_PARAM="virtualhost"; ConnectionFactory factory; Connection connection; String eventProducer; @@ -51,10 +53,16 @@ public class RMQAdapter implements MessageBrokerAdapter { @Override public void connect(Map parameters) { + if(!parameters.containsKey(RMQAdapter.RMQ_URI_PARAM)) { + throw new RuntimeException(RMQAdapter.class.getSimpleName() + " requires parameter: "+RMQAdapter.RMQ_URI_PARAM); + } + if(!parameters.containsKey(RMQAdapter.RMQ_VHOST_PARAM)) { + throw new RuntimeException(RMQAdapter.class.getSimpleName() + " requires parameter: "+RMQAdapter.RMQ_VHOST_PARAM); + } factory = new ConnectionFactory(); try { - factory.setUri(parameters.get("uri")); - factory.setVirtualHost("/"); + factory.setUri(parameters.get(RMQAdapter.RMQ_URI_PARAM)); + factory.setVirtualHost(parameters.get(RMQAdapter.RMQ_VHOST_PARAM)); factory.setRequestedHeartbeat(1); factory.setConnectionTimeout(5000); factory.setAutomaticRecoveryEnabled(true); @@ -65,6 +73,7 @@ public void connect(Map parameters) { throw new RuntimeException("Error setting uri",ex); } try { + logger.info("Creating connection to uri" + parameters.get(RMQAdapter.RMQ_URI_PARAM) + parameters.get(RMQAdapter.RMQ_VHOST_PARAM)); connection=factory.newConnection(); connection.addShutdownListener(new ShutdownListener() { @Override From 7b6c8eb283c1d5b86028267f4eed4c024ebd72b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Mun=CC=83oz?= Date: Thu, 14 Nov 2019 16:28:18 -0400 Subject: [PATCH 3/5] Added documentation --- Plugins-MQ/docs/PluginMQHowTo.md | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Plugins-MQ/docs/PluginMQHowTo.md b/Plugins-MQ/docs/PluginMQHowTo.md index 8b137891..f7470f18 100644 --- a/Plugins-MQ/docs/PluginMQHowTo.md +++ b/Plugins-MQ/docs/PluginMQHowTo.md @@ -1 +1,38 @@ +# OpenAS2 MQ Plugin +## Overview +This plugin allows OpenAS2 to interconnect with a Message Queue for message exchange +and message tracking events +## Installation +The plugin is enabled by adding the included Jars in the lib folder of your OpenAS2 +installation and appending the module configuration section in the config.xml file +## Mode of Operation +The Module operates by creating 2 Outbound Topic publishing destinations and one +input consuming queue: + + - Messages published into the input consuming queue will get wrapped into an AS2 message +and sent out to the corresponding partner based on the Partnership configuration file + + - AS2 Messages received including MDNs will be published in the Messages Output +publishing Topic + + - Any tracking events from the messages generated from either the AS2 Messages sent/received +will be posted as Event messages in the Event Output Publishing Topic + +### Adapters +The module includes 2 adapters to connect with MQ Brokers: + - A generic JMS implementation of the MQ Broker connection + - A RabbitMQ implementation using the Java Native RabbitMQ client library + +To use either, the corresponding Jar for the Driver library used on the adapter +must be also added to the server's Lib folder. + +Authentication parameters and other details needed by the driver should be added +as attributes on the module declaration on config.xml + +#### JMS Adapter +The JMS adapter uses the following parameters: + - jms_factory_jndi : The JNDI path of the Connection Factory used for JMS Connection +#### RMQ Adapter + - uri: AMQP Connection string to RabbitMQ, including Username/password, Host and Port + - virtualhost: Virtual host to attach From 742e754e42d7f579bfa30d5e81c28ba48f0fcf53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Mun=CC=83oz?= Date: Thu, 14 Nov 2019 16:28:44 -0400 Subject: [PATCH 4/5] Refactor JMS Adapter parameter naming --- .../greicodex/openas2/plugins/mq/adapters/JmsAdapter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java index a6f7d9d9..00403f96 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java @@ -37,6 +37,7 @@ */ public class JmsAdapter implements MessageListener, ExceptionListener, MessageBrokerAdapter { + public final static String JNDI_FACTORY_PARAM ="jms_factory_jndi"; ConnectionFactory jmsFactory; Connection jmsConnection; MessageConsumer jmsMessageConsumer; @@ -58,10 +59,10 @@ public void connect(Map parameters) { throw new RuntimeException("Failed to initialize JNDI Context", ex); } try { - jmsFactory = (ConnectionFactory) ctx.lookup(parameters.get("jms_factory_jndi")); + jmsFactory = (ConnectionFactory) ctx.lookup(parameters.get(JmsAdapter.JNDI_FACTORY_PARAM)); } catch (NamingException ex) { Logger.getLogger(JmsAdapter.class.getName()).log(Level.SEVERE, null, ex); - throw new RuntimeException("Unable to lookup JMS connection factory from JNDI: "+parameters.get("jms_factory_jndi"), ex); + throw new RuntimeException("Unable to lookup JMS connection factory from JNDI: "+parameters.get(JmsAdapter.JNDI_FACTORY_PARAM), ex); } try { //jmsContext = jmsFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE); From eff51cb7065523589b2aa4a7c081c04368c72c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Mun=CC=83oz?= Date: Fri, 15 Nov 2019 16:00:59 -0400 Subject: [PATCH 5/5] Code cleanup --- .../openas2/plugins/mq/ConsumerCallback.java | 5 ---- .../openas2/plugins/mq/MQConnector.java | 29 +++++++------------ .../plugins/mq/MessageBrokerAdapter.java | 5 ---- .../plugins/mq/adapters/JmsAdapter.java | 5 ---- .../plugins/mq/adapters/RMQAdapter.java | 5 ---- 5 files changed, 11 insertions(+), 38 deletions(-) diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java index 1740fd06..e84a8b3f 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/ConsumerCallback.java @@ -1,8 +1,3 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ package com.greicodex.openas2.plugins.mq; import java.io.InputStream; diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java index 50f2c6d5..7435373a 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MQConnector.java @@ -1,8 +1,3 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ package com.greicodex.openas2.plugins.mq; import java.io.ByteArrayInputStream; @@ -70,10 +65,7 @@ public void handle(String action, Message msg, Map options) thro if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { try { - headers.put("message.id",msg.getMessageID()); - headers.put("message.class", msg.getClass().getSimpleName()); - headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); - headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + updateHeaders(msg, headers); body = msg.getData().getRawInputStream(); } catch (MessagingException ex) { logger.error(ex.getMessage(), ex); @@ -81,21 +73,16 @@ public void handle(String action, Message msg, Map options) thro broker.sendMessage(body, headers); } else if (action.equalsIgnoreCase(StorageModule.DO_STOREMDN)) { try { - headers.put("message.id",msg.getMessageID()); - headers.put("message.class", msg.getClass().getSimpleName()); - headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); - headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); - headers.put("CorrelationId", msg.getMessageID()); + updateHeaders(msg, headers); if (msg.getMDN() != null) { MessageMDN mdn = msg.getMDN(); + updateHeaders((Message) mdn, headers); + headers.put("CorrelationId", msg.getMessageID()); + mdn.getAttributes().forEach((t, u) -> { String name = "attributes."+ ((String)t).toLowerCase(); headers.put(name,(String)u); }); - headers.put("message.id",mdn.getMessageID()); - headers.put("message.class", mdn.getClass().getSimpleName()); - headers.put("sender.as2_id", mdn.getPartnership().getSenderID(Partnership.PID_AS2)); - headers.put("receiver.as2_id", mdn.getPartnership().getReceiverID(Partnership.PID_AS2)); body = new ByteArrayInputStream(mdn.getText().getBytes()); } } catch (NullPointerException ex) { @@ -118,6 +105,12 @@ public void handle(String action, Message msg, Map options) thro } } + private void updateHeaders(Message msg, Map headers) { + headers.put("message.id",msg.getMessageID()); + headers.put("message.class", msg.getClass().getSimpleName()); + headers.put("sender.as2_id", msg.getPartnership().getSenderID(Partnership.PID_AS2)); + headers.put("receiver.as2_id", msg.getPartnership().getReceiverID(Partnership.PID_AS2)); + } @Override public boolean canHandle(String action, Message msg, Map options) { if (action.equalsIgnoreCase(StorageModule.DO_STORE)) { diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java index 1918809e..1d60cdd7 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/MessageBrokerAdapter.java @@ -1,8 +1,3 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ package com.greicodex.openas2.plugins.mq; import java.io.InputStream; diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java index 00403f96..feab6459 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/JmsAdapter.java @@ -1,8 +1,3 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ package com.greicodex.openas2.plugins.mq.adapters; import com.greicodex.openas2.plugins.mq.MQConnector; diff --git a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java index 68ab445c..2f7ce5d6 100644 --- a/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java +++ b/Plugins-MQ/src/main/java/com/greicodex/openas2/plugins/mq/adapters/RMQAdapter.java @@ -1,8 +1,3 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ package com.greicodex.openas2.plugins.mq.adapters; import com.greicodex.openas2.plugins.mq.MQConnector;