[hornetq-commits] JBoss hornetq SVN: r10004 - in trunk: src/main/org/hornetq/core/client/impl and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Dec 6 23:22:51 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-06 23:22:51 -0500 (Mon, 06 Dec 2010)
New Revision: 10004
Removed:
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
Modified:
trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-448 moving a few properties towards ServerLocator
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -137,8 +137,4 @@
ServerLocator getServerLocator();
CoreRemotingConnection getConnection();
-
- void setCompressLargeMessages(boolean compressLargeMessage);
-
- boolean isCompressLargeMessages();
}
Modified: trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -574,6 +574,10 @@
void close();
boolean isHA();
+
+ boolean isCompressLargeMessage();
+
+ void setCompressLargeMessage(boolean compress);
void addClusterTopologyListener(ClusterTopologyListener listener);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -73,6 +73,11 @@
super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
}
+ public boolean isServerMessage()
+ {
+ return false;
+ }
+
public void onReceipt(final ClientConsumerInternal consumer)
{
this.consumer = consumer;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@@ -363,6 +364,12 @@
InputStream input = msgI.getBodyInputStream();
+
+ if (msgI.isServerMessage())
+ {
+ largeMessageSendServer(sendBlocking, msgI, credits);
+ }
+ else
if (input != null)
{
largeMessageSendStreamed(sendBlocking, msgI, input, credits);
@@ -372,7 +379,72 @@
largeMessageSendBuffered(sendBlocking, msgI, credits);
}
}
+
+ /**
+ * Used to send serverMessages through the bridges.
+ * No need to validate compression here since the message is only compressed at the client
+ * @param sendBlocking
+ * @param msgI
+ * @throws HornetQException
+ */
+ private void largeMessageSendServer(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
+ {
+ BodyEncoder context = msgI.getBodyEncoder();
+ final long bodySize = context.getLargeBodySize();
+
+ context.open();
+ try
+ {
+
+ for (int pos = 0; pos < bodySize;)
+ {
+ final boolean lastChunk;
+
+ final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+
+ final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
+
+ context.encode(bodyBuffer, chunkLength);
+
+ pos += chunkLength;
+
+ lastChunk = pos >= bodySize;
+
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ .array(),
+ !lastChunk,
+ lastChunk && sendBlocking);
+
+ if (sendBlocking && lastChunk)
+ {
+ // When sending it blocking, only the last chunk will be blocking.
+ channel.sendBlocking(chunk);
+ }
+ else
+ {
+ channel.send(chunk);
+ }
+
+ try
+ {
+ credits.acquireCredits(chunk.getPacketSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ finally
+ {
+ context.close();
+ }
+ }
+
+
+
/**
* @param sendBlocking
* @param msgI
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -147,8 +147,6 @@
public final Exception e = new Exception();
private final Object waitLock = new Object();
-
- private boolean compressLargeMessages;
// Static
// ---------------------------------------------------------------------------------------
@@ -205,9 +203,7 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
-
- compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
+
}
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
@@ -773,7 +769,7 @@
serverLocator.isBlockOnDurableSend(),
serverLocator.isCacheLargeMessagesClient(),
serverLocator.getMinLargeMessageSize(),
- compressLargeMessages,
+ serverLocator.isCompressLargeMessage(),
serverLocator.getInitialMessagePacketSize(),
serverLocator.getGroupID(),
connection,
@@ -1364,14 +1360,4 @@
cancelled = true;
}
}
-
- public void setCompressLargeMessages(boolean compressLargeMessage)
- {
- this.compressLargeMessages = compressLargeMessage;
- }
-
- public boolean isCompressLargeMessages()
- {
- return this.compressLargeMessages;
- }
}
Deleted: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-
-/**
- * A ConnectionManager
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 27 Nov 2008 18:45:46
- *
- *
- */
-public interface FailoverManager
-{
- ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize,
- final boolean cacheLargeMessageClient,
- final int minLargeMessageSize,
- final boolean compressLargeMessages,
- final boolean blockOnAcknowledge,
- final boolean autoGroup,
- final int confirmationWindowSize,
- final int producerWindowSize,
- final int consumerWindowSize,
- final int producerMaxRate,
- final int consumerMaxRate,
- final boolean blockOnNonDurableSend,
- final boolean blockOnDurableSend,
- final int initialMessagePacketSize,
- final String groupID) throws HornetQException;
-
- void removeSession(final ClientSessionInternal session);
-
- public CoreRemotingConnection getConnection();
-
- int numConnections();
-
- int numSessions();
-
- void addFailureListener(SessionFailureListener listener);
-
- boolean removeFailureListener(SessionFailureListener listener);
-
- void causeExit();
-
-}
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -68,6 +68,8 @@
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
private ExecutorService threadPool;
@@ -359,6 +361,8 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
clusterConnection = false;
}
@@ -917,6 +921,22 @@
return groupID;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
+ */
+ public boolean isCompressLargeMessage()
+ {
+ return compressLargeMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
+ */
+ public void setCompressLargeMessage(boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
private void checkWrite()
{
if (readOnly)
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -43,6 +43,8 @@
void bodyChanged();
void resetCopied();
+
+ boolean isServerMessage();
HornetQBuffer getEncodedBuffer();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -89,6 +89,11 @@
{
super(other);
}
+
+ public boolean isServerMessage()
+ {
+ return true;
+ }
public void setMessageID(final long id)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -533,6 +533,16 @@
return serverLocator.getGroupID();
}
+ public boolean isCompressLargeMessage()
+ {
+ return serverLocator.isCompressLargeMessage();
+ }
+
+ public void setCompressLargeMessage(boolean compress)
+ {
+ serverLocator.setCompressLargeMessage(compress);
+ }
+
public void close()
{
serverLocator.close();
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -17,17 +17,19 @@
import java.util.List;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
-import org.hornetq.api.core.SimpleString;;
-
/**
- * A ConnectionFactoryConfigurationImpl
+ * This class contains the configuration properties of a connection factory.
+ *
+ * It is also persisted to the journal when a connection factory is created through management and is set to be stored.
+ *
+ * Every property of this class must also be handled by the EncodingSupport implementation in this class (encoding, decoding and encoded-size calculation).
*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -570,6 +572,8 @@
reconnectAttempts = buffer.readInt();
failoverOnInitialConnection = buffer.readBoolean();
+
+ compressLargeMessage = buffer.readBoolean();
groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
@@ -654,6 +658,8 @@
buffer.writeInt(reconnectAttempts);
buffer.writeBoolean(failoverOnInitialConnection);
+
+ buffer.writeBoolean(compressLargeMessage);
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
@@ -760,6 +766,9 @@
DataConstants.SIZE_BOOLEAN +
// failoverOnInitialConnection
+
+ DataConstants.SIZE_BOOLEAN +
+ // compress-large-message
BufferHelper.sizeOfNullableSimpleString(groupID) +
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -1079,6 +1079,7 @@
cf.setMaxRetryInterval(cfConfig.getMaxRetryInterval());
cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
+ cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
}
connectionFactories.put(cfConfig.getName(), cf);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
/**
* A LargeMessageCompressTest
@@ -45,14 +46,13 @@
return false;
}
- protected ClientSessionFactory createSessionFactory() throws Exception
+ protected ServerLocator createFactory(final boolean isNetty) throws Exception
{
- ClientSessionFactory sf = locator.createSessionFactory();
- sf.setCompressLargeMessages(true);
- return sf;
+ ServerLocator locator = super.createFactory(isNetty);
+ locator.setCompressLargeMessage(true);
+ return locator;
}
-
public void testLargeMessageCompression() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -66,7 +66,6 @@
server.start();
ClientSessionFactory sf = locator.createSessionFactory();
- sf.setCompressLargeMessages(true);
session = sf.createSession(false, false, false);
@@ -134,7 +133,6 @@
server.start();
ClientSessionFactory sf = locator.createSessionFactory();
- sf.setCompressLargeMessages(true);
session = sf.createSession(false, false, false);
@@ -214,7 +212,6 @@
server.start();
ClientSessionFactory sf = locator.createSessionFactory();
- sf.setCompressLargeMessages(true);
session = sf.createSession(false, false, false);
@@ -280,7 +277,7 @@
}
- // below are large message tests that are not applied to compressed messages
+ // TODO: Fix these tests on LargeMessageCompression
public void testResendSmallStreamMessage() throws Exception
{
@@ -306,5 +303,8 @@
{
}
-
+ public void testSendServerMessage() throws Exception
+ {
+ // intentionally empty: this test does not make sense for compressed messages
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -70,11 +70,6 @@
return false;
}
- protected ClientSessionFactory createSessionFactory() throws Exception
- {
- return locator.createSessionFactory();
- }
-
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -87,7 +82,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -158,7 +153,7 @@
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
- final int messageSize = 3 * journalsize + 5;
+ final int messageSize = 3 * journalsize;
// final int messageSize = 5 * 1024;
ClientSession session = null;
@@ -175,9 +170,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
-
- sf.setCompressLargeMessages(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(!transacted, !transacted, 0);
@@ -261,7 +254,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -314,7 +307,7 @@
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -388,7 +381,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -471,7 +464,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -606,7 +599,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -682,7 +675,7 @@
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -749,7 +742,7 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -791,7 +784,7 @@
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -856,7 +849,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -894,7 +887,6 @@
}
catch (Throwable e)
{
- log.error("failed", e);
failed = true;
}
@@ -963,7 +955,7 @@
locator.setCacheLargeMessagesClient(true);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
@@ -1895,7 +1887,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1978,7 +1970,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2006,7 +1998,7 @@
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
}
@@ -2060,7 +2052,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(isXA, false, false);
@@ -2090,7 +2082,7 @@
session.close();
server.stop();
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
session = sf.createSession(isXA, false, false);
session.rollback(xid);
@@ -2147,7 +2139,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(isXA, false, false);
@@ -2284,7 +2276,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2388,7 +2380,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2491,7 +2483,7 @@
locator.setMinLargeMessageSize(100 * 1024);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2565,7 +2557,7 @@
locator.setMinLargeMessageSize(1024);
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2641,7 +2633,7 @@
server.start();
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false);
@@ -2695,77 +2687,6 @@
}
}
- public void testLargeMessageCompression() throws Exception
- {
- final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- ClientSession session = null;
-
- try
- {
- server = createServer(true, isNetty());
-
- server.start();
-
- ClientSessionFactory sf = createSessionFactory();
- sf.setCompressLargeMessages(true);
-
- session = sf.createSession(false, false, false);
-
- session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
-
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
- Message clientFile = createLargeClientMessage(session, messageSize, true);
-
- producer.send(clientFile);
-
- session.commit();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- ClientMessage msg1 = consumer.receive(1000);
- Assert.assertNotNull(msg1);
-
- for (int i = 0 ; i < messageSize; i++)
- {
- //System.out.print(msg1.getBodyBuffer().readByte() + " ");
- //if (i % 100 == 0) System.out.println();
- byte b = msg1.getBodyBuffer().readByte();
- //System.out.println("Byte read: " + (char)b + " i " + i);
- assertEquals("position = " + i, getSamplebyte(i), b);
- }
-
- msg1.acknowledge();
- session.commit();
-
- consumer.close();
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -2810,7 +2731,7 @@
try
{
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
if (sendBlocking)
{
@@ -2855,7 +2776,7 @@
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
- sf = createSessionFactory();
+ sf = locator.createSessionFactory();
}
session = sf.createSession(null, null, false, true, true, false, 0);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-12-06 16:57:19 UTC (rev 10003)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-12-07 04:22:51 UTC (rev 10004)
@@ -974,6 +974,15 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.impl.MessageInternal#isServerMessage()
+ */
+ public boolean isServerMessage()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
class FakeFilter implements Filter
More information about the hornetq-commits
mailing list