[hornetq-commits] JBoss hornetq SVN: r10004 - in trunk: src/main/org/hornetq/core/client/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 6 23:22:51 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-06 23:22:51 -0500 (Mon, 06 Dec 2010)
New Revision: 10004

Removed:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
Modified:
   trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
   trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-448 moving a few properties towards ServerLocator

Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -137,8 +137,4 @@
    ServerLocator getServerLocator();
    
    CoreRemotingConnection getConnection();
-
-   void setCompressLargeMessages(boolean compressLargeMessage);
-   
-   boolean isCompressLargeMessages();
 }

Modified: trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -574,6 +574,10 @@
    void close();
 
    boolean isHA();
+   
+   boolean isCompressLargeMessage();
+   
+   void setCompressLargeMessage(boolean compress);
 
    void addClusterTopologyListener(ClusterTopologyListener listener);
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -73,6 +73,11 @@
       super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
    }
 
+   public boolean isServerMessage()
+   {
+      return false;
+   }
+   
    public void onReceipt(final ClientConsumerInternal consumer)
    {
       this.consumer = consumer;

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -22,6 +22,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@@ -363,6 +364,12 @@
 
       InputStream input = msgI.getBodyInputStream();
 
+      
+      if (msgI.isServerMessage())
+      {
+         largeMessageSendServer(sendBlocking, msgI, credits);
+      }
+      else
       if (input != null)
       {
          largeMessageSendStreamed(sendBlocking, msgI, input, credits);
@@ -372,7 +379,72 @@
          largeMessageSendBuffered(sendBlocking, msgI, credits);
       }
    }
+   
+   /**
+    * Used to send serverMessages through the bridges.
+    * No need to validate compression here since the message is only compressed at the client
+    * @param sendBlocking
+    * @param msgI
+    * @throws HornetQException
+    */
+   private void largeMessageSendServer(final boolean sendBlocking,
+                                         final MessageInternal msgI,
+                                         final ClientProducerCredits credits) throws HornetQException
+   {
+      BodyEncoder context = msgI.getBodyEncoder();
 
+      final long bodySize = context.getLargeBodySize();
+
+      context.open();
+      try
+      {
+
+         for (int pos = 0; pos < bodySize;)
+         {
+            final boolean lastChunk;
+
+            final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+
+            final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
+
+            context.encode(bodyBuffer, chunkLength);
+
+            pos += chunkLength;
+
+            lastChunk = pos >= bodySize;
+
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+                                                                                                      .array(),
+                                                                                            !lastChunk,
+                                                                                            lastChunk && sendBlocking);
+
+            if (sendBlocking && lastChunk)
+            {
+               // When sending it blocking, only the last chunk will be blocking.
+               channel.sendBlocking(chunk);
+            }
+            else
+            {
+               channel.send(chunk);
+            }
+
+            try
+            {
+               credits.acquireCredits(chunk.getPacketSize());
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+      }
+      finally
+      {
+         context.close();
+      }
+   }
+
+   
+
    /**
     * @param sendBlocking
     * @param msgI

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -147,8 +147,6 @@
    public final Exception e = new Exception();
 
    private final Object waitLock = new Object();
-   
-   private boolean compressLargeMessages;
 
    // Static
    // ---------------------------------------------------------------------------------------
@@ -205,9 +203,7 @@
       closeExecutor = orderedExecutorFactory.getExecutor();
 
       this.interceptors = interceptors;
-      
-      compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
+ 
    }
 
    public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
@@ -773,7 +769,7 @@
                                                                      serverLocator.isBlockOnDurableSend(),
                                                                      serverLocator.isCacheLargeMessagesClient(),
                                                                      serverLocator.getMinLargeMessageSize(),
-                                                                     compressLargeMessages,
+                                                                     serverLocator.isCompressLargeMessage(),
                                                                      serverLocator.getInitialMessagePacketSize(),
                                                                      serverLocator.getGroupID(),
                                                                      connection,
@@ -1364,14 +1360,4 @@
          cancelled = true;
       }
    }
-
-   public void setCompressLargeMessages(boolean compressLargeMessage)
-   {
-      this.compressLargeMessages = compressLargeMessage;
-   }
-
-   public boolean isCompressLargeMessages()
-   {
-      return this.compressLargeMessages;
-   }
 }

Deleted: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-
-/**
- * A ConnectionManager
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 27 Nov 2008 18:45:46
- *
- *
- */
-public interface FailoverManager
-{
-   ClientSession createSession(final String username,
-                               final String password,
-                               final boolean xa,
-                               final boolean autoCommitSends,
-                               final boolean autoCommitAcks,
-                               final boolean preAcknowledge,
-                               final int ackBatchSize,
-                               final boolean cacheLargeMessageClient,
-                               final int minLargeMessageSize,
-                               final boolean compressLargeMessages,
-                               final boolean blockOnAcknowledge,
-                               final boolean autoGroup,
-                               final int confirmationWindowSize,
-                               final int producerWindowSize,
-                               final int consumerWindowSize,
-                               final int producerMaxRate,
-                               final int consumerMaxRate,
-                               final boolean blockOnNonDurableSend,
-                               final boolean blockOnDurableSend,
-                               final int initialMessagePacketSize,
-                               final String groupID) throws HornetQException;
-
-   void removeSession(final ClientSessionInternal session);
-
-   public CoreRemotingConnection getConnection();
-
-   int numConnections();
-
-   int numSessions();
-
-   void addFailureListener(SessionFailureListener listener);
-
-   boolean removeFailureListener(SessionFailureListener listener);
-
-   void causeExit();
-
-}

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -68,6 +68,8 @@
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 
    private boolean receivedTopology;
+   
+   private boolean compressLargeMessage;
 
    private ExecutorService threadPool;
 
@@ -359,6 +361,8 @@
       initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
 
       cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+      
+      compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
       clusterConnection = false;
    }
