[jboss-svn-commits] JBL Code SVN: r19609 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/notification and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Apr 17 16:59:57 EDT 2008


Author: mark.little at jboss.com
Date: 2008-04-17 16:59:57 -0400 (Thu, 17 Apr 2008)
New Revision: 19609

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTcp.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTcpUnitTest.java
Modified:
   labs/jbossesb/trunk/product/docs/MessageActionGuide.odt
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyConsole.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1143

Modified: labs/jbossesb/trunk/product/docs/MessageActionGuide.odt
===================================================================
(Binary files differ)

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyConsole.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyConsole.java	2008-04-17 16:21:20 UTC (rev 19608)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyConsole.java	2008-04-17 20:59:57 UTC (rev 19609)
@@ -27,13 +27,10 @@
 import org.jboss.soa.esb.message.MessagePayloadProxy;
 import org.jboss.soa.esb.message.body.content.BytesBody;
 import org.jboss.soa.esb.util.Util;
-import org.jboss.soa.esb.actions.ActionUtils;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
-import org.apache.log4j.Logger;
 
 public class NotifyConsole extends NotificationTarget 
 {
-    private static Logger logger = Logger.getLogger(NotifyConsole.class);
     private MessagePayloadProxy payloadProxy;
 
     public NotifyConsole(ConfigTree tree)

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTcp.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTcp.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTcp.java	2008-04-17 20:59:57 UTC (rev 19609)
@@ -0,0 +1,219 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.notification;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.MessagePayloadProxy;
+import org.jboss.soa.esb.message.body.content.BytesBody;
+import org.jboss.soa.esb.message.format.MessageType;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Send message via TCP. Each connection is maintained only for the
+ * duration of the notification.
+ */
+
+public class NotifyTcp extends NotificationTarget
+{
+    /** Logger used to report send-time failures. */
+    private static final Logger log = Logger.getLogger(NotifyTcp.class);
+
+    /** Name of the child configuration elements read as destinations. */
+    public static final String DESTINATIONS_TAG = "destination";
+    /** Attribute holding the destination URI (tcp://host:port). */
+    public static final String ATT_URI = "URI";
+    /** The URI scheme a destination must use. */
+    public static final String PROTOCOL_TAG = "tcp";
+
+    /** Default port if none is specified in the destination URI. */
+    public static final int DEFAULT_PORT = 9090;
+    
+    /**
+     * Instantiate a NotifyTcp object from the supplied configuration.
+     *
+     * @param configTree
+     *            ConfigTree - child elements named "destination" are the
+     *            notification targets; each must carry a tcp "URI" attribute
+     * @throws IllegalArgumentException
+     *             if a destination URI cannot be created or is not tcp
+     */
+    public NotifyTcp(ConfigTree configTree)
+    {
+        super(configTree);
+        setDestinations(configTree.getChildren(DESTINATIONS_TAG));
+        payloadProxy = new MessagePayloadProxy(
+                configTree,
+                new String[] { BytesBody.BYTES_LOCATION },
+                new String[] { BytesBody.BYTES_LOCATION });
+    }
+
+    /**
+     * Sends the payload of the message to each of the destinations, opening
+     * a new connection per destination. An I/O or payload failure on one
+     * destination is recorded and the remaining destinations are still tried.
+     * @param message the message whose payload is sent
+     * @throws NotificationException if any destination failed; its text
+     *             aggregates the individual failures
+     */
+    public void sendNotification (Message message) throws NotificationException
+    {
+        final StringBuilder exceptions = new StringBuilder();
+        for (URI destination : _destinations)
+        {
+            Socket sender = null;
+            try
+            {
+                // getHost() is null for e.g. opaque URIs, and Socket would then connect
+                // to the loopback address; fail this destination instead.
+                final String host = destination.getHost();
+                if (host == null)
+                    throw new IOException("No host specified for "+destination);
+
+                int port = destination.getPort();
+                if (port == -1)
+                {
+                    port = DEFAULT_PORT;
+                    _logger.debug("No port specified for "+destination+" so will use default "+DEFAULT_PORT);
+                }
+
+                // Fetch the payload first: if that fails there is no point connecting.
+                final Object payload = payloadProxy.getPayload(message);
+                sender = new Socket(host, port);
+
+                if (MessageType.JAVA_SERIALIZED.equals(message.getType()))
+                {
+                    objectNotification(new ObjectOutputStream(sender.getOutputStream()), payload);
+                }
+                else if (payload instanceof byte[])
+                {
+                    // Decoded with the platform default charset.
+                    stringNotification(sender.getOutputStream(), new String((byte[]) payload));
+                }
+                else
+                {
+                    stringNotification(sender.getOutputStream(), payload.toString());
+                }
+            }
+            catch (IOException e)
+            {
+                handleException(destination, e, exceptions);
+            }
+            catch (MessageDeliverException e)
+            {
+                handleException(destination, e, exceptions);
+            }
+            finally
+            {
+                try
+                {
+                    if (sender != null)
+                        sender.close();
+                }
+                catch (IOException eCl)
+                {
+                    log.error("IOException while closing socket: ", eCl);
+                }
+            }
+        }
+
+        if (exceptions.length() > 0)
+            throw new NotificationException(exceptions.toString());
+    } // __________________________________
+
+    /**
+     * Logs a failed notification and appends its description to the text
+     * collected in exceptions.
+     */
+    private void handleException (URI destination, Exception e, StringBuilder exceptions)
+    {
+        final String description = "[Exception while notifying destination : ".concat(String.valueOf(destination));
+        log.error(description, e);
+        exceptions.append(NotifyUtil.createExceptionErrorString(description, e));
+    }
+
+    /** Writes p_s (platform default charset) plus a newline if it lacks one; flushes and closes sender. */
+    protected void stringNotification (OutputStream sender, String p_s) throws IOException
+    {
+        final boolean needsNewline = !p_s.endsWith("\n");
+        sender.write(p_s.getBytes());
+        if (needsNewline) { sender.write("\n".getBytes()); }
+        sender.flush();
+        sender.close();
+    } // __________________________________
+
+    /** Serializes p_o to sender, then flushes and closes the stream. */
+    protected void objectNotification (ObjectOutputStream sender, Object p_o) throws IOException
+    {
+        sender.writeObject(p_o);
+        sender.flush();
+        sender.close();
+    }
+
+    /**
+     * Parses the URI attribute of each destination element into _destinations.
+     *
+     * @param dests the destination elements; null is treated as none
+     * @throws IllegalArgumentException if a URI cannot be created or is not tcp
+     */
+    private void setDestinations (ConfigTree[] dests)
+    {
+        // Never leave _destinations null: sendNotification iterates over it.
+        final URI[] parsed = new URI[(dests == null) ? 0 : dests.length];
+        for (int i = 0; i < parsed.length; i++)
+        {
+            try
+            {
+                parsed[i] = new URI(dests[i].getAttribute(ATT_URI));
+                // Constant first: getScheme() is null when the URI has no scheme.
+                if (!PROTOCOL_TAG.equals(parsed[i].getScheme()))
+                    throw new IllegalArgumentException("Destination is not a tcp socket! "+parsed[i]);
+            }
+            catch (URISyntaxException ex)
+            {
+                _logger.warn("Could not create notify destination: ", ex);
+                throw new IllegalArgumentException("Could not create notify destination", ex);
+            }
+            catch (Exception ex)
+            {
+                _logger.warn("Problem creating notify destination: ", ex);
+                throw new IllegalArgumentException("Problem creating notify destination", ex);
+            }
+        }
+        _destinations = parsed;
+    }
+    
+    /** Destinations to notify; assigned by setDestinations. */
+    private URI[] _destinations = null;
+    /** Used by sendNotification to obtain the payload of each message. */
+    private final MessagePayloadProxy payloadProxy;
+    private static final Logger _logger = Logger.getLogger( NotifyTcp.class );
+}
\ No newline at end of file

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTcpUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTcpUnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTcpUnitTest.java	2008-04-17 20:59:57 UTC (rev 19609)
@@ -0,0 +1,166 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.notification;
+
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import junit.framework.TestCase;
+
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+/**
+ * NotifyTcp unit tests.
+ */
+
+public class NotifyTcpUnitTest extends TestCase
+{
+    public static final String DATA = "Hello World"; // body text sent by the Sender thread in testSend
+ 
+    /**
+     * A destination with no URI, or with a non-tcp URI, must be rejected.
+     */
+    public void testBadURI () throws Exception
+    {
+        final ConfigTree rootEl = new ConfigTree("notif");
+        final ConfigTree tcpEl = new ConfigTree("destination", rootEl);
+
+        // First pass: no URI attribute at all. Second pass: wrong scheme.
+        for (int pass = 0; pass < 2; pass++)
+        {
+            if (pass == 1)
+                tcpEl.setAttribute("URI", "udp://foo:8080");
+
+            boolean rejected = false;
+            try
+            {
+                new NotifyTcp(rootEl);
+            }
+            catch (IllegalArgumentException e)
+            {
+                rejected = true;   // Expected
+            }
+            if (!rejected)
+                fail("Expected IllegalArgumentException");
+        }
+    }
+    
+    /**
+     * A well-formed tcp URI must be accepted by the constructor.
+     */
+    public void testValidURI () throws Exception
+    {
+        final ConfigTree rootEl = new ConfigTree("notif");
+        final ConfigTree tcpEl = new ConfigTree("destination", rootEl);
+        tcpEl.setAttribute("URI", "tcp://localhost:9090");
+
+        boolean accepted = true;
+        try { new NotifyTcp(rootEl); }
+        catch (IllegalArgumentException e) { accepted = false; }
+
+        if (!accepted)
+            fail();
+    }
+
+    /**
+     * Sends DATA through NotifyTcp to a local server socket and verifies what
+     * arrives. The stream is read until the sender closes it, so the result
+     * does not depend on how the bytes happen to be split across packets.
+     */
+    public void testSend () throws Exception
+    {
+        final ServerSocket sock = new ServerSocket(0);
+        Socket receiver = null;
+
+        try
+        {
+            // Fail with a timeout instead of hanging if the sender never
+            // connects (its errors are only printed, not propagated).
+            sock.setSoTimeout(10000);
+
+            final Sender sender = new Sender(sock.getLocalPort());
+
+            sender.start();
+
+            receiver = sock.accept();
+            receiver.setSoTimeout(10000);
+
+            final InputStream stream = receiver.getInputStream();
+            final StringBuilder received = new StringBuilder();
+
+            // DATA is plain ASCII, so each byte maps directly to one char.
+            for (int next = stream.read(); next != -1; next = stream.read())
+            {
+                received.append((char) next);
+            }
+
+            // stringNotification terminates the text with a newline.
+            assertEquals("Did not receive correct data", DATA+"\n", received.toString());
+            sender.join(10000);
+        }
+        finally
+        {
+            if (receiver != null)
+            {
+                receiver.close();
+            }
+
+            sock.close();
+        }
+    }
+
+    /**
+     * Background thread that builds a NotifyTcp for localhost on the given
+     * port and sends one message whose body is DATA.
+     */
+    static class Sender extends Thread
+    {
+        private final int _port;
+
+        public Sender (int s)
+        {
+            _port = s;
+        }
+
+        public void run ()
+        {
+            try
+            {
+                final ConfigTree rootEl = new ConfigTree("notif");
+                final ConfigTree tcpEl = new ConfigTree("destination", rootEl);
+
+                tcpEl.setAttribute("URI", "tcp://localhost:"+_port);
+                final Message msg = MessageFactory.getInstance().getMessage();
+                msg.getBody().add(DATA);
+                new NotifyTcp(rootEl).sendNotification(msg);
+            }
+            catch (Throwable ex)
+            {
+                // Only reported on stderr; the test thread is not told about it.
+                ex.printStackTrace();
+            }
+        }
+    }
+}




More information about the jboss-svn-commits mailing list