[jboss-cvs] JBoss Messaging SVN: r6483 - in trunk: examples/jms/large-message and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Apr 17 13:50:33 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-17 13:50:33 -0400 (Fri, 17 Apr 2009)
New Revision: 6483

Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
Modified:
   trunk/build.xml
   trunk/examples/jms/large-message/readme.html
   trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
Log:
tweaks on LargeMessage and adding dev-tests target on build.xml 

Modified: trunk/build.xml
===================================================================
--- trunk/build.xml	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/build.xml	2009-04-17 17:50:33 UTC (rev 6483)
@@ -241,6 +241,11 @@
       <ant antfile="build-messaging.xml" target="hudson-tests"/>
    </target>
 
+   <target name="dev-tests" depends="createthirdparty">
+      <ant antfile="build-messaging.xml" target="hudson-tests"/>
+      <ant antfile="build-messaging.xml" target="compile-reports"/>
+   </target>
+
    <target name="tests" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="tests"/>
       <ant antfile="build-messaging.xml" target="compile-reports"/>

Modified: trunk/examples/jms/large-message/readme.html
===================================================================
--- trunk/examples/jms/large-message/readme.html	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/examples/jms/large-message/readme.html	2009-04-17 17:50:33 UTC (rev 6483)
@@ -77,7 +77,7 @@
          
          BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
          
-         ((JBossMessage)message).setInputStream(bufferedInput);
+         message.setObjectProperty("JMS_JBM_InputStream", bufferedInput);
         </code></pre>
         
         <li>Send the Message.</li>
@@ -149,15 +149,13 @@
 
          BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
          
-         ((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
         </code></pre>
          
-         <li>We wait until the entire message is written before continuing.</li> 
+         <li>This will save the stream and wait until the entire message is written before continuing.</li> 
         <pre><code>
-         ((JBossMessage)messageReceived).waitCompletionOnStream(300000);
+         messageReceived.setObjectProperty("JMS_JBM_SaveStream", bufferedOutput);
         </code></pre>
         
-
         <li>Be sure to close our resources!</li>
 
         <pre>

Modified: trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -38,8 +38,6 @@
 import javax.jms.Session;
 import javax.naming.InitialContext;
 
-import org.jboss.messaging.jms.client.JBossMessage;
-
 /**
  * This example demonstrates the ability of JBoss Messaging to send and consume a very large message, much
  * bigger than can fit in RAM.
@@ -111,7 +109,8 @@
          // file, however we could use any InputStream not just a FileInputStream.
          FileInputStream fileInputStream = new FileInputStream(fileInput);
          BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
-         ((JBossMessage)message).setInputStream(bufferedInput);
+         
+         message.setObjectProperty("JMS_JBM_InputStream", bufferedInput);
 
          System.out.println("Sending the huge message.");
 
@@ -178,11 +177,9 @@
 
          BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
 
-         ((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
+         // Step 14. This will save the stream and wait until the entire message is written before continuing.
+         messageReceived.setObjectProperty("JMS_JBM_SaveStream", bufferedOutput);
 
-         // Step 14. We wait until the entire message is written before continuing.
-         ((JBossMessage)messageReceived).waitCompletionOnStream(300000);
-
          fileOutputStream.close();
 
          System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length());

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -45,6 +45,7 @@
  * A StorageManager
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  *
  */

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -270,7 +270,7 @@
       }
    }
 