@@ -917,6 +921,22 @@
       return groupID;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
+    */
+   public boolean isCompressLargeMessage()
+   {
+      return compressLargeMessage;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
+    */
+   public void setCompressLargeMessage(boolean compress)
+   {
+      this.compressLargeMessage = compress;
+   }
+
    private void checkWrite()
    {
       if (readOnly)

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -43,6 +43,8 @@
    void bodyChanged();
 
    void resetCopied();
+   
+   boolean isServerMessage();
 
    HornetQBuffer getEncodedBuffer();
    

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -89,6 +89,11 @@
    {
       super(other);
    }
+   
+   public boolean isServerMessage()
+   {
+      return true;
+   }
 
    public void setMessageID(final long id)
    {

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -533,6 +533,16 @@
       return serverLocator.getGroupID();
    }
    
+   public boolean isCompressLargeMessage()
+   {
+      return serverLocator.isCompressLargeMessage();
+   }
+   
+   public void setCompressLargeMessage(boolean compress)
+   {
+      serverLocator.setCompressLargeMessage(compress);
+   }
+   
    public void close()
    {
       serverLocator.close();

Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -17,17 +17,19 @@
 import java.util.List;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
 import org.hornetq.jms.server.impl.JMSFactoryType;
 import org.hornetq.utils.BufferHelper;
 import org.hornetq.utils.DataConstants;
 
-import org.hornetq.api.core.SimpleString;;
-
 /**
- * A ConnectionFactoryConfigurationImpl
+ * This class contains the configuration properties of a connection factory.
+ * 
+ * It is also persisted on the journal at the time of management is used to created a connection factory and set to store.
+ * 
+ * Every property on this class has to be also set through encoders through EncodingSupport implementation at this class.
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  *
@@ -570,6 +572,8 @@
       reconnectAttempts = buffer.readInt();
 
       failoverOnInitialConnection = buffer.readBoolean();
+      
+      compressLargeMessage = buffer.readBoolean();
 
       groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
 
@@ -654,6 +658,8 @@
       buffer.writeInt(reconnectAttempts);
 
       buffer.writeBoolean(failoverOnInitialConnection);
+      
+      buffer.writeBoolean(compressLargeMessage);
 
       BufferHelper.writeAsNullableSimpleString(buffer, groupID);
 
@@ -760,6 +766,9 @@
 
               DataConstants.SIZE_BOOLEAN +
               // failoverOnInitialConnection
+              
+              DataConstants.SIZE_BOOLEAN + 
+              // compress-large-message
 
               BufferHelper.sizeOfNullableSimpleString(groupID) +
 

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -1079,6 +1079,7 @@
          cf.setMaxRetryInterval(cfConfig.getMaxRetryInterval());
          cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
          cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
+         cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
       }
       
       connectionFactories.put(cfConfig.getName(), cf);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -26,6 +26,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 
 /**
  * A LargeMessageCompressTest
@@ -45,14 +46,13 @@
       return false;
    }
 
-   protected ClientSessionFactory createSessionFactory() throws Exception
+   protected ServerLocator createFactory(final boolean isNetty) throws Exception
    {
-      ClientSessionFactory sf = locator.createSessionFactory();
-      sf.setCompressLargeMessages(true);
-      return sf;
+      ServerLocator locator = super.createFactory(isNetty);
+      locator.setCompressLargeMessage(true);
+      return locator;
    }
 
-
    public void testLargeMessageCompression() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -66,7 +66,6 @@
          server.start();
 
          ClientSessionFactory sf = locator.createSessionFactory();
-         sf.setCompressLargeMessages(true);
 
          session = sf.createSession(false, false, false);
 
@@ -134,7 +133,6 @@
          server.start();
 
          ClientSessionFactory sf = locator.createSessionFactory();
-         sf.setCompressLargeMessages(true);
 
          session = sf.createSession(false, false, false);
 
@@ -214,7 +212,6 @@
          server.start();
 
          ClientSessionFactory sf = locator.createSessionFactory();
-         sf.setCompressLargeMessages(true);
 
          session = sf.createSession(false, false, false);
 
@@ -280,7 +277,7 @@
    }
 
 
-   // below are large message tests that are not applied to compressed messages 
+   // TODO: Fix these tests on LargeMessageCompression
 
    public void testResendSmallStreamMessage() throws Exception
    {
@@ -306,5 +303,8 @@
    {
    }
 
-
+   public void testSendServerMessage() throws Exception
+   {
+      // doesn't make sense as compressed
+   }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -70,11 +70,6 @@
       return false;
    }
 
-   protected ClientSessionFactory createSessionFactory() throws Exception
-   {
-      return locator.createSessionFactory();
-   }
-   
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -87,7 +82,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -158,7 +153,7 @@
    public void doTestLargeBuffer(boolean transacted) throws Exception
    {
       final int journalsize = 100 * 1024;
-      final int messageSize = 3 * journalsize + 5;
+      final int messageSize = 3 * journalsize;
       // final int messageSize = 5 * 1024;
 
       ClientSession session = null;
@@ -175,9 +170,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
-         
-         sf.setCompressLargeMessages(true);
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(!transacted, !transacted, 0);
 
@@ -261,7 +254,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -314,7 +307,7 @@
 
          server.start();
 
-         sf = createSessionFactory();
+         sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -388,7 +381,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -471,7 +464,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -606,7 +599,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -682,7 +675,7 @@
 
          server.start();
 
-         sf = createSessionFactory();
+         sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -749,7 +742,7 @@
 
          server.getAddressSettingsRepository().addMatch("*", addressSettings);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -791,7 +784,7 @@
 
          server.start();
 
-         sf = createSessionFactory();
+         sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -856,7 +849,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -894,7 +887,6 @@
          }
          catch (Throwable e)
          {
-            log.error("failed", e);
             failed = true;
          }
 
@@ -963,7 +955,7 @@
          
          locator.setCacheLargeMessagesClient(true);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -1895,7 +1887,7 @@
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1978,7 +1970,7 @@
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2006,7 +1998,7 @@
 
             server.start();
 
-            sf = createSessionFactory();
+            sf = locator.createSessionFactory();
 
             session = sf.createSession(null, null, false, true, true, false, 0);
          }
@@ -2060,7 +2052,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(isXA, false, false);
 
@@ -2090,7 +2082,7 @@
             session.close();
             server.stop();
             server.start();
-            sf = createSessionFactory();
+            sf = locator.createSessionFactory();
             session = sf.createSession(isXA, false, false);
 
             session.rollback(xid);
@@ -2147,7 +2139,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(isXA, false, false);
 
@@ -2284,7 +2276,7 @@
          locator.setMinLargeMessageSize(1024);
          locator.setConsumerWindowSize(1024 * 1024);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(null, null, false, false, false, false, 0);
 
@@ -2388,7 +2380,7 @@
          locator.setMinLargeMessageSize(1024);
          locator.setConsumerWindowSize(1024 * 1024);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(null, null, false, false, false, false, 0);
 
@@ -2491,7 +2483,7 @@
 
          locator.setMinLargeMessageSize(100 * 1024);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2565,7 +2557,7 @@
 
          locator.setMinLargeMessageSize(1024);
 
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2641,7 +2633,7 @@
 
       server.start();
 
-      ClientSessionFactory sf = createSessionFactory();
+      ClientSessionFactory sf = locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, false);
 
@@ -2695,77 +2687,6 @@
       }
    }
 
-   public void testLargeMessageCompression() throws Exception
-   {
-      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
-      ClientSession session = null;
-
-      try
-      {
-         server = createServer(true, isNetty());
-
-         server.start();
-
-         ClientSessionFactory sf = createSessionFactory();
-         sf.setCompressLargeMessages(true);
-
-         session = sf.createSession(false, false, false);
-
-         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
-
-         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
-         Message clientFile = createLargeClientMessage(session, messageSize, true);
-
-         producer.send(clientFile);
-
-         session.commit();
-
-         session.start();
-
-         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-         ClientMessage msg1 = consumer.receive(1000);
-         Assert.assertNotNull(msg1);
-         
-         for (int i = 0 ; i < messageSize; i++)
-         {
-            //System.out.print(msg1.getBodyBuffer().readByte() + "  ");
-            //if (i % 100 == 0) System.out.println();
-            byte b = msg1.getBodyBuffer().readByte();
-            //System.out.println("Byte read: " + (char)b + " i " + i);
-            assertEquals("position = "  + i, getSamplebyte(i), b);
-         }
-
-         msg1.acknowledge();
-         session.commit();
-
-         consumer.close();
-
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         try
-         {
-            session.close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -2810,7 +2731,7 @@
 
       try
       {
-         ClientSessionFactory sf = createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
          if (sendBlocking)
          {
@@ -2855,7 +2776,7 @@
             server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
             server.start();
 
-            sf = createSessionFactory();
+            sf = locator.createSessionFactory();
          }
 
          session = sf.createSession(null, null, false, true, true, false, 0);

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-12-07 04:22:51 UTC (rev 10004)
@@ -974,6 +974,15 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.message.impl.MessageInternal#isServerMessage()
+       */
+      public boolean isServerMessage()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list