-   // TODO: Optimize this per https://jira.jboss.org/jira/browse/JBMESSAGING-1468
+   // TODO: Optimise this per https://jira.jboss.org/jira/browse/JBMESSAGING-1496
    @Override
    public synchronized ServerMessage copy(final long newID) throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -22,6 +22,7 @@
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -52,7 +53,9 @@
  * @author Hiram Chirino (Cojonudo14 at hotmail.com)
  * @author David Maplesden (David.Maplesden at orion.co.nz)
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a> $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a> 
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a> 
+ * $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
  */
 public class JBossMessage implements javax.jms.Message
 {
@@ -73,7 +76,13 @@
    private static final SimpleString JMS_ = new SimpleString("JMS_");
 
    public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
+   
+   public static final String JMS_JBM_INPUT_STREAM = "JMS_JBM_InputStream";
 
+   public static final String JMS_JBM_OUTPUT_STREAM = "JMS_JBM_OutputStream";
+
+   public static final String JMS_JBM_SAVE_STREAM = "JMS_JBM_SaveStream";
+
    public static final String JMSXGROUPID = "JMSXGroupID";
 
    // Used when bridging a message
@@ -755,6 +764,11 @@
 
    public Object getObjectProperty(final String name) throws JMSException
    {
+      if (JMS_JBM_INPUT_STREAM.equals(name))
+      {
+         return message.getBodyInputStream();
+      }
+      else
       if (JMSXDELIVERYCOUNT.equals(name))
       {
          return String.valueOf(message.getDeliveryCount());
@@ -849,8 +863,28 @@
 
    public void setObjectProperty(final String name, final Object value) throws JMSException
    {
+      
+      if (JMS_JBM_OUTPUT_STREAM.equals(name))
+      {
+         this.setOutputStream((OutputStream)value);
+         return;
+      }
+      else
+      if (JMS_JBM_SAVE_STREAM.equals(name))
+      {
+         this.saveToOutputStream((OutputStream)value);
+         return;
+      }
+      
       checkProperty(name, value);
+      
 
+      if (JMS_JBM_INPUT_STREAM.equals(name))
+      {
+         this.setInputStream((InputStream)value);
+         return;
+      }
+      
       SimpleString key = new SimpleString(name);
 
       if (value instanceof Boolean)
@@ -933,25 +967,64 @@
    }
    
    
-   public void setInputStream(final InputStream input) throws MessagingException
+   public void setInputStream(final InputStream input) throws JMSException
    {
+      checkStream();
+      if (readOnly)
+      {
+         throw new MessageNotWriteableException("Message is read-only");
+      }
+
       message.setBodyInputStream(input);
    }
    
-   
-   public void setOutputStream(final OutputStream output) throws MessagingException
+   public void setOutputStream(final OutputStream output) throws JMSException
    {
-      message.setOutputStream(output);
+      checkStream();
+      if (!readOnly)
+      {
+         throw new IllegalStateException("OutputStream property is only valid on received messages");
+      }
+      
+      try
+      {
+         message.setOutputStream(output);
+      }
+      catch (MessagingException e)
+      {
+         throw JMSExceptionHelper.convertFromMessagingException(e);
+      }
    }
 
-   public void saveToOutputStream(final OutputStream output) throws MessagingException
+   public void saveToOutputStream(final OutputStream output) throws JMSException
    {
-      message.saveToOutputStream(output);
+      checkStream();
+      if (!readOnly)
+      {
+         throw new IllegalStateException("OutputStream property is only valid on received messages");
+      }
+      
+      try
+      {
+         message.saveToOutputStream(output);
+      }
+      catch (MessagingException e)
+      {
+         throw JMSExceptionHelper.convertFromMessagingException(e);
+      }
    }
 
-   public boolean waitCompletionOnStream(long timeWait) throws MessagingException
+   public boolean waitCompletionOnStream(long timeWait) throws JMSException
    {
-      return message.waitOutputStreamCompletion(timeWait);
+      checkStream();
+      try
+      {
+         return message.waitOutputStreamCompletion(timeWait);
+      }
+      catch (MessagingException e)
+      {
+         throw JMSExceptionHelper.convertFromMessagingException(e);
+      }
    }
    
 
@@ -992,6 +1065,14 @@
 
    // Private ------------------------------------------------------------
 
+   private void checkStream() throws JMSException
+   {
+      if (!(message.getType() == JBossBytesMessage.TYPE || message.getType() == JBossStreamMessage.TYPE))
+      {
+         throw new IllegalStateException("LargeMessage streaming is only possible on ByteMessage or StreamMessage");
+      }
+   }
+   
    private void checkProperty(final String name, final Object value) throws JMSException
    {
       if (propertiesReadOnly)

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -30,12 +30,12 @@
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
-import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
-import org.jboss.messaging.jms.client.JBossMessage;
 import org.jboss.test.messaging.jms.JMSTestCase;
 
 /**
@@ -68,11 +68,10 @@
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          MessageProducer prod = session.createProducer(queue1);
-         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
          BytesMessage m = session.createBytesMessage();
 
-         ((JBossMessage)m).setInputStream(createFakeLargeStream(1024 * 1024));
+         m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(1024 * 1024));
 
          prod.send(m);
 
@@ -127,11 +126,10 @@
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          MessageProducer prod = session.createProducer(queue1);
-         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
          BytesMessage m = session.createBytesMessage();
 
-         ((JBossMessage)m).setInputStream(createFakeLargeStream(10));
+         m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(10));
 
          prod.send(m);
 
@@ -171,11 +169,82 @@
 
    }
 
+   public void testExceptionsOnSettingNonStreaming() throws Exception
+   {
+      Connection conn = null;
 
+      try
+      {
+         conn = cf.createConnection();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         TextMessage msg = session.createTextMessage();
+
+         try
+         {
+            msg.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(10));
+            fail("Exception was expected");
+         }
+         catch (JMSException e)
+         {
+         }
+
+         msg.setText("hello");
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         prod.send(msg);
+
+         conn.close();
+
+         conn = cf.createConnection();
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage)cons.receive(10000);
+
+         try
+         {
+            rm.setObjectProperty("JMS_JBM_OutputStream", new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+                  System.out.println("b = " + b);
+               }
+
+            });
+            fail("Exception was expected");
+         }
+         catch (JMSException e)
+         {
+         }
+
+         
+         assertEquals("hello", rm.getText());
+
+         assertNotNull(rm);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+
+   }
+
    public void testWaitOnOutputStream() throws Exception
    {
       int msgSize = 1024 * 1024;
-      
+
       Connection conn = null;
 
       try
@@ -185,11 +254,10 @@
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          MessageProducer prod = session.createProducer(queue1);
-         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
          BytesMessage m = session.createBytesMessage();
 
-         ((JBossMessage)m).setInputStream(createFakeLargeStream(msgSize));
+         m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(msgSize));
 
          prod.send(m);
 
@@ -207,13 +275,14 @@
          assertNotNull(rm);
 
          final AtomicLong numberOfBytes = new AtomicLong(0);
-         
+
          final AtomicInteger numberOfErrors = new AtomicInteger(0);
 
          OutputStream out = new OutputStream()
          {
 
             int position = 0;
+
             @Override
             public void write(int b) throws IOException
             {
@@ -224,18 +293,14 @@
                   numberOfErrors.incrementAndGet();
                }
             }
-            
+
          };
 
-         
-         ((JBossMessage)rm).setOutputStream(out);
-         
-         assertTrue(((JBossMessage)rm).waitCompletionOnStream(10000));
-         
+         rm.setObjectProperty("JMS_JBM_SaveStream", out);
+
          assertEquals(msgSize, numberOfBytes.get());
-         
+
          assertEquals(0, numberOfErrors.get());
-         
 
       }
       finally
@@ -251,14 +316,14 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
-   protected  byte getSamplebyte(final long position)
+
+   protected byte getSamplebyte(final long position)
    {
       return (byte)('a' + (position) % ('z' - 'a' + 1));
    }
 
    // Creates a Fake LargeStream without using a real file
-   protected  InputStream createFakeLargeStream(final long size) throws Exception
+   protected InputStream createFakeLargeStream(final long size) throws Exception
    {
       return new InputStream()
       {
@@ -294,8 +359,6 @@
 
    }
 
-   
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java	2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java	2009-04-17 17:50:33 UTC (rev 6483)
@@ -1,74 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ClientLargeMessageTest extends ServiceTestBase
-{
-   public final SimpleString addressA = new SimpleString("addressA");
-
-   public final SimpleString queueA = new SimpleString("queueA");
-
-   public void testSendConsumeLargeMessage() throws Exception
-   {
-      MessagingServer server = createServer(false);
-      try
-      {
-         server.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setMinLargeMessageSize(1000);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession recSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = recSession.createConsumer(queueA);
-         recSession.start();
-         ClientMessage message = recSession.createClientMessage(false);
-         byte[] bytes = new byte[3000];
-         message.getBody().writeBytes(bytes);
-         cp.send(message);
-         ClientMessage m = cc.receive(5000);
-         assertNotNull(m);
-         byte[] recBytes = new byte[3000];
-         m.getBody().readBytes(recBytes);
-         assertEqualsByteArrays(bytes, recBytes);
-      }
-      finally
-      {
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-}




More information about the jboss-cvs-commits mailing